Posted to commits@ozone.apache.org by sa...@apache.org on 2022/11/16 09:05:21 UTC

[ozone] branch master updated: HDDS-7255. Add metrics for container reports events (#3936)

This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new b46f961b59 HDDS-7255. Add metrics for container reports events (#3936)
b46f961b59 is described below

commit b46f961b59a994ccab89a7806d815e53e5def8c0
Author: Sumit Agrawal <su...@gmail.com>
AuthorDate: Wed Nov 16 14:35:15 2022 +0530

    HDDS-7255. Add metrics for container reports events (#3936)
---
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  | 11 ++++-
 .../hadoop/hdds/server/events/EventExecutor.java   | 14 ++++++
 .../FixedThreadPoolWithAffinityExecutor.java       | 52 ++++++++++++++++++++++
 .../hadoop/hdds/server/events/IEventInfo.java      | 28 ++++++++++++
 .../java/org/apache/hadoop/hdds/scm/ScmUtils.java  | 17 +++----
 .../scm/server/SCMDatanodeHeartbeatDispatcher.java | 32 +++++++++++--
 .../hdds/scm/server/StorageContainerManager.java   | 25 ++++++++---
 .../hadoop/ozone/TestStorageContainerManager.java  | 51 +++++++++++++++++++++
 .../scm/ReconStorageContainerManagerFacade.java    | 25 ++++++++---
 9 files changed, 233 insertions(+), 22 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index c95e4801bc..e5562267d3 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -490,7 +490,16 @@ public final class ScmConfigKeys {
   public static final String OZONE_SCM_EVENT_CONTAINER_REPORT_THREAD_POOL_SIZE =
       OZONE_SCM_EVENT_PREFIX + "ContainerReport.thread.pool.size";
   public static final int OZONE_SCM_EVENT_THREAD_POOL_SIZE_DEFAULT = 10;
-
+  /**
+   * Default queue wait threshold for SCM report events, in milliseconds (1 minute).
+   */
+  public static final int OZONE_SCM_EVENT_REPORT_QUEUE_WAIT_THRESHOLD_DEFAULT
+      = 60000;
+  /**
+   * Default execution-time threshold for SCM report events, in milliseconds (2 minutes).
+   */
+  public static final int OZONE_SCM_EVENT_REPORT_EXEC_WAIT_THRESHOLD_DEFAULT
+      = 120000;
   public static final int OZONE_SCM_EVENT_CONTAINER_REPORT_QUEUE_SIZE_DEFAULT 
       = 100000;
 
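
The new defaults are 60000 ms (one minute) for queue wait and 120000 ms (two minutes) for execution, and both can be tuned per deployment. A minimal, hypothetical override sketch; the full property names are assembled from the shared prefix helper added to ScmUtils in this patch, and the values here are illustrative only:

    // Hypothetical override of the new report-event thresholds.
    OzoneConfiguration conf = new OzoneConfiguration();
    String prefix = ScmUtils.getContainerReportConfPrefix();
    conf.setInt(prefix + ".queue.wait.threshold", 30000);   // warn after 30 s in queue
    conf.setInt(prefix + ".execute.wait.threshold", 90000); // warn after 90 s overall
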
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java
index d400c9fa15..c79d0d371c 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java
@@ -70,6 +70,20 @@ public interface EventExecutor<PAYLOAD> extends AutoCloseable {
     return 0;
   }
 
+  /**
+   * Returns the number of events whose wait in the queue exceeded the threshold.
+   */
+  default long longWaitInQueueEvents() {
+    return 0;
+  }
+
+  /**
+   * Returns the number of events whose execution time exceeded the threshold.
+   */
+  default long longTimeExecutionEvents() {
+    return 0;
+  }
+  
   /**
    * The human readable name for the event executor.
    * <p>
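
Both additions are default methods, so existing EventExecutor implementations keep compiling and simply report zero. A hypothetical monitoring helper (not part of the patch) could poll the new counters alongside the executor name documented just below:

    // Hypothetical helper; works with any EventExecutor implementation.
    static void logSlowEvents(EventExecutor<?> executor, org.slf4j.Logger log) {
      long slowQueued = executor.longWaitInQueueEvents();
      long slowExecuted = executor.longTimeExecutionEvents();
      if (slowQueued > 0 || slowExecuted > 0) {
        log.warn("{}: {} events waited too long in queue, {} took too long to run",
            executor.getName(), slowQueued, slowExecuted);
      }
    }
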
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java
index d59367f1ba..4949195cdd 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,6 +36,9 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_REPORT_EXEC_WAIT_THRESHOLD_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_REPORT_QUEUE_WAIT_THRESHOLD_DEFAULT;
+
 /**
  * Fixed thread pool EventExecutor to call all the event handler one-by-one.
  * Payloads with the same hashcode will be mapped to the same thread.
@@ -77,8 +81,18 @@ public class FixedThreadPoolWithAffinityExecutor<P, Q>
 
   @Metric
   private MutableCounterLong dropped;
+  
+  @Metric
+  private MutableCounterLong longWaitInQueue;
+  
+  @Metric
+  private MutableCounterLong longTimeExecution;
 
   private final AtomicBoolean isRunning = new AtomicBoolean(true);
+  private long queueWaitThreshold
+      = OZONE_SCM_EVENT_REPORT_QUEUE_WAIT_THRESHOLD_DEFAULT;
+  private long execWaitThreshold
+      = OZONE_SCM_EVENT_REPORT_EXEC_WAIT_THRESHOLD_DEFAULT;
 
   /**
    * Create FixedThreadPoolExecutor with affinity.
@@ -117,6 +131,14 @@ public class FixedThreadPoolWithAffinityExecutor<P, Q>
             "Event Executor metrics ",
             this);
   }
+  
+  public void setQueueWaitThreshold(long queueWaitThreshold) {
+    this.queueWaitThreshold = queueWaitThreshold;
+  }
+
+  public void setExecWaitThreshold(long execWaitThreshold) {
+    this.execWaitThreshold = execWaitThreshold;
+  }
 
   public static <Q> List<ThreadPoolExecutor> initializeExecutorPool(
       List<BlockingQueue<Q>> workQueues) {
@@ -180,6 +202,14 @@ public class FixedThreadPoolWithAffinityExecutor<P, Q>
     return dropped.value();
   }
 
+  public long longWaitInQueueEvents() {
+    return longWaitInQueue.value();
+  }
+  
+  public long longTimeExecutionEvents() {
+    return longTimeExecution.value();
+  }
+
   @Override
   public void close() {
     isRunning.set(false);
@@ -226,12 +256,34 @@ public class FixedThreadPoolWithAffinityExecutor<P, Q>
             LOG.warn("Executor for report is not found");
             continue;
           }
+          
+          long createTime = 0;
+          String eventId = "";
+          if (report instanceof IEventInfo) {
+            createTime = ((IEventInfo) report).getCreateTime();
+            eventId = ((IEventInfo) report).getEventId();
+          }
+          
+          long curTime = Time.monotonicNow();
+          if (createTime != 0
+              && ((curTime - createTime) > executor.queueWaitThreshold)) {
+            executor.longWaitInQueue.incr();
+            LOG.warn("Event waited in queue for {} millisec, crossing threshold: {}",
+                (curTime - createTime), eventId);
+          }
 
           executor.scheduled.incr();
           try {
             executor.eventHandler.onMessage(report,
                 executor.eventPublisher);
             executor.done.incr();
+            curTime = Time.monotonicNow();
+            if (createTime != 0
+                && (curTime - createTime) > executor.execWaitThreshold) {
+              executor.longTimeExecution.incr();
+              LOG.warn("Event took {} millisec from creation to completion, crossing threshold: {}",
+                  (curTime - createTime), eventId);
+            }
           } catch (Exception ex) {
             LOG.error("Error on execution message {}", report, ex);
             executor.failed.incr();
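
Both checks compare against the payload's creation time: the first warning fires when an event sits in the queue longer than queueWaitThreshold, the second when the total time from creation to handler completion exceeds execWaitThreshold (so it includes the queue wait). A stripped-down sketch of that pattern, assuming a payload that implements IEventInfo:

    long createTime = ((IEventInfo) report).getCreateTime();
    if (createTime != 0
        && Time.monotonicNow() - createTime > queueWaitThreshold) {
      longWaitInQueue.incr();   // queued longer than the wait threshold
    }
    eventHandler.onMessage(report, eventPublisher);
    if (createTime != 0
        && Time.monotonicNow() - createTime > execWaitThreshold) {
      longTimeExecution.incr(); // creation-to-completion time crossed the threshold
    }
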
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/IEventInfo.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/IEventInfo.java
new file mode 100644
index 0000000000..1a545cbebd
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/IEventInfo.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+/**
+ * Provides information about a fired event, such as its creation time and id.
+ */
+public interface IEventInfo {
+  long getCreateTime();
+  default String getEventId() {
+    return "";
+  }
+}
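
Any event payload can opt in to this tracking by implementing the interface, as the two container-report wrappers below do. A hypothetical payload (class name is for illustration only):

    import org.apache.hadoop.hdds.server.events.IEventInfo;
    import org.apache.hadoop.util.Time;

    // Hypothetical payload that opts in to queue-wait/execution tracking.
    public class ExampleReportEvent implements IEventInfo {
      private final long createTime = Time.monotonicNow();

      @Override
      public long getCreateTime() {
        return createTime;
      }

      @Override
      public String getEventId() {
        return "ExampleReportEvent"; // appears only in the slow-event warnings
      }
    }
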
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
index 597242b5f7..152e8d38b5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
@@ -205,16 +205,10 @@ public final class ScmUtils {
   @NotNull
   public static List<BlockingQueue<ContainerReport>> initContainerReportQueue(
       OzoneConfiguration configuration) {
-    int threadPoolSize = configuration.getInt(OZONE_SCM_EVENT_PREFIX +
-            StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
-                + "_OR_"
-                + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName())
+    int threadPoolSize = configuration.getInt(getContainerReportConfPrefix()
             + ".thread.pool.size",
         OZONE_SCM_EVENT_THREAD_POOL_SIZE_DEFAULT);
-    int queueSize = configuration.getInt(OZONE_SCM_EVENT_PREFIX +
-            StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
-                + "_OR_"
-                + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName())
+    int queueSize = configuration.getInt(getContainerReportConfPrefix()
             + ".queue.size",
         OZONE_SCM_EVENT_CONTAINER_REPORT_QUEUE_SIZE_DEFAULT);
     List<BlockingQueue<ContainerReport>> queues = new ArrayList<>();
@@ -223,5 +217,12 @@ public final class ScmUtils {
     }
     return queues;
   }
+  
+  public static String getContainerReportConfPrefix() {
+    return OZONE_SCM_EVENT_PREFIX +
+        StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
+            + "_OR_"
+            + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName());
+  }
 
 }
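
The extracted helper is reused below by StorageContainerManager and the Recon facade to read the two new thresholds under the same prefix, for example (conf is an OzoneConfiguration):

    long waitQueueThreshold = conf.getInt(
        ScmUtils.getContainerReportConfPrefix() + ".queue.wait.threshold",
        OZONE_SCM_EVENT_REPORT_QUEUE_WAIT_THRESHOLD_DEFAULT);
    long execWaitThreshold = conf.getInt(
        ScmUtils.getContainerReportConfPrefix() + ".execute.wait.threshold",
        OZONE_SCM_EVENT_REPORT_EXEC_WAIT_THRESHOLD_DEFAULT);
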
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index df08bc83d0..02e1b3fc61 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -42,10 +42,12 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.IEventInfo;
 import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
 import com.google.protobuf.Message;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -310,7 +312,8 @@ public final class SCMDatanodeHeartbeatDispatcher {
    */
   public static class ContainerReportFromDatanode
       extends ReportFromDatanode<ContainerReportsProto>
-      implements ContainerReport {
+      implements ContainerReport, IEventInfo {
+    private long createTime = Time.monotonicNow();
 
     public ContainerReportFromDatanode(DatanodeDetails datanodeDetails,
         ContainerReportsProto report) {
@@ -330,6 +333,17 @@ public final class SCMDatanodeHeartbeatDispatcher {
     public ContainerReportType getType() {
       return ContainerReportType.FCR;
     }
+    
+    @Override
+    public long getCreateTime() {
+      return createTime;
+    }
+
+    @Override
+    public String getEventId() {
+      return getDatanodeDetails().toString() + ", {type: " + getType()
+          + ", size: " + getReport().getReportsList().size() + "}";
+    }
   }
 
   /**
@@ -337,8 +351,9 @@ public final class SCMDatanodeHeartbeatDispatcher {
    */
   public static class IncrementalContainerReportFromDatanode
       extends ReportFromDatanode<IncrementalContainerReportProto>
-      implements ContainerReport {
-
+      implements ContainerReport, IEventInfo {
+    private long createTime = Time.monotonicNow();
+    
     public IncrementalContainerReportFromDatanode(
         DatanodeDetails datanodeDetails,
         IncrementalContainerReportProto report) {
@@ -358,6 +373,17 @@ public final class SCMDatanodeHeartbeatDispatcher {
     public ContainerReportType getType() {
       return ContainerReportType.ICR;
     }
+
+    @Override
+    public long getCreateTime() {
+      return createTime;
+    }
+    
+    @Override
+    public String getEventId() {
+      return getDatanodeDetails().toString() + ", {type: " + getType()
+          + ", size: " + getReport().getReportList().size() + "}";
+    }
   }
 
   /**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 0f84b624b1..3687763984 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -80,7 +80,6 @@ import org.apache.hadoop.hdds.security.x509.certificate.client.SCMCertificateCli
 import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
 import org.apache.hadoop.hdds.server.OzoneAdmins;
 import org.apache.hadoop.hdds.server.ServerUtils;
-import org.apache.hadoop.hdds.server.events.EventExecutor;
 import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor;
 import org.apache.hadoop.hdds.server.http.RatisDropwizardExports;
 import org.apache.hadoop.hdds.utils.HAUtils;
@@ -185,6 +184,8 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_REPORT_EXEC_WAIT_THRESHOLD_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_REPORT_QUEUE_WAIT_THRESHOLD_DEFAULT;
 import static org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore.CertType.VALID_CERTS;
 import static org.apache.hadoop.ozone.OzoneConsts.CRL_SEQUENCE_ID_KEY;
 import static org.apache.hadoop.ozone.OzoneConsts.SCM_SUB_CA_PREFIX;
@@ -481,22 +482,34 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     // Use the same executor for both ICR and FCR.
     // The Executor maps the event to a thread for DN.
     // Dispatcher should always dispatch FCR first followed by ICR
+    // conf: ozone.scm.event.CONTAINER_REPORT_OR_INCREMENTAL_CONTAINER_REPORT
+    // .queue.wait.threshold
+    long waitQueueThreshold = configuration.getInt(
+        ScmUtils.getContainerReportConfPrefix() + ".queue.wait.threshold",
+        OZONE_SCM_EVENT_REPORT_QUEUE_WAIT_THRESHOLD_DEFAULT);
+    // conf: ozone.scm.event.CONTAINER_REPORT_OR_INCREMENTAL_CONTAINER_REPORT
+    // .execute.wait.threshold
+    long execWaitThreshold = configuration.getInt(
+        ScmUtils.getContainerReportConfPrefix() + ".execute.wait.threshold",
+        OZONE_SCM_EVENT_REPORT_EXEC_WAIT_THRESHOLD_DEFAULT);
     List<BlockingQueue<ContainerReport>> queues
         = ScmUtils.initContainerReportQueue(configuration);
     List<ThreadPoolExecutor> executors
         = FixedThreadPoolWithAffinityExecutor.initializeExecutorPool(queues);
     Map<String, FixedThreadPoolWithAffinityExecutor> reportExecutorMap
         = new ConcurrentHashMap<>();
-    EventExecutor<ContainerReportFromDatanode>
-        containerReportExecutors =
+    FixedThreadPoolWithAffinityExecutor<ContainerReportFromDatanode,
+        ContainerReport> containerReportExecutors =
         new FixedThreadPoolWithAffinityExecutor<>(
             EventQueue.getExecutorName(SCMEvents.CONTAINER_REPORT,
                 containerReportHandler),
             containerReportHandler, queues, eventQueue,
             ContainerReportFromDatanode.class, executors,
             reportExecutorMap);
-    EventExecutor<IncrementalContainerReportFromDatanode>
-        incrementalReportExecutors =
+    containerReportExecutors.setQueueWaitThreshold(waitQueueThreshold);
+    containerReportExecutors.setExecWaitThreshold(execWaitThreshold);
+    FixedThreadPoolWithAffinityExecutor<IncrementalContainerReportFromDatanode,
+        ContainerReport> incrementalReportExecutors =
         new FixedThreadPoolWithAffinityExecutor<>(
             EventQueue.getExecutorName(
                 SCMEvents.INCREMENTAL_CONTAINER_REPORT,
@@ -504,6 +517,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
             incrementalContainerReportHandler, queues, eventQueue,
             IncrementalContainerReportFromDatanode.class, executors,
             reportExecutorMap);
+    incrementalReportExecutors.setQueueWaitThreshold(waitQueueThreshold);
+    incrementalReportExecutors.setExecWaitThreshold(execWaitThreshold);
 
     eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportExecutors,
         containerReportHandler);
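
Note that the local variables are now declared with the concrete FixedThreadPoolWithAffinityExecutor type rather than the EventExecutor interface, because the two threshold setters exist only on the concrete class. Condensed, the wiring for the full-report path is:

    FixedThreadPoolWithAffinityExecutor<ContainerReportFromDatanode, ContainerReport>
        containerReportExecutors = new FixedThreadPoolWithAffinityExecutor<>(
            EventQueue.getExecutorName(SCMEvents.CONTAINER_REPORT, containerReportHandler),
            containerReportHandler, queues, eventQueue,
            ContainerReportFromDatanode.class, executors, reportExecutorMap);
    containerReportExecutors.setQueueWaitThreshold(waitQueueThreshold);
    containerReportExecutors.setExecWaitThreshold(execWaitThreshold);
    eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportExecutors,
        containerReportHandler);
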
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 49a91ca93a..19c570cd0f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -889,6 +889,57 @@ public class TestStorageContainerManager {
     containerReportExecutors.close();
   }
 
+  @Test
+  public void testContainerReportQueueTakingMoreTime() throws Exception {
+    EventQueue eventQueue = new EventQueue();
+    List<BlockingQueue<SCMDatanodeHeartbeatDispatcher.ContainerReport>>
+        queues = new ArrayList<>();
+    for (int i = 0; i < 1; ++i) {
+      queues.add(new ContainerReportQueue());
+    }
+
+    ContainerReportHandler containerReportHandler =
+        Mockito.mock(ContainerReportHandler.class);
+    Mockito.doAnswer((inv) -> {
+      Thread.sleep(1000);
+      return null;
+    }).when(containerReportHandler).onMessage(Mockito.any(),
+        Mockito.eq(eventQueue));
+    List<ThreadPoolExecutor> executors = FixedThreadPoolWithAffinityExecutor
+        .initializeExecutorPool(queues);
+    Map<String, FixedThreadPoolWithAffinityExecutor> reportExecutorMap
+        = new ConcurrentHashMap<>();
+    FixedThreadPoolWithAffinityExecutor<ContainerReportFromDatanode,
+        SCMDatanodeHeartbeatDispatcher.ContainerReport>
+        containerReportExecutors =
+        new FixedThreadPoolWithAffinityExecutor<>(
+            EventQueue.getExecutorName(SCMEvents.CONTAINER_REPORT,
+                containerReportHandler),
+            containerReportHandler, queues, eventQueue,
+            ContainerReportFromDatanode.class, executors,
+            reportExecutorMap);
+    containerReportExecutors.setQueueWaitThreshold(1000);
+    containerReportExecutors.setExecWaitThreshold(1000);
+    
+    eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportExecutors,
+        containerReportHandler);
+    ContainerReportsProto report = ContainerReportsProto.getDefaultInstance();
+    DatanodeDetails dn = DatanodeDetails.newBuilder().setUuid(UUID.randomUUID())
+        .build();
+    ContainerReportFromDatanode dndata1
+        = new ContainerReportFromDatanode(dn, report);
+    eventQueue.fireEvent(SCMEvents.CONTAINER_REPORT, dndata1);
+    dn = DatanodeDetails.newBuilder().setUuid(UUID.randomUUID())
+        .build();
+    ContainerReportFromDatanode dndata2
+        = new ContainerReportFromDatanode(dn, report);
+    eventQueue.fireEvent(SCMEvents.CONTAINER_REPORT, dndata2);
+    Thread.sleep(3000);
+    Assert.assertTrue(containerReportExecutors.longWaitInQueueEvents() >= 1);
+    Assert.assertTrue(containerReportExecutors.longTimeExecutionEvents() >= 1);
+    containerReportExecutors.close();
+  }
+
   @Test
   public void testIncrementalContainerReportQueue() throws Exception {
     EventQueue eventQueue = new EventQueue();
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index f9e81e76d5..51499a0d6c 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -67,7 +67,6 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.safemode.SafeModeManager;
 import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
-import org.apache.hadoop.hdds.server.events.EventExecutor;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
@@ -90,6 +89,8 @@ import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
 import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig;
 import com.google.inject.Inject;
 import static org.apache.hadoop.hdds.recon.ReconConfigKeys.RECON_SCM_CONFIG_PREFIX;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_REPORT_EXEC_WAIT_THRESHOLD_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_REPORT_QUEUE_WAIT_THRESHOLD_DEFAULT;
 import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.buildRpcServerStartMessage;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReport;
@@ -216,22 +217,34 @@ public class ReconStorageContainerManagerFacade
     // Use the same executor for both ICR and FCR.
     // The Executor maps the event to a thread for DN.
     // Dispatcher should always dispatch FCR first followed by ICR
+    // conf: ozone.scm.event.CONTAINER_REPORT_OR_INCREMENTAL_CONTAINER_REPORT
+    // .queue.wait.threshold
+    long waitQueueThreshold = ozoneConfiguration.getInt(
+        ScmUtils.getContainerReportConfPrefix() + ".queue.wait.threshold",
+        OZONE_SCM_EVENT_REPORT_QUEUE_WAIT_THRESHOLD_DEFAULT);
+    // conf: ozone.scm.event.CONTAINER_REPORT_OR_INCREMENTAL_CONTAINER_REPORT
+    // .execute.wait.threshold
+    long execWaitThreshold = ozoneConfiguration.getInt(
+        ScmUtils.getContainerReportConfPrefix() + ".execute.wait.threshold",
+        OZONE_SCM_EVENT_REPORT_EXEC_WAIT_THRESHOLD_DEFAULT);
     List<BlockingQueue<ContainerReport>> queues
         = ScmUtils.initContainerReportQueue(ozoneConfiguration);
     List<ThreadPoolExecutor> executors
         = FixedThreadPoolWithAffinityExecutor.initializeExecutorPool(queues);
     Map<String, FixedThreadPoolWithAffinityExecutor> reportExecutorMap
         = new ConcurrentHashMap<>();
-    EventExecutor<ContainerReportFromDatanode>
-        containerReportExecutors =
+    FixedThreadPoolWithAffinityExecutor<ContainerReportFromDatanode,
+        ContainerReport> containerReportExecutors =
         new FixedThreadPoolWithAffinityExecutor<>(
             EventQueue.getExecutorName(SCMEvents.CONTAINER_REPORT,
                 containerReportHandler),
             containerReportHandler, queues, eventQueue,
             ContainerReportFromDatanode.class, executors,
             reportExecutorMap);
-    EventExecutor<IncrementalContainerReportFromDatanode>
-        incrementalReportExecutors =
+    containerReportExecutors.setQueueWaitThreshold(waitQueueThreshold);
+    containerReportExecutors.setExecWaitThreshold(execWaitThreshold);
+    FixedThreadPoolWithAffinityExecutor<IncrementalContainerReportFromDatanode,
+        ContainerReport> incrementalReportExecutors =
         new FixedThreadPoolWithAffinityExecutor<>(
             EventQueue.getExecutorName(
                 SCMEvents.INCREMENTAL_CONTAINER_REPORT,
@@ -239,6 +252,8 @@ public class ReconStorageContainerManagerFacade
             icrHandler, queues, eventQueue,
             IncrementalContainerReportFromDatanode.class, executors,
             reportExecutorMap);
+    incrementalReportExecutors.setQueueWaitThreshold(waitQueueThreshold);
+    incrementalReportExecutors.setExecWaitThreshold(execWaitThreshold);
     eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportExecutors,
         containerReportHandler);
     eventQueue.addHandler(SCMEvents.INCREMENTAL_CONTAINER_REPORT,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org