You are viewing a plain text version of this content. The hyperlink to its canonical location was not preserved in this plain-text rendering.
Posted to commits@nifi.apache.org by ma...@apache.org on 2021/03/16 18:37:08 UTC

[nifi] branch main updated: NIFI-8314: Generate warning for long-running processor tasks

This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f00f0ad  NIFI-8314: Generate warning for long-running processor tasks
f00f0ad is described below

commit f00f0ad269c2fab8ce7b88fb6226fdf88ffce33a
Author: Peter Turcsanyi <tu...@apache.org>
AuthorDate: Tue Mar 16 17:56:50 2021 +0100

    NIFI-8314: Generate warning for long-running processor tasks
---
 .../java/org/apache/nifi/util/NiFiProperties.java  |  8 +++
 .../components/monitor/LongRunningTaskMonitor.java | 69 ++++++++++++++++++++++
 .../org/apache/nifi/controller/FlowController.java | 25 ++++++++
 .../nifi-framework/nifi-resources/pom.xml          |  4 ++
 .../src/main/resources/conf/nifi.properties        |  4 ++
 5 files changed, 110 insertions(+)

diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index b33b6f7..1bdf495 100644
--- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -288,6 +288,10 @@ public abstract class NiFiProperties {
     public static final String ANALYTICS_CONNECTION_MODEL_SCORE_NAME = "nifi.analytics.connection.model.score.name";
     public static final String ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD = "nifi.analytics.connection.model.score.threshold";
 
+    // runtime monitoring properties
+    public static final String MONITOR_LONG_RUNNING_TASK_SCHEDULE = "nifi.monitor.long.running.task.schedule";
+    public static final String MONITOR_LONG_RUNNING_TASK_THRESHOLD = "nifi.monitor.long.running.task.threshold";
+
     // defaults
     public static final Boolean DEFAULT_AUTO_RESUME_STATE = true;
     public static final String DEFAULT_AUTHORIZER_CONFIGURATION_FILE = "conf/authorizers.xml";
@@ -371,6 +375,10 @@ public abstract class NiFiProperties {
     public static final String DEFAULT_ANALYTICS_CONNECTION_SCORE_NAME = "rSquared";
     public static final double DEFAULT_ANALYTICS_CONNECTION_SCORE_THRESHOLD = .90;
 
+    // runtime monitoring defaults
+    public static final String DEFAULT_MONITOR_LONG_RUNNING_TASK_SCHEDULE = "1 min";
+    public static final String DEFAULT_MONITOR_LONG_RUNNING_TASK_THRESHOLD = "5 mins";
+
     // Status repository defaults
     public static final int DEFAULT_COMPONENT_STATUS_REPOSITORY_PERSIST_NODE_DAYS = 14;
     public static final int DEFAULT_COMPONENT_STATUS_REPOSITORY_PERSIST_COMPONENT_DAYS = 3;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java
new file mode 100644
index 0000000..eaf3526
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.components.monitor;
+
+import org.apache.nifi.controller.ActiveThreadInfo;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ThreadDetails;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+/** Runnable that, on each run, logs a warning for every processor thread active longer than {@code thresholdMillis}. */
+public class LongRunningTaskMonitor implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(LongRunningTaskMonitor.class);
+
+    private final FlowManager flowManager;
+    private final long thresholdMillis;
+
+    public LongRunningTaskMonitor(FlowManager flowManager, long thresholdMillis) {
+        this.flowManager = flowManager;
+        this.thresholdMillis = thresholdMillis;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.debug("Checking long running processor tasks...");
+        int activeThreadCount = 0;
+        int longRunningThreadCount = 0;
+        // Never let an exception escape: under scheduleWithFixedDelay it would silently cancel every later check.
+        try {
+            final ThreadDetails threadDetails = ThreadDetails.capture();
+            for (ProcessorNode processorNode : flowManager.getRootGroup().findAllProcessors()) {
+                final List<ActiveThreadInfo> activeThreads = processorNode.getActiveThreads(threadDetails);
+                activeThreadCount += activeThreads.size();
+                for (ActiveThreadInfo activeThread : activeThreads) {
+                    if (activeThread.getActiveMillis() > thresholdMillis) {
+                        longRunningThreadCount++;
+                        LOGGER.warn(String.format("Long running task detected on processor [id=%s, type=%s, name=%s]. Thread name: %s; Active time: %,d ms; Stack trace:\n%s",
+                                processorNode.getIdentifier(), processorNode.getComponentType(), processorNode.getName(),
+                                activeThread.getThreadName(), activeThread.getActiveMillis(), activeThread.getStackTrace()));
+                        // Also report it (without the stack trace) through the processor's own logger.
+                        processorNode.getLogger().warn(String.format("Long running task detected on the processor [thread name: %s; active time: %,d ms].",
+                                activeThread.getThreadName(), activeThread.getActiveMillis()));
+                    }
+                }
+            }
+        } catch (final Exception e) {
+            LOGGER.error("Failed to check long running processor tasks", e);
+            return;
+        }
+        LOGGER.info("Active threads: {}; Long running threads: {}", activeThreadCount, longRunningThreadCount);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 16d3347..d724550 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -40,6 +40,7 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.NodeProtocolSender;
 import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.components.monitor.LongRunningTaskMonitor;
 import org.apache.nifi.components.state.StateManagerProvider;
 import org.apache.nifi.components.validation.StandardValidationTrigger;
 import org.apache.nifi.components.validation.TriggerValidationTask;
@@ -310,6 +311,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
     private final StandardFlowManager flowManager;
     private final RepositoryContextFactory repositoryContextFactory;
     private final RingBufferGarbageCollectionLog gcLog;
+    private final FlowEngine longRunningTaskMonitorThreadPool;
 
     /**
      * true if controller is configured to operate in a clustered environment
@@ -778,6 +780,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
             loadBalanceServer = null;
             loadBalanceClientThreadPool = null;
         }
+
+        longRunningTaskMonitorThreadPool = new FlowEngine(1, "Long Running Task Monitor", true);
     }
 
     @Override
@@ -1092,11 +1096,32 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
             for (final Connection connection : flowManager.findAllConnections()) {
                 connection.getFlowFileQueue().startLoadBalancing();
             }
+
+            scheduleLongRunningTaskMonitor();
         } finally {
             writeLock.unlock("onFlowInitialized");
         }
     }
 
+    private void scheduleLongRunningTaskMonitor() {
+        final long scheduleMillis = parseDurationPropertyToMillis(NiFiProperties.MONITOR_LONG_RUNNING_TASK_SCHEDULE, NiFiProperties.DEFAULT_MONITOR_LONG_RUNNING_TASK_SCHEDULE);
+        final long thresholdMillis = parseDurationPropertyToMillis(NiFiProperties.MONITOR_LONG_RUNNING_TASK_THRESHOLD, NiFiProperties.DEFAULT_MONITOR_LONG_RUNNING_TASK_THRESHOLD);
+        longRunningTaskMonitorThreadPool.scheduleWithFixedDelay(new LongRunningTaskMonitor(getFlowManager(), thresholdMillis), scheduleMillis, scheduleMillis, TimeUnit.MILLISECONDS);
+    }
+    /** Reads a duration property as millis; falls back to defaultValue when it is missing, unparseable or not positive. */
+    private long parseDurationPropertyToMillis(String propertyName, String defaultValue) {
+        final String duration = nifiProperties.getProperty(propertyName);
+        try {
+            final long durationMillis = (long) FormatUtils.getPreciseTimeDuration(duration, TimeUnit.MILLISECONDS);
+            if (durationMillis > 0) { // a non-positive delay would make scheduleWithFixedDelay throw
+                return durationMillis;
+            }
+        } catch (final Exception ignored) { // missing or unparseable value: use the default below
+        }
+        LOG.warn("Could not retrieve a valid (positive) value for {} (configured: '{}'). This property has been set to '{}'", propertyName, duration, defaultValue);
+        return (long) FormatUtils.getPreciseTimeDuration(defaultValue, TimeUnit.MILLISECONDS);
+    }
+
     public boolean isStartAfterInitialization(final Connectable component) {
         return startConnectablesAfterInitialization.contains(component) || startRemoteGroupPortsAfterInitialization.contains(component);
     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
index 216c2cd..4ebbb31 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
@@ -251,6 +251,10 @@
         <nifi.analytics.connection.model.implementation>org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares</nifi.analytics.connection.model.implementation>
         <nifi.analytics.connection.model.score.name>rSquared</nifi.analytics.connection.model.score.name>
         <nifi.analytics.connection.model.score.threshold>.90</nifi.analytics.connection.model.score.threshold>
+
+        <!-- nifi.properties: runtime monitoring properties -->
+        <nifi.monitor.long.running.task.schedule>1 min</nifi.monitor.long.running.task.schedule>
+        <nifi.monitor.long.running.task.threshold>5 mins</nifi.monitor.long.running.task.threshold>
     </properties>
     <build>
         <plugins>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index ecf271e..c28e5c9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -311,3 +311,7 @@ nifi.analytics.query.interval=${nifi.analytics.query.interval}
 nifi.analytics.connection.model.implementation=${nifi.analytics.connection.model.implementation}
 nifi.analytics.connection.model.score.name=${nifi.analytics.connection.model.score.name}
 nifi.analytics.connection.model.score.threshold=${nifi.analytics.connection.model.score.threshold}
+
+# runtime monitoring properties
+nifi.monitor.long.running.task.schedule=${nifi.monitor.long.running.task.schedule}
+nifi.monitor.long.running.task.threshold=${nifi.monitor.long.running.task.threshold}