You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2021/03/17 22:40:52 UTC

[nifi] branch main updated: NIFI-8314: Add controller-level bulletin message for long-running tasks.

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 105a76b  NIFI-8314: Add controller-level bulletin message for long-running tasks.
105a76b is described below

commit 105a76b7b7d665335d53111817867e9ab53b957c
Author: Peter Turcsanyi <tu...@apache.org>
AuthorDate: Wed Mar 17 13:48:56 2021 +0100

    NIFI-8314: Add controller-level bulletin message for long-running tasks.
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #4906.
---
 .../components/monitor/LongRunningTaskMonitor.java   | 20 ++++++++++++++------
 .../org/apache/nifi/controller/FlowController.java   |  3 ++-
 2 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java
index eaf3526..27e4886 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java
@@ -20,6 +20,8 @@ import org.apache.nifi.controller.ActiveThreadInfo;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ThreadDetails;
 import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.reporting.Severity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,10 +32,12 @@ public class LongRunningTaskMonitor implements Runnable {
     private static final Logger LOGGER = LoggerFactory.getLogger(LongRunningTaskMonitor.class);
 
     private final FlowManager flowManager;
+    private final EventReporter eventReporter;
     private final long thresholdMillis;
 
-    public LongRunningTaskMonitor(FlowManager flowManager, long thresholdMillis) {
+    public LongRunningTaskMonitor(FlowManager flowManager, EventReporter eventReporter, long thresholdMillis) {
         this.flowManager = flowManager;
+        this.eventReporter = eventReporter;
         this.thresholdMillis = thresholdMillis;
     }
 
@@ -54,12 +58,16 @@ public class LongRunningTaskMonitor implements Runnable {
                 if (activeThread.getActiveMillis() > thresholdMillis) {
                     longRunningThreadCount++;
 
-                    LOGGER.warn(String.format("Long running task detected on processor [id=%s, type=%s, name=%s]. Thread name: %s; Active time: %,d; Stack trace:\n%s",
-                            processorNode.getIdentifier(), processorNode.getComponentType(), processorNode.getName(),
-                            activeThread.getThreadName(), activeThread.getActiveMillis(), activeThread.getStackTrace()));
+                    String taskSeconds = String.format("%,d seconds", activeThread.getActiveMillis() / 1000);
 
-                    processorNode.getLogger().warn(String.format("Long running task detected on the processor [thread name: %s; active time: %,d].",
-                            activeThread.getThreadName(), activeThread.getActiveMillis()));
+                    LOGGER.warn(String.format("Long running task detected on processor [id=%s, name=%s, type=%s]. Task time: %s. Stack trace:\n%s",
+                            processorNode.getIdentifier(), processorNode.getName(), processorNode.getComponentType(), taskSeconds, activeThread.getStackTrace()));
+
+                    eventReporter.reportEvent(Severity.WARNING, "Long Running Task", String.format("Processor with ID %s, Name %s and Type %s has a task that has been running for %s " +
+                            "(thread name: %s).", processorNode.getIdentifier(), processorNode.getName(), processorNode.getComponentType(), taskSeconds, activeThread.getThreadName()));
+
+                    processorNode.getLogger().warn(String.format("The processor has a task that has been running for %s (thread name: %s).",
+                            taskSeconds, activeThread.getThreadName()));
                 }
             }
         }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index d724550..46610ab 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -1107,7 +1107,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
         final long scheduleMillis = parseDurationPropertyToMillis(NiFiProperties.MONITOR_LONG_RUNNING_TASK_SCHEDULE, NiFiProperties.DEFAULT_MONITOR_LONG_RUNNING_TASK_SCHEDULE);
         final long thresholdMillis = parseDurationPropertyToMillis(NiFiProperties.MONITOR_LONG_RUNNING_TASK_THRESHOLD, NiFiProperties.DEFAULT_MONITOR_LONG_RUNNING_TASK_THRESHOLD);
 
-        longRunningTaskMonitorThreadPool.scheduleWithFixedDelay(new LongRunningTaskMonitor(getFlowManager(), thresholdMillis), scheduleMillis, scheduleMillis, TimeUnit.MILLISECONDS);
+        LongRunningTaskMonitor longRunningTaskMonitor = new LongRunningTaskMonitor(getFlowManager(), createEventReporter(), thresholdMillis);
+        longRunningTaskMonitorThreadPool.scheduleWithFixedDelay(longRunningTaskMonitor, scheduleMillis, scheduleMillis, TimeUnit.MILLISECONDS);
     }
 
     private long parseDurationPropertyToMillis(String propertyName, String defaultValue) {