You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2021/03/17 22:40:52 UTC
[nifi] branch main updated: NIFI-8314: Add controller-level bulletin message for long-running tasks.
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 105a76b NIFI-8314: Add controller-level bulletin message for long-running tasks.
105a76b is described below
commit 105a76b7b7d665335d53111817867e9ab53b957c
Author: Peter Turcsanyi <tu...@apache.org>
AuthorDate: Wed Mar 17 13:48:56 2021 +0100
NIFI-8314: Add controller-level bulletin message for long-running tasks.
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #4906.
---
.../components/monitor/LongRunningTaskMonitor.java | 20 ++++++++++++++------
.../org/apache/nifi/controller/FlowController.java | 3 ++-
2 files changed, 16 insertions(+), 7 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java
index eaf3526..27e4886 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java
@@ -20,6 +20,8 @@ import org.apache.nifi.controller.ActiveThreadInfo;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ThreadDetails;
import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,10 +32,12 @@ public class LongRunningTaskMonitor implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(LongRunningTaskMonitor.class);
private final FlowManager flowManager;
+ private final EventReporter eventReporter;
private final long thresholdMillis;
- public LongRunningTaskMonitor(FlowManager flowManager, long thresholdMillis) {
+ public LongRunningTaskMonitor(FlowManager flowManager, EventReporter eventReporter, long thresholdMillis) {
this.flowManager = flowManager;
+ this.eventReporter = eventReporter;
this.thresholdMillis = thresholdMillis;
}
@@ -54,12 +58,16 @@ public class LongRunningTaskMonitor implements Runnable {
if (activeThread.getActiveMillis() > thresholdMillis) {
longRunningThreadCount++;
- LOGGER.warn(String.format("Long running task detected on processor [id=%s, type=%s, name=%s]. Thread name: %s; Active time: %,d; Stack trace:\n%s",
- processorNode.getIdentifier(), processorNode.getComponentType(), processorNode.getName(),
- activeThread.getThreadName(), activeThread.getActiveMillis(), activeThread.getStackTrace()));
+ String taskSeconds = String.format("%,d seconds", activeThread.getActiveMillis() / 1000);
- processorNode.getLogger().warn(String.format("Long running task detected on the processor [thread name: %s; active time: %,d].",
- activeThread.getThreadName(), activeThread.getActiveMillis()));
+ LOGGER.warn(String.format("Long running task detected on processor [id=%s, name=%s, type=%s]. Task time: %s. Stack trace:\n%s",
+ processorNode.getIdentifier(), processorNode.getName(), processorNode.getComponentType(), taskSeconds, activeThread.getStackTrace()));
+
+ eventReporter.reportEvent(Severity.WARNING, "Long Running Task", String.format("Processor with ID %s, Name %s and Type %s has a task that has been running for %s " +
+ "(thread name: %s).", processorNode.getIdentifier(), processorNode.getName(), processorNode.getComponentType(), taskSeconds, activeThread.getThreadName()));
+
+ processorNode.getLogger().warn(String.format("The processor has a task that has been running for %s (thread name: %s).",
+ taskSeconds, activeThread.getThreadName()));
}
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index d724550..46610ab 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -1107,7 +1107,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
final long scheduleMillis = parseDurationPropertyToMillis(NiFiProperties.MONITOR_LONG_RUNNING_TASK_SCHEDULE, NiFiProperties.DEFAULT_MONITOR_LONG_RUNNING_TASK_SCHEDULE);
final long thresholdMillis = parseDurationPropertyToMillis(NiFiProperties.MONITOR_LONG_RUNNING_TASK_THRESHOLD, NiFiProperties.DEFAULT_MONITOR_LONG_RUNNING_TASK_THRESHOLD);
- longRunningTaskMonitorThreadPool.scheduleWithFixedDelay(new LongRunningTaskMonitor(getFlowManager(), thresholdMillis), scheduleMillis, scheduleMillis, TimeUnit.MILLISECONDS);
+ LongRunningTaskMonitor longRunningTaskMonitor = new LongRunningTaskMonitor(getFlowManager(), createEventReporter(), thresholdMillis);
+ longRunningTaskMonitorThreadPool.scheduleWithFixedDelay(longRunningTaskMonitor, scheduleMillis, scheduleMillis, TimeUnit.MILLISECONDS);
}
private long parseDurationPropertyToMillis(String propertyName, String defaultValue) {