You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ad...@apache.org on 2022/06/30 10:15:00 UTC

[ozone] branch master updated: HDDS-6974. Container report processing in Recon is single threaded. (#3571)

This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ade19e2c3 HDDS-6974. Container report processing in Recon is single threaded. (#3571)
7ade19e2c3 is described below

commit 7ade19e2c34ceadb821b7acb705bc8ad267acee8
Author: Sammi Chen <sa...@apache.org>
AuthorDate: Thu Jun 30 18:14:54 2022 +0800

    HDDS-6974. Container report processing in Recon is single threaded. (#3571)
---
 .../scm/ReconStorageContainerManagerFacade.java    | 33 ++++++++++++++++++++--
 1 file changed, 31 insertions(+), 2 deletions(-)

diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index b85a751614..09b91f6f44 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -62,7 +63,9 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.safemode.SafeModeManager;
 import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
+import org.apache.hadoop.hdds.server.events.EventExecutor;
 import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
@@ -85,6 +88,8 @@ import com.google.inject.Inject;
 import static org.apache.hadoop.hdds.recon.ReconConfigKeys.RECON_SCM_CONFIG_PREFIX;
 import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.buildRpcServerStartMessage;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
 
 import org.apache.ratis.util.ExitUtils;
 import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
@@ -203,14 +208,38 @@ public class ReconStorageContainerManagerFacade
     ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
     ReconNewNodeHandler newNodeHandler = new ReconNewNodeHandler(nodeManager);
 
+    // Use the same executor for both ICR and FCR.
+    // The Executor maps the event to a thread for DN.
+    // Dispatcher should always dispatch FCR first followed by ICR
+    List<ThreadPoolExecutor> executors =
+        FixedThreadPoolWithAffinityExecutor.initializeExecutorPool(
+            SCMEvents.CONTAINER_REPORT.getName()
+                + "_OR_"
+                + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName());
+
+    EventExecutor<ContainerReportFromDatanode>
+        containerReportExecutors =
+        new FixedThreadPoolWithAffinityExecutor<>(
+            EventQueue.getExecutorName(SCMEvents.CONTAINER_REPORT,
+                containerReportHandler),
+            executors);
+    EventExecutor<IncrementalContainerReportFromDatanode>
+        incrementalReportExecutors =
+        new FixedThreadPoolWithAffinityExecutor<>(
+            EventQueue.getExecutorName(
+                SCMEvents.INCREMENTAL_CONTAINER_REPORT,
+                icrHandler),
+            executors);
+    eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportExecutors,
+        containerReportHandler);
+    eventQueue.addHandler(SCMEvents.INCREMENTAL_CONTAINER_REPORT,
+        incrementalReportExecutors, icrHandler);
     eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, nodeManager);
     eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
     eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
     eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler);
     eventQueue.addHandler(SCMEvents.STALE_NODE, staleNodeHandler);
     eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
-    eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportHandler);
-    eventQueue.addHandler(SCMEvents.INCREMENTAL_CONTAINER_REPORT, icrHandler);
     eventQueue.addHandler(SCMEvents.CONTAINER_ACTIONS, actionsHandler);
     eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
     eventQueue.addHandler(SCMEvents.NEW_NODE, newNodeHandler);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org