You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2021/03/16 18:37:08 UTC
[nifi] branch main updated: NIFI-8314: Generate warning for
long-running processor tasks
This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f00f0ad NIFI-8314: Generate warning for long-running processor tasks
f00f0ad is described below
commit f00f0ad269c2fab8ce7b88fb6226fdf88ffce33a
Author: Peter Turcsanyi <tu...@apache.org>
AuthorDate: Tue Mar 16 17:56:50 2021 +0100
NIFI-8314: Generate warning for long-running processor tasks
---
.../java/org/apache/nifi/util/NiFiProperties.java | 8 +++
.../components/monitor/LongRunningTaskMonitor.java | 69 ++++++++++++++++++++++
.../org/apache/nifi/controller/FlowController.java | 25 ++++++++
.../nifi-framework/nifi-resources/pom.xml | 4 ++
.../src/main/resources/conf/nifi.properties | 4 ++
5 files changed, 110 insertions(+)
diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index b33b6f7..1bdf495 100644
--- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -288,6 +288,10 @@ public abstract class NiFiProperties {
public static final String ANALYTICS_CONNECTION_MODEL_SCORE_NAME = "nifi.analytics.connection.model.score.name";
public static final String ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD = "nifi.analytics.connection.model.score.threshold";
+ // runtime monitoring properties (configure the periodic check that warns about long-running processor tasks)
+ public static final String MONITOR_LONG_RUNNING_TASK_SCHEDULE = "nifi.monitor.long.running.task.schedule"; // duration string: delay between two checks
+ public static final String MONITOR_LONG_RUNNING_TASK_THRESHOLD = "nifi.monitor.long.running.task.threshold"; // duration string: active time above which a task is reported
+
// defaults
public static final Boolean DEFAULT_AUTO_RESUME_STATE = true;
public static final String DEFAULT_AUTHORIZER_CONFIGURATION_FILE = "conf/authorizers.xml";
@@ -371,6 +375,10 @@ public abstract class NiFiProperties {
public static final String DEFAULT_ANALYTICS_CONNECTION_SCORE_NAME = "rSquared";
public static final double DEFAULT_ANALYTICS_CONNECTION_SCORE_THRESHOLD = .90;
+ // runtime monitoring defaults (used when the corresponding property is missing or cannot be parsed as a duration)
+ public static final String DEFAULT_MONITOR_LONG_RUNNING_TASK_SCHEDULE = "1 min"; // default delay between two checks
+ public static final String DEFAULT_MONITOR_LONG_RUNNING_TASK_THRESHOLD = "5 mins"; // default active time above which a task is reported
+
// Status repository defaults
public static final int DEFAULT_COMPONENT_STATUS_REPOSITORY_PERSIST_NODE_DAYS = 14;
public static final int DEFAULT_COMPONENT_STATUS_REPOSITORY_PERSIST_COMPONENT_DAYS = 3;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java
new file mode 100644
index 0000000..eaf3526
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.components.monitor;
+
+import org.apache.nifi.controller.ActiveThreadInfo;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ThreadDetails;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Periodic task that inspects the active threads of every processor under the root
+ * process group and reports those that have been active for longer than a threshold.
+ *
+ * <p>Each long-running thread is reported twice: once on the framework logger (including
+ * the stack trace) and once on the logger of the processor that owns the thread.</p>
+ *
+ * <p>The task is meant to be run repeatedly by a scheduled executor. Because a periodic
+ * task is cancelled by the executor as soon as one execution throws, {@link #run()} never
+ * lets an exception escape.</p>
+ */
+public class LongRunningTaskMonitor implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(LongRunningTaskMonitor.class);
+
+    private final FlowManager flowManager;
+    private final long thresholdMillis;
+
+    /**
+     * @param flowManager     source of the processors to inspect
+     * @param thresholdMillis active time in milliseconds above which a thread is reported
+     */
+    public LongRunningTaskMonitor(FlowManager flowManager, long thresholdMillis) {
+        this.flowManager = flowManager;
+        this.thresholdMillis = thresholdMillis;
+    }
+
+    /**
+     * Runs one check. Any failure is logged and swallowed so that the periodic
+     * schedule is not cancelled by a single failed execution.
+     */
+    @Override
+    public void run() {
+        try {
+            checkLongRunningTasks();
+        } catch (final Exception e) {
+            // An exception escaping from a periodic task suppresses all subsequent executions.
+            LOGGER.error("Failed to check long running processor tasks", e);
+        }
+    }
+
+    /**
+     * Scans all processors, logs a warning for every thread whose active time exceeds
+     * the threshold and finally logs the total and long-running thread counts.
+     */
+    private void checkLongRunningTasks() {
+        LOGGER.debug("Checking long running processor tasks...");
+
+        int activeThreadCount = 0;
+        int longRunningThreadCount = 0;
+
+        // Captured once and shared by all processors.
+        final ThreadDetails threadDetails = ThreadDetails.capture();
+
+        for (final ProcessorNode processorNode : flowManager.getRootGroup().findAllProcessors()) {
+            final List<ActiveThreadInfo> activeThreads = processorNode.getActiveThreads(threadDetails);
+            activeThreadCount += activeThreads.size();
+
+            for (final ActiveThreadInfo activeThread : activeThreads) {
+                // Read once so that the comparison and both messages report the same value.
+                final long activeMillis = activeThread.getActiveMillis();
+                if (activeMillis <= thresholdMillis) {
+                    continue;
+                }
+
+                longRunningThreadCount++;
+
+                LOGGER.warn(String.format("Long running task detected on processor [id=%s, type=%s, name=%s]. Thread name: %s; Active time: %,d ms; Stack trace:\n%s",
+                        processorNode.getIdentifier(), processorNode.getComponentType(), processorNode.getName(),
+                        activeThread.getThreadName(), activeMillis, activeThread.getStackTrace()));
+
+                processorNode.getLogger().warn(String.format("Long running task detected on the processor [thread name: %s; active time: %,d ms].",
+                        activeThread.getThreadName(), activeMillis));
+            }
+        }
+
+        LOGGER.info("Active threads: {}; Long running threads: {}", activeThreadCount, longRunningThreadCount);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 16d3347..d724550 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -40,6 +40,7 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.components.monitor.LongRunningTaskMonitor;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.components.validation.StandardValidationTrigger;
import org.apache.nifi.components.validation.TriggerValidationTask;
@@ -310,6 +311,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
private final StandardFlowManager flowManager;
private final RepositoryContextFactory repositoryContextFactory;
private final RingBufferGarbageCollectionLog gcLog;
+ private final FlowEngine longRunningTaskMonitorThreadPool;
/**
* true if controller is configured to operate in a clustered environment
@@ -778,6 +780,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
loadBalanceServer = null;
loadBalanceClientThreadPool = null;
}
+
+ longRunningTaskMonitorThreadPool = new FlowEngine(1, "Long Running Task Monitor", true);
}
@Override
@@ -1092,11 +1096,32 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
for (final Connection connection : flowManager.findAllConnections()) {
connection.getFlowFileQueue().startLoadBalancing();
}
+
+ scheduleLongRunningTaskMonitor();
} finally {
writeLock.unlock("onFlowInitialized");
}
}
+    /**
+     * Schedules the {@link LongRunningTaskMonitor} on its dedicated thread pool.
+     *
+     * <p>The schedule and threshold are read from the NiFi properties. The schedule is
+     * used both as the initial delay and as the delay between two consecutive checks.
+     * A schedule that resolves to less than one millisecond (for example "0 sec") would be
+     * rejected by the executor with an {@link IllegalArgumentException}, so in that case
+     * the default schedule is used instead.</p>
+     */
+    private void scheduleLongRunningTaskMonitor() {
+        long scheduleMillis = parseDurationPropertyToMillis(NiFiProperties.MONITOR_LONG_RUNNING_TASK_SCHEDULE, NiFiProperties.DEFAULT_MONITOR_LONG_RUNNING_TASK_SCHEDULE);
+        final long thresholdMillis = parseDurationPropertyToMillis(NiFiProperties.MONITOR_LONG_RUNNING_TASK_THRESHOLD, NiFiProperties.DEFAULT_MONITOR_LONG_RUNNING_TASK_THRESHOLD);
+
+        if (scheduleMillis <= 0) {
+            // scheduleWithFixedDelay requires a strictly positive delay; do not let a bad setting break flow initialization.
+            LOG.warn("Value for {} must be at least 1 millisecond. This property has been set to '{}'",
+                    NiFiProperties.MONITOR_LONG_RUNNING_TASK_SCHEDULE, NiFiProperties.DEFAULT_MONITOR_LONG_RUNNING_TASK_SCHEDULE);
+            scheduleMillis = (long) FormatUtils.getPreciseTimeDuration(NiFiProperties.DEFAULT_MONITOR_LONG_RUNNING_TASK_SCHEDULE, TimeUnit.MILLISECONDS);
+        }
+
+        longRunningTaskMonitorThreadPool.scheduleWithFixedDelay(new LongRunningTaskMonitor(getFlowManager(), thresholdMillis), scheduleMillis, scheduleMillis, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Reads a duration property and converts it to whole milliseconds.
+     *
+     * <p>If the property is missing, blank or cannot be parsed, a warning is logged and
+     * {@code defaultValue} is parsed instead. Fractions of a millisecond are truncated.</p>
+     *
+     * @param propertyName name of the property to read from the NiFi properties
+     * @param defaultValue duration string to fall back to; must itself be parseable
+     * @return the duration in milliseconds
+     */
+    private long parseDurationPropertyToMillis(String propertyName, String defaultValue) {
+        final String duration = nifiProperties.getProperty(propertyName);
+
+        if (duration == null || duration.trim().isEmpty()) {
+            // Absent value: handle it explicitly instead of relying on the parser to throw.
+            LOG.warn("Could not retrieve value for {}. This property has been set to '{}'", propertyName, defaultValue);
+        } else {
+            try {
+                return (long) FormatUtils.getPreciseTimeDuration(duration, TimeUnit.MILLISECONDS);
+            } catch (final Exception e) {
+                // Kept broad to preserve the original fall-back behavior; the value and cause are logged for diagnosis.
+                LOG.warn("Could not parse value '{}' for {}. This property has been set to '{}'", duration, propertyName, defaultValue, e);
+            }
+        }
+
+        return (long) FormatUtils.getPreciseTimeDuration(defaultValue, TimeUnit.MILLISECONDS);
+    }
+
public boolean isStartAfterInitialization(final Connectable component) {
return startConnectablesAfterInitialization.contains(component) || startRemoteGroupPortsAfterInitialization.contains(component);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
index 216c2cd..4ebbb31 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
@@ -251,6 +251,10 @@
<nifi.analytics.connection.model.implementation>org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares</nifi.analytics.connection.model.implementation>
<nifi.analytics.connection.model.score.name>rSquared</nifi.analytics.connection.model.score.name>
<nifi.analytics.connection.model.score.threshold>.90</nifi.analytics.connection.model.score.threshold>
+
+ <!-- nifi.properties: runtime monitoring properties -->
+ <nifi.monitor.long.running.task.schedule>1 min</nifi.monitor.long.running.task.schedule>
+ <nifi.monitor.long.running.task.threshold>5 mins</nifi.monitor.long.running.task.threshold>
</properties>
<build>
<plugins>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index ecf271e..c28e5c9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -311,3 +311,7 @@ nifi.analytics.query.interval=${nifi.analytics.query.interval}
nifi.analytics.connection.model.implementation=${nifi.analytics.connection.model.implementation}
nifi.analytics.connection.model.score.name=${nifi.analytics.connection.model.score.name}
nifi.analytics.connection.model.score.threshold=${nifi.analytics.connection.model.score.threshold}
+
+# runtime monitoring properties
+nifi.monitor.long.running.task.schedule=${nifi.monitor.long.running.task.schedule}
+nifi.monitor.long.running.task.threshold=${nifi.monitor.long.running.task.threshold}