You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by so...@apache.org on 2022/11/11 09:42:56 UTC
[ozone] branch master updated: HDDS-7471. EC: Notify ReplicationManager when a heartbeat updates datanode command counts (#3945)
This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new a13c62b605 HDDS-7471. EC: Notify ReplicationManager when a heartbeat updates datanode command counts (#3945)
a13c62b605 is described below
commit a13c62b60556cd003ee2149179f72029d9e35756
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Fri Nov 11 09:42:50 2022 +0000
HDDS-7471. EC: Notify ReplicationManager when a heartbeat updates datanode command counts (#3945)
---
.../DatanodeCommandCountUpdatedHandler.java | 48 +++++++++++++++++++++
.../container/replication/ReplicationManager.java | 15 +++++++
.../apache/hadoop/hdds/scm/events/SCMEvents.java | 9 ++++
.../hadoop/hdds/scm/node/SCMNodeManager.java | 2 +
.../hdds/scm/server/StorageContainerManager.java | 5 +++
.../TestDatanodeCommandCountUpdatedHandler.java | 49 ++++++++++++++++++++++
.../hadoop/hdds/scm/node/TestSCMNodeManager.java | 13 ++++++
7 files changed, 141 insertions(+)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/DatanodeCommandCountUpdatedHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/DatanodeCommandCountUpdatedHandler.java
new file mode 100644
index 0000000000..87e5928f40
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/DatanodeCommandCountUpdatedHandler.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Forwards DATANODE_COMMAND_COUNT_UPDATED events to ReplicationManager.
+ */
+public class DatanodeCommandCountUpdatedHandler implements
+    EventHandler<DatanodeDetails> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DatanodeCommandCountUpdatedHandler.class);
+  private final ReplicationManager replicationManager; // set once in ctor
+
+  public DatanodeCommandCountUpdatedHandler(
+      ReplicationManager replicationManager) {
+    this.replicationManager = replicationManager;
+  }
+
+  @Override
+  public void onMessage(DatanodeDetails datanodeDetails,
+      EventPublisher publisher) { // publisher is unused here
+    LOG.debug("DatanodeCommandCountUpdatedHandler called with datanode {}",
+        datanodeDetails);
+    replicationManager.datanodeCommandCountUpdated(datanodeDetails);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 6aceab13d1..686ed74f50 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -488,6 +488,21 @@ public class ReplicationManager implements SCMService {
return scmContext.getTermOfLeader();
}
+  /**
+   * Callback invoked when a heartbeat has refreshed the command counts held
+   * for a datanode. The intent is to let RM reconsider a node for container
+   * operations if it had previously been excluded due to load.
+   * @param datanodeDetails The datanode whose command counts have been
+   *                        updated.
+   */
+  public void datanodeCommandCountUpdated(DatanodeDetails datanodeDetails) {
+    // Currently a NOOP apart from the debug log below. A future change is
+    // planned to use this notification to limit the number of commands
+    // RM schedules against a DN.
+    LOG.debug("Received a notification that the DN command count has " +
+        "been updated for {}", datanodeDetails);
+  }
+
protected void processContainer(ContainerInfo containerInfo,
ReplicationQueue repQueue, ReplicationManagerReport report)
throws ContainerNotFoundException {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index c51d792d4c..5575760a15 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -59,6 +59,15 @@ public final class SCMEvents {
COMMAND_QUEUE_REPORT = new TypedEvent<>(
CommandQueueReportFromDatanode.class, "Command_Queue_Report");
+  /**
+   * Fired by the node manager once it has processed a COMMAND_QUEUE_REPORT,
+   * so that any other processes which depend upon the command counts are
+   * notified that they have been updated. The payload is the datanode.
+   */
+  public static final TypedEvent<DatanodeDetails>
+      DATANODE_COMMAND_COUNT_UPDATED = new TypedEvent<>(
+      DatanodeDetails.class, "Datanode_Command_Count_Updated");
+
/**
* Event generated on DataNode registration.
*/
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index c5ec348f2a..b9bc13d5a5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -717,6 +717,8 @@ public class SCMNodeManager implements NodeManager {
datanodeInfo.setCommandCounts(commandQueueReportProto,
commandsToBeSent);
metrics.incNumNodeCommandQueueReportProcessed();
+ scmNodeEventPublisher.fireEvent(
+ SCMEvents.DATANODE_COMMAND_COUNT_UPDATED, datanodeDetails);
}
} catch (NodeNotFoundException e) {
metrics.incNumNodeCommandQueueReportProcessingFailed();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index a3b748e547..0f84b624b1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl;
import org.apache.hadoop.hdds.scm.PlacementPolicyValidateProxy;
import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
+import org.apache.hadoop.hdds.scm.container.replication.DatanodeCommandCountUpdatedHandler;
import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
import org.apache.hadoop.hdds.scm.crl.CRLStatusReportHandler;
import org.apache.hadoop.hdds.scm.ha.BackgroundSCMService;
@@ -466,12 +467,16 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
new PipelineActionHandler(pipelineManager, scmContext, configuration);
CRLStatusReportHandler crlStatusReportHandler =
new CRLStatusReportHandler(certificateStore, configuration);
+ DatanodeCommandCountUpdatedHandler datanodeCommandCountUpdatedHandler =
+ new DatanodeCommandCountUpdatedHandler(replicationManager);
eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager);
eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
eventQueue.addHandler(SCMEvents.COMMAND_QUEUE_REPORT,
commandQueueReportHandler);
+ eventQueue.addHandler(SCMEvents.DATANODE_COMMAND_COUNT_UPDATED,
+ datanodeCommandCountUpdatedHandler);
// Use the same executor for both ICR and FCR.
// The Executor maps the event to a thread for DN.
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestDatanodeCommandCountUpdatedHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestDatanodeCommandCountUpdatedHandler.java
new file mode 100644
index 0000000000..9fb9d1f692
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestDatanodeCommandCountUpdatedHandler.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.mockito.ArgumentMatchers.eq;
+
+/**
+ * Unit test verifying DatanodeCommandCountUpdatedHandler notifies a mock RM.
+ */
+public class TestDatanodeCommandCountUpdatedHandler {
+
+  private ReplicationManager replicationManager;
+  private DatanodeCommandCountUpdatedHandler handler;
+
+  @BeforeEach
+  public void setup() {
+    replicationManager = Mockito.mock(ReplicationManager.class);
+    handler = new DatanodeCommandCountUpdatedHandler(replicationManager);
+  }
+
+  @Test
+  public void testReplicationManagerNotified() {
+    DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
+    handler.onMessage(dn, null); // no publisher is passed to the handler
+    Mockito.verify(replicationManager, Mockito.times(1))
+        .datanodeCommandCountUpdated(eq(dn));
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index 73dc7b6175..72c2873f72 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -106,6 +106,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_C
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND_COUNT_UPDATED;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.NEW_NODE;
import static org.apache.hadoop.ozone.container.upgrade.UpgradeUtils.toLayoutVersionProto;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -986,6 +987,13 @@ public class TestSCMNodeManager {
assertEquals(5, nodeManager.getNodeQueuedCommandCount(
node1, SCMCommandProto.Type.deleteBlocksCommand));
+ ArgumentCaptor<DatanodeDetails> captor =
+ ArgumentCaptor.forClass(DatanodeDetails.class);
+ verify(eventPublisher, times(1))
+ .fireEvent(Mockito.eq(DATANODE_COMMAND_COUNT_UPDATED),
+ captor.capture());
+ assertEquals(node1, captor.getValue());
+
// Send another report missing an earlier entry, and ensure it is not
// still reported as a stale value.
nodeManager.processNodeCommandQueueReport(node1,
@@ -998,6 +1006,11 @@ public class TestSCMNodeManager {
node1, SCMCommandProto.Type.replicateContainerCommand));
assertEquals(11, nodeManager.getNodeQueuedCommandCount(
node1, SCMCommandProto.Type.closeContainerCommand));
+
+ verify(eventPublisher, times(2))
+ .fireEvent(Mockito.eq(DATANODE_COMMAND_COUNT_UPDATED),
+ captor.capture());
+ assertEquals(node1, captor.getValue());
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org