You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sa...@apache.org on 2022/11/16 09:05:21 UTC
[ozone] branch master updated: HDDS-7255. Add metrics for container reports events (#3936)
This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new b46f961b59 HDDS-7255. Add metrics for container reports events (#3936)
b46f961b59 is described below
commit b46f961b59a994ccab89a7806d815e53e5def8c0
Author: Sumit Agrawal <su...@gmail.com>
AuthorDate: Wed Nov 16 14:35:15 2022 +0530
HDDS-7255. Add metrics for container reports events (#3936)
---
.../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 11 ++++-
.../hadoop/hdds/server/events/EventExecutor.java | 14 ++++++
.../FixedThreadPoolWithAffinityExecutor.java | 52 ++++++++++++++++++++++
.../hadoop/hdds/server/events/IEventInfo.java | 28 ++++++++++++
.../java/org/apache/hadoop/hdds/scm/ScmUtils.java | 17 +++----
.../scm/server/SCMDatanodeHeartbeatDispatcher.java | 32 +++++++++++--
.../hdds/scm/server/StorageContainerManager.java | 25 ++++++++---
.../hadoop/ozone/TestStorageContainerManager.java | 51 +++++++++++++++++++++
.../scm/ReconStorageContainerManagerFacade.java | 25 ++++++++---
9 files changed, 233 insertions(+), 22 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index c95e4801bc..e5562267d3 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -490,7 +490,16 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_EVENT_CONTAINER_REPORT_THREAD_POOL_SIZE =
OZONE_SCM_EVENT_PREFIX + "ContainerReport.thread.pool.size";
public static final int OZONE_SCM_EVENT_THREAD_POOL_SIZE_DEFAULT = 10;
-
+ /**
+ SCM Event Report queue default wait-time threshold in milliseconds, i.e. 1 minute.
+ */
+ public static final int OZONE_SCM_EVENT_REPORT_QUEUE_WAIT_THRESHOLD_DEFAULT
+ = 60000;
+ /**
+ SCM Event Report queue execution-time threshold in milliseconds, i.e. 2 minutes.
+ */
+ public static final int OZONE_SCM_EVENT_REPORT_EXEC_WAIT_THRESHOLD_DEFAULT
+ = 120000;
public static final int OZONE_SCM_EVENT_CONTAINER_REPORT_QUEUE_SIZE_DEFAULT
= 100000;
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java
index d400c9fa15..c79d0d371c 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java
@@ -70,6 +70,20 @@ public interface EventExecutor<PAYLOAD> extends AutoCloseable {
return 0;
}
+ /**
+ * Return the number of events whose wait time in the queue crossed the threshold.
+ */
+ default long longWaitInQueueEvents() {
+ return 0;
+ }
+
+ /**
+ * Return the number of events whose execution time crossed the threshold.
+ */
+ default long longTimeExecutionEvents() {
+ return 0;
+ }
+
/**
* The human readable name for the event executor.
* <p>
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java
index d59367f1ba..4949195cdd 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +36,9 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_REPORT_EXEC_WAIT_THRESHOLD_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_REPORT_QUEUE_WAIT_THRESHOLD_DEFAULT;
+
/**
* Fixed thread pool EventExecutor to call all the event handler one-by-one.
* Payloads with the same hashcode will be mapped to the same thread.
@@ -77,8 +81,18 @@ public class FixedThreadPoolWithAffinityExecutor<P, Q>
@Metric
private MutableCounterLong dropped;
+
+ @Metric
+ private MutableCounterLong longWaitInQueue;
+
+ @Metric
+ private MutableCounterLong longTimeExecution;
private final AtomicBoolean isRunning = new AtomicBoolean(true);
+ private long queueWaitThreshold
+ = OZONE_SCM_EVENT_REPORT_QUEUE_WAIT_THRESHOLD_DEFAULT;
+ private long execWaitThreshold
+ = OZONE_SCM_EVENT_REPORT_EXEC_WAIT_THRESHOLD_DEFAULT;
/**
* Create FixedThreadPoolExecutor with affinity.
@@ -117,6 +131,14 @@ public class FixedThreadPoolWithAffinityExecutor<P, Q>
"Event Executor metrics ",
this);
}
+
+ public void setQueueWaitThreshold(long queueWaitThreshold) {
+ this.queueWaitThreshold = queueWaitThreshold;
+ }
+
+ public void setExecWaitThreshold(long execWaitThreshold) {
+ this.execWaitThreshold = execWaitThreshold;
+ }
public static <Q> List<ThreadPoolExecutor> initializeExecutorPool(
List<BlockingQueue<Q>> workQueues) {
@@ -180,6 +202,14 @@ public class FixedThreadPoolWithAffinityExecutor<P, Q>
return dropped.value();
}
+ public long longWaitInQueueEvents() {
+ return longWaitInQueue.value();
+ }
+
+ public long longTimeExecutionEvents() {
+ return longTimeExecution.value();
+ }
+
@Override
public void close() {
isRunning.set(false);
@@ -226,12 +256,34 @@ public class FixedThreadPoolWithAffinityExecutor<P, Q>
LOG.warn("Executor for report is not found");
continue;
}
+
+ long createTime = 0;
+ String eventId = "";
+ if (report instanceof IEventInfo) {
+ createTime = ((IEventInfo) report).getCreateTime();
+ eventId = ((IEventInfo) report).getEventId();
+ }
+
+ long curTime = Time.monotonicNow();
+ if (createTime != 0
+ && ((curTime - createTime) > executor.queueWaitThreshold)) {
+ executor.longWaitInQueue.incr();
+ LOG.warn("Event remained in queue for long time {} millisec, {}",
+ (curTime - createTime), eventId);
+ }
executor.scheduled.incr();
try {
executor.eventHandler.onMessage(report,
executor.eventPublisher);
executor.done.incr();
+ curTime = Time.monotonicNow();
+ if (createTime != 0
+ && (curTime - createTime) > executor.execWaitThreshold) {
+ executor.longTimeExecution.incr();
+ LOG.warn("Event taken long execution time {} millisec, {}",
+ (curTime - createTime), eventId);
+ }
} catch (Exception ex) {
LOG.error("Error on execution message {}", report, ex);
executor.failed.incr();
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/IEventInfo.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/IEventInfo.java
new file mode 100644
index 0000000000..1a545cbebd
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/IEventInfo.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+/**
+ * Get various information of event fired.
+ */
+public interface IEventInfo {
+ long getCreateTime();
+ default String getEventId() {
+ return "";
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
index 597242b5f7..152e8d38b5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
@@ -205,16 +205,10 @@ public final class ScmUtils {
@NotNull
public static List<BlockingQueue<ContainerReport>> initContainerReportQueue(
OzoneConfiguration configuration) {
- int threadPoolSize = configuration.getInt(OZONE_SCM_EVENT_PREFIX +
- StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
- + "_OR_"
- + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName())
+ int threadPoolSize = configuration.getInt(getContainerReportConfPrefix()
+ ".thread.pool.size",
OZONE_SCM_EVENT_THREAD_POOL_SIZE_DEFAULT);
- int queueSize = configuration.getInt(OZONE_SCM_EVENT_PREFIX +
- StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
- + "_OR_"
- + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName())
+ int queueSize = configuration.getInt(getContainerReportConfPrefix()
+ ".queue.size",
OZONE_SCM_EVENT_CONTAINER_REPORT_QUEUE_SIZE_DEFAULT);
List<BlockingQueue<ContainerReport>> queues = new ArrayList<>();
@@ -223,5 +217,12 @@ public final class ScmUtils {
}
return queues;
}
+
+ public static String getContainerReportConfPrefix() {
+ return OZONE_SCM_EVENT_PREFIX +
+ StringUtils.camelize(SCMEvents.CONTAINER_REPORT.getName()
+ + "_OR_"
+ + SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName());
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index df08bc83d0..02e1b3fc61 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -42,10 +42,12 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.IEventInfo;
import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import com.google.protobuf.Message;
+import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -310,7 +312,8 @@ public final class SCMDatanodeHeartbeatDispatcher {
*/
public static class ContainerReportFromDatanode
extends ReportFromDatanode<ContainerReportsProto>
- implements ContainerReport {
+ implements ContainerReport, IEventInfo {
+ private long createTime = Time.monotonicNow();
public ContainerReportFromDatanode(DatanodeDetails datanodeDetails,
ContainerReportsProto report) {
@@ -330,6 +333,17 @@ public final class SCMDatanodeHeartbeatDispatcher {
public ContainerReportType getType() {
return ContainerReportType.FCR;
}
+
+ @Override
+ public long getCreateTime() {
+ return createTime;
+ }
+
+ @Override
+ public String getEventId() {
+ return getDatanodeDetails().toString() + ", {type: " + getType()
+ + ", size: " + getReport().getReportsList().size() + "}";
+ }
}
/**
@@ -337,8 +351,9 @@ public final class SCMDatanodeHeartbeatDispatcher {
*/
public static class IncrementalContainerReportFromDatanode
extends ReportFromDatanode<IncrementalContainerReportProto>
- implements ContainerReport {
-
+ implements ContainerReport, IEventInfo {
+ private long createTime = Time.monotonicNow();
+
public IncrementalContainerReportFromDatanode(
DatanodeDetails datanodeDetails,
IncrementalContainerReportProto report) {
@@ -358,6 +373,17 @@ public final class SCMDatanodeHeartbeatDispatcher {
public ContainerReportType getType() {
return ContainerReportType.ICR;
}
+
+ @Override
+ public long getCreateTime() {
+ return createTime;
+ }
+
+ @Override
+ public String getEventId() {
+ return getDatanodeDetails().toString() + ", {type: " + getType()
+ + ", size: " + getReport().getReportList().size() + "}";
+ }
}
/**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 0f84b624b1..3687763984 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -80,7 +80,6 @@ import org.apache.hadoop.hdds.security.x509.certificate.client.SCMCertificateCli
import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
import org.apache.hadoop.hdds.server.OzoneAdmins;
import org.apache.hadoop.hdds.server.ServerUtils;
-import org.apache.hadoop.hdds.server.events.EventExecutor;
import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor;
import org.apache.hadoop.hdds.server.http.RatisDropwizardExports;
import org.apache.hadoop.hdds.utils.HAUtils;
@@ -185,6 +184,8 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_REPORT_EXEC_WAIT_THRESHOLD_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_REPORT_QUEUE_WAIT_THRESHOLD_DEFAULT;
import static org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore.CertType.VALID_CERTS;
import static org.apache.hadoop.ozone.OzoneConsts.CRL_SEQUENCE_ID_KEY;
import static org.apache.hadoop.ozone.OzoneConsts.SCM_SUB_CA_PREFIX;
@@ -481,22 +482,34 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
// Use the same executor for both ICR and FCR.
// The Executor maps the event to a thread for DN.
// Dispatcher should always dispatch FCR first followed by ICR
+ // conf: ozone.scm.event.CONTAINER_REPORT_OR_INCREMENTAL_CONTAINER_REPORT
+ // .queue.wait.threshold
+ long waitQueueThreshold = configuration.getInt(
+ ScmUtils.getContainerReportConfPrefix() + ".queue.wait.threshold",
+ OZONE_SCM_EVENT_REPORT_QUEUE_WAIT_THRESHOLD_DEFAULT);
+ // conf: ozone.scm.event.CONTAINER_REPORT_OR_INCREMENTAL_CONTAINER_REPORT
+ // .execute.wait.threshold
+ long execWaitThreshold = configuration.getInt(
+ ScmUtils.getContainerReportConfPrefix() + ".execute.wait.threshold",
+ OZONE_SCM_EVENT_REPORT_EXEC_WAIT_THRESHOLD_DEFAULT);
List<BlockingQueue<ContainerReport>> queues
= ScmUtils.initContainerReportQueue(configuration);
List<ThreadPoolExecutor> executors
= FixedThreadPoolWithAffinityExecutor.initializeExecutorPool(queues);
Map<String, FixedThreadPoolWithAffinityExecutor> reportExecutorMap
= new ConcurrentHashMap<>();
- EventExecutor<ContainerReportFromDatanode>
- containerReportExecutors =
+ FixedThreadPoolWithAffinityExecutor<ContainerReportFromDatanode,
+ ContainerReport> containerReportExecutors =
new FixedThreadPoolWithAffinityExecutor<>(
EventQueue.getExecutorName(SCMEvents.CONTAINER_REPORT,
containerReportHandler),
containerReportHandler, queues, eventQueue,
ContainerReportFromDatanode.class, executors,
reportExecutorMap);
- EventExecutor<IncrementalContainerReportFromDatanode>
- incrementalReportExecutors =
+ containerReportExecutors.setQueueWaitThreshold(waitQueueThreshold);
+ containerReportExecutors.setExecWaitThreshold(execWaitThreshold);
+ FixedThreadPoolWithAffinityExecutor<IncrementalContainerReportFromDatanode,
+ ContainerReport> incrementalReportExecutors =
new FixedThreadPoolWithAffinityExecutor<>(
EventQueue.getExecutorName(
SCMEvents.INCREMENTAL_CONTAINER_REPORT,
@@ -504,6 +517,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
incrementalContainerReportHandler, queues, eventQueue,
IncrementalContainerReportFromDatanode.class, executors,
reportExecutorMap);
+ incrementalReportExecutors.setQueueWaitThreshold(waitQueueThreshold);
+ incrementalReportExecutors.setExecWaitThreshold(execWaitThreshold);
eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportExecutors,
containerReportHandler);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 49a91ca93a..19c570cd0f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -889,6 +889,57 @@ public class TestStorageContainerManager {
containerReportExecutors.close();
}
+ @Test
+ public void testContainerReportQueueTakingMoreTime() throws Exception {
+ EventQueue eventQueue = new EventQueue();
+ List<BlockingQueue<SCMDatanodeHeartbeatDispatcher.ContainerReport>>
+ queues = new ArrayList<>();
+ for (int i = 0; i < 1; ++i) {
+ queues.add(new ContainerReportQueue());
+ }
+
+ ContainerReportHandler containerReportHandler =
+ Mockito.mock(ContainerReportHandler.class);
+ Mockito.doAnswer((inv) -> {
+ Thread.currentThread().sleep(1000);
+ return null;
+ }).when(containerReportHandler).onMessage(Mockito.any(),
+ Mockito.eq(eventQueue));
+ List<ThreadPoolExecutor> executors = FixedThreadPoolWithAffinityExecutor
+ .initializeExecutorPool(queues);
+ Map<String, FixedThreadPoolWithAffinityExecutor> reportExecutorMap
+ = new ConcurrentHashMap<>();
+ FixedThreadPoolWithAffinityExecutor<ContainerReportFromDatanode,
+ SCMDatanodeHeartbeatDispatcher.ContainerReport>
+ containerReportExecutors =
+ new FixedThreadPoolWithAffinityExecutor<>(
+ EventQueue.getExecutorName(SCMEvents.CONTAINER_REPORT,
+ containerReportHandler),
+ containerReportHandler, queues, eventQueue,
+ ContainerReportFromDatanode.class, executors,
+ reportExecutorMap);
+ containerReportExecutors.setQueueWaitThreshold(1000);
+ containerReportExecutors.setExecWaitThreshold(1000);
+
+ eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportExecutors,
+ containerReportHandler);
+ ContainerReportsProto report = ContainerReportsProto.getDefaultInstance();
+ DatanodeDetails dn = DatanodeDetails.newBuilder().setUuid(UUID.randomUUID())
+ .build();
+ ContainerReportFromDatanode dndata1
+ = new ContainerReportFromDatanode(dn, report);
+ eventQueue.fireEvent(SCMEvents.CONTAINER_REPORT, dndata1);
+ dn = DatanodeDetails.newBuilder().setUuid(UUID.randomUUID())
+ .build();
+ ContainerReportFromDatanode dndata2
+ = new ContainerReportFromDatanode(dn, report);
+ eventQueue.fireEvent(SCMEvents.CONTAINER_REPORT, dndata2);
+ Thread.currentThread().sleep(3000);
+ Assert.assertTrue(containerReportExecutors.longWaitInQueueEvents() >= 1);
+ Assert.assertTrue(containerReportExecutors.longTimeExecutionEvents() >= 1);
+ containerReportExecutors.close();
+ }
+
@Test
public void testIncrementalContainerReportQueue() throws Exception {
EventQueue eventQueue = new EventQueue();
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index f9e81e76d5..51499a0d6c 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -67,7 +67,6 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.safemode.SafeModeManager;
import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
-import org.apache.hadoop.hdds.server.events.EventExecutor;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
@@ -90,6 +89,8 @@ import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig;
import com.google.inject.Inject;
import static org.apache.hadoop.hdds.recon.ReconConfigKeys.RECON_SCM_CONFIG_PREFIX;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_REPORT_EXEC_WAIT_THRESHOLD_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_REPORT_QUEUE_WAIT_THRESHOLD_DEFAULT;
import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.buildRpcServerStartMessage;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReport;
@@ -216,22 +217,34 @@ public class ReconStorageContainerManagerFacade
// Use the same executor for both ICR and FCR.
// The Executor maps the event to a thread for DN.
// Dispatcher should always dispatch FCR first followed by ICR
+ // conf: ozone.scm.event.CONTAINER_REPORT_OR_INCREMENTAL_CONTAINER_REPORT
+ // .queue.wait.threshold
+ long waitQueueThreshold = ozoneConfiguration.getInt(
+ ScmUtils.getContainerReportConfPrefix() + ".queue.wait.threshold",
+ OZONE_SCM_EVENT_REPORT_QUEUE_WAIT_THRESHOLD_DEFAULT);
+ // conf: ozone.scm.event.CONTAINER_REPORT_OR_INCREMENTAL_CONTAINER_REPORT
+ // .execute.wait.threshold
+ long execWaitThreshold = ozoneConfiguration.getInt(
+ ScmUtils.getContainerReportConfPrefix() + ".execute.wait.threshold",
+ OZONE_SCM_EVENT_REPORT_EXEC_WAIT_THRESHOLD_DEFAULT);
List<BlockingQueue<ContainerReport>> queues
= ScmUtils.initContainerReportQueue(ozoneConfiguration);
List<ThreadPoolExecutor> executors
= FixedThreadPoolWithAffinityExecutor.initializeExecutorPool(queues);
Map<String, FixedThreadPoolWithAffinityExecutor> reportExecutorMap
= new ConcurrentHashMap<>();
- EventExecutor<ContainerReportFromDatanode>
- containerReportExecutors =
+ FixedThreadPoolWithAffinityExecutor<ContainerReportFromDatanode,
+ ContainerReport> containerReportExecutors =
new FixedThreadPoolWithAffinityExecutor<>(
EventQueue.getExecutorName(SCMEvents.CONTAINER_REPORT,
containerReportHandler),
containerReportHandler, queues, eventQueue,
ContainerReportFromDatanode.class, executors,
reportExecutorMap);
- EventExecutor<IncrementalContainerReportFromDatanode>
- incrementalReportExecutors =
+ containerReportExecutors.setQueueWaitThreshold(waitQueueThreshold);
+ containerReportExecutors.setExecWaitThreshold(execWaitThreshold);
+ FixedThreadPoolWithAffinityExecutor<IncrementalContainerReportFromDatanode,
+ ContainerReport> incrementalReportExecutors =
new FixedThreadPoolWithAffinityExecutor<>(
EventQueue.getExecutorName(
SCMEvents.INCREMENTAL_CONTAINER_REPORT,
@@ -239,6 +252,8 @@ public class ReconStorageContainerManagerFacade
icrHandler, queues, eventQueue,
IncrementalContainerReportFromDatanode.class, executors,
reportExecutorMap);
+ incrementalReportExecutors.setQueueWaitThreshold(waitQueueThreshold);
+ incrementalReportExecutors.setExecWaitThreshold(execWaitThreshold);
eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportExecutors,
containerReportHandler);
eventQueue.addHandler(SCMEvents.INCREMENTAL_CONTAINER_REPORT,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org