You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2019/09/13 14:46:14 UTC
[nifi] branch master updated: NIFI-6658: Implement new bin/nifi.sh
diagnostics command that is responsible for obtaining diagnostic
information about many different parts of nifi, the operating system, etc.
This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new eb6085a NIFI-6658: Implement new bin/nifi.sh diagnostics command that is responsible for obtaining diagnostic information about many different parts of nifi, the operating system, etc.
eb6085a is described below
commit eb6085a31d9b8e3f041ff59fc3dcbb2a110b3f95
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Thu Sep 12 09:29:29 2019 -0400
NIFI-6658: Implement new bin/nifi.sh diagnostics command that is responsible for obtaining diagnostic information about many different parts of nifi, the operating system, etc.
This closes #3727.
Signed-off-by: Bryan Bende <bb...@apache.org>
---
.../java/org/apache/nifi/bootstrap/RunNiFi.java | 119 ++++++++----
.../nifi/util/timebuffer/CountSumMinMaxAccess.java | 41 ++++
.../timebuffer/TimestampedLongAggregation.java | 93 +++++++++
.../nifi/wali/SequentialAccessWriteAheadLog.java | 4 +
.../controller/repository/ContentRepository.java | 27 ++-
.../controller/repository/FlowFileRepository.java | 17 ++
.../controller/repository/FlowFileSwapManager.java | 11 +-
.../repository/ResourceClaimReference.java | 21 +-
.../repository/claim/ResourceClaimManager.java | 7 +
.../apache/nifi/diagnostics/DiagnosticTask.java | 21 +-
.../apache/nifi/diagnostics/DiagnosticsDump.java | 22 +--
.../nifi/diagnostics/DiagnosticsDumpElement.java | 21 +-
.../nifi/diagnostics/DiagnosticsFactory.java | 21 +-
.../nifi/diagnostics/StandardDiagnosticsDump.java | 67 +++++++
.../StandardDiagnosticsDumpElement.java | 31 +--
.../apache/nifi/diagnostics/ThreadDumpTask.java | 130 +++++++++++++
.../cluster/coordination/ClusterCoordinator.java | 10 +
.../nifi/controller/FileSystemSwapManager.java | 11 +-
.../org/apache/nifi/controller/FlowController.java | 8 +-
.../election/CuratorLeaderElectionManager.java | 92 ++++++++-
.../leader/election/LeaderElectionManager.java | 40 ++++
.../election/StandaloneLeaderElectionManager.java | 29 +++
.../repository/FileSystemRepository.java | 103 ++++++++++
.../repository/WriteAheadFlowFileRepository.java | 154 +++++++++++++++
.../scheduling/RepositoryContextFactory.java | 22 ++-
.../bootstrap/BootstrapDiagnosticsFactory.java | 96 +++++++++
.../bootstrap/tasks/ClusterDiagnosticTask.java | 89 +++++++++
.../bootstrap/tasks/ComponentCountTask.java | 105 ++++++++++
.../bootstrap/tasks/ContentRepositoryScanTask.java | 92 +++++++++
.../bootstrap/tasks/DiagnosticAnalysisTask.java | 214 +++++++++++++++++++++
.../tasks/FlowConfigurationDiagnosticTask.java | 68 +++++++
.../tasks/GarbageCollectionDiagnosticTask.java | 41 ++++
.../bootstrap/tasks/JVMDiagnosticTask.java | 69 +++++++
.../bootstrap/tasks/LongRunningProcessorTask.java | 65 +++++++
.../bootstrap/tasks/MemoryPoolPeakUsageTask.java | 54 ++++++
.../bootstrap/tasks/NarsDiagnosticTask.java | 47 +++++
.../tasks/NiFiPropertiesDiagnosticTask.java | 79 ++++++++
.../tasks/OperatingSystemDiagnosticTask.java | 79 ++++++++
.../bootstrap/tasks/RepositoryDiagnosticTask.java | 100 ++++++++++
.../src/main/resources/nifi-context.xml | 9 +-
.../apache/nifi/controller/MockSwapManager.java | 5 +
.../nifi/controller/TestFileSystemSwapManager.java | 5 +
.../repository/TestRocksDBFlowFileRepository.java | 5 +
.../TestWriteAheadFlowFileRepository.java | 5 +
.../claim/StandardResourceClaimManager.java | 16 +-
.../nifi-resources/src/main/resources/bin/nifi.sh | 4 +-
.../java/org/apache/nifi/BootstrapListener.java | 150 +++------------
.../src/main/java/org/apache/nifi/NiFi.java | 4 +
.../src/main/java/org/apache/nifi/NiFiServer.java | 5 +
.../org/apache/nifi/web/server/JettyServer.java | 99 +++++++---
50 files changed, 2318 insertions(+), 309 deletions(-)
diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
index 78e7019..5b1334c 100644
--- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
@@ -16,6 +16,13 @@
*/
package org.apache.nifi.bootstrap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.bootstrap.notification.NotificationType;
+import org.apache.nifi.bootstrap.util.OSUtils;
+import org.apache.nifi.util.file.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
@@ -60,12 +67,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.bootstrap.notification.NotificationType;
-import org.apache.nifi.bootstrap.util.OSUtils;
-import org.apache.nifi.util.file.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* <p>
@@ -113,6 +114,7 @@ public class RunNiFi {
public static final String SHUTDOWN_CMD = "SHUTDOWN";
public static final String PING_CMD = "PING";
public static final String DUMP_CMD = "DUMP";
+ public static final String DIAGNOSTICS_CMD = "DIAGNOSTICS";
private volatile boolean autoRestartNiFi = true;
private volatile int ccPort = -1;
@@ -137,7 +139,7 @@ public class RunNiFi {
private volatile Set<Future<?>> loggingFutures = new HashSet<>(2);
private final NotificationServiceManager serviceManager;
- public RunNiFi(final File bootstrapConfigFile, final boolean verbose) throws IOException {
+ public RunNiFi(final File bootstrapConfigFile) throws IOException {
this.bootstrapConfigFile = bootstrapConfigFile;
loggingExecutor = Executors.newFixedThreadPool(2, new ThreadFactory() {
@@ -156,7 +158,7 @@ public class RunNiFi {
private static void printUsage() {
System.out.println("Usage:");
System.out.println();
- System.out.println("java org.apache.nifi.bootstrap.RunNiFi [<-verbose>] <command> [options]");
+ System.out.println("java org.apache.nifi.bootstrap.RunNiFi <command> [options]");
System.out.println();
System.out.println("Valid commands include:");
System.out.println("");
@@ -165,6 +167,8 @@ public class RunNiFi {
System.out.println("Restart : Stop Apache NiFi, if it is running, and then start a new instance");
System.out.println("Status : Determine if there is a running instance of Apache NiFi");
System.out.println("Dump : Write a Thread Dump to the file specified by [options], or to the log if no file is given");
+ System.out.println("Diagnostics : Write diagnostic information to the file specified by [options], or to the log if no file is given. The --verbose flag may be provided as an option before " +
+ "the filename, which may result in additional diagnostic information being written.");
System.out.println("Run : Start a new instance of Apache NiFi and monitor the Process, restarting if the instance dies");
System.out.println();
}
@@ -181,18 +185,30 @@ public class RunNiFi {
File dumpFile = null;
boolean verbose = false;
- if (args[0].equals("-verbose")) {
- verbose = true;
- args = shift(args);
- }
final String cmd = args[0];
- if (cmd.equals("dump")) {
+ if (cmd.equalsIgnoreCase("dump")) {
if (args.length > 1) {
dumpFile = new File(args[1]);
} else {
dumpFile = null;
}
+ } else if (cmd.equalsIgnoreCase("diagnostics")) {
+ if (args.length > 2) {
+ verbose = args[1].equalsIgnoreCase("--verbose");
+ dumpFile = new File(args[2]);
+ } else if (args.length > 1) {
+ if (args[1].equalsIgnoreCase("--verbose")) {
+ verbose = true;
+ dumpFile = null;
+ } else {
+ verbose = false;
+ dumpFile = new File(args[1]);
+ }
+ } else {
+ dumpFile = null;
+ verbose = false;
+ }
}
switch (cmd.toLowerCase()) {
@@ -201,6 +217,7 @@ public class RunNiFi {
case "stop":
case "status":
case "dump":
+ case "diagnostics":
case "restart":
case "env":
break;
@@ -210,7 +227,7 @@ public class RunNiFi {
}
final File configFile = getDefaultBootstrapConfFile();
- final RunNiFi runNiFi = new RunNiFi(configFile, verbose);
+ final RunNiFi runNiFi = new RunNiFi(configFile);
Integer exitStatus = null;
switch (cmd.toLowerCase()) {
@@ -233,6 +250,8 @@ public class RunNiFi {
case "dump":
runNiFi.dump(dumpFile);
break;
+ case "diagnostics":
+ runNiFi.diagnostics(dumpFile, verbose);
case "env":
runNiFi.env();
break;
@@ -672,6 +691,14 @@ public class RunNiFi {
}
/**
+ * Writes NiFi diagnostic information to the given file; if the file is null, logs at INFO level instead.
+ */
+ public void diagnostics(final File dumpFile, final boolean verbose) throws IOException {
+ final String args = verbose ? "--verbose=true" : null;
+ makeRequest(DIAGNOSTICS_CMD, args, dumpFile, "diagnostics information");
+ }
+
+ /**
* Writes a NiFi thread dump to the given file; if file is null, logs at
* INFO level instead.
*
@@ -679,6 +706,10 @@ public class RunNiFi {
* @throws IOException if any issues occur while writing the dump file
*/
public void dump(final File dumpFile) throws IOException {
+ makeRequest(DUMP_CMD, null, dumpFile, "thread dump");
+ }
+
+ private void makeRequest(final String request, final String arguments, final File dumpFile, final String contentsDescription) throws IOException {
final Logger logger = defaultLogger; // dump to bootstrap log file by default
final Integer port = getCurrentPort(logger);
if (port == null) {
@@ -689,40 +720,48 @@ public class RunNiFi {
final Properties nifiProps = loadProperties(logger);
final String secretKey = nifiProps.getProperty("secret.key");
- final StringBuilder sb = new StringBuilder();
- try (final Socket socket = new Socket()) {
- logger.debug("Connecting to NiFi instance");
- socket.setSoTimeout(60000);
- socket.connect(new InetSocketAddress("localhost", port));
- logger.debug("Established connection to NiFi instance.");
- socket.setSoTimeout(60000);
+ final OutputStream fileOut = dumpFile == null ? null : new FileOutputStream(dumpFile);
+ try {
+ try (final Socket socket = new Socket()) {
+ logger.debug("Connecting to NiFi instance");
+ socket.setSoTimeout(60000);
+ socket.connect(new InetSocketAddress("localhost", port));
+ logger.debug("Established connection to NiFi instance.");
+ socket.setSoTimeout(60000);
+
+ logger.debug("Sending DUMP Command to port {}", port);
+ final OutputStream socketOut = socket.getOutputStream();
+
+ if (arguments == null) {
+ socketOut.write((request + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
+ } else {
+ socketOut.write((request + " " + secretKey + " " + arguments + "\n").getBytes(StandardCharsets.UTF_8));
+ }
- logger.debug("Sending DUMP Command to port {}", port);
- final OutputStream out = socket.getOutputStream();
- out.write((DUMP_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
- out.flush();
+ socketOut.flush();
- final InputStream in = socket.getInputStream();
- try (final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
- String line;
- while ((line = reader.readLine()) != null) {
- sb.append(line).append("\n");
+ final InputStream in = socket.getInputStream();
+ try (final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ if (fileOut == null) {
+ logger.info(line);
+ } else {
+ fileOut.write(line.getBytes(StandardCharsets.UTF_8));
+ fileOut.write('\n');
+ }
+ }
}
}
- }
-
- final String dump = sb.toString();
- if (dumpFile == null) {
- logger.info(dump);
- } else {
- try (final FileOutputStream fos = new FileOutputStream(dumpFile)) {
- fos.write(dump.getBytes(StandardCharsets.UTF_8));
+ } finally {
+ if (fileOut != null) {
+ fileOut.close();
+ cmdLogger.info("Successfully wrote {} to {}", contentsDescription, dumpFile.getAbsolutePath());
}
- // we want to log to the console (by default) that we wrote the thread dump to the specified file
- cmdLogger.info("Successfully wrote thread dump to {}", dumpFile.getAbsolutePath());
}
}
+
public void notifyStop() {
final String hostname = getHostname();
final SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/CountSumMinMaxAccess.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/CountSumMinMaxAccess.java
new file mode 100644
index 0000000..5b15046
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/CountSumMinMaxAccess.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.timebuffer;
+
+public class CountSumMinMaxAccess implements EntityAccess<TimestampedLongAggregation> {
+ @Override
+ public TimestampedLongAggregation aggregate(final TimestampedLongAggregation oldValue, final TimestampedLongAggregation toAdd) {
+ if (oldValue == null) {
+ return toAdd;
+ }
+ if (toAdd == null) {
+ return oldValue;
+ }
+
+ return TimestampedLongAggregation.newAggregation(oldValue.getAggregation().add(toAdd.getAggregation()));
+ }
+
+ @Override
+ public TimestampedLongAggregation createNew() {
+ return TimestampedLongAggregation.newValue(0L);
+ }
+
+ @Override
+ public long getTimestamp(final TimestampedLongAggregation entity) {
+ return entity == null ? 0L : entity.getTimestamp();
+ }
+}
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLongAggregation.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLongAggregation.java
new file mode 100644
index 0000000..c82815e
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLongAggregation.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.timebuffer;
+
+public class TimestampedLongAggregation {
+ private final Long value;
+ private final long timestamp;
+ private final TimestampedAggregation cumulation;
+
+ public TimestampedLongAggregation(final long value) {
+ this.value = value;
+ this.timestamp = System.currentTimeMillis();
+ this.cumulation = null;
+ }
+
+ public TimestampedLongAggregation(final TimestampedAggregation cumulation) {
+ this.value = null;
+ this.timestamp = System.currentTimeMillis();
+ this.cumulation = cumulation;
+ }
+
+ public TimestampedAggregation getAggregation() {
+ if (cumulation != null) {
+ return cumulation;
+ }
+
+ return new TimestampedAggregation(value, value, value, 1L);
+ }
+
+ public Long getValue() {
+ return value;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public static TimestampedLongAggregation newValue(final long value) {
+ return new TimestampedLongAggregation(value);
+ }
+
+ public static TimestampedLongAggregation newAggregation(final TimestampedAggregation cumulation) {
+ return new TimestampedLongAggregation(cumulation);
+ }
+
+ public static class TimestampedAggregation {
+ private final long min;
+ private final long max;
+ private final long sum;
+ private final long count;
+
+ public TimestampedAggregation(final long min, final long max, final long sum, final long count) {
+ this.min = min;
+ this.max = max;
+ this.sum = sum;
+ this.count = count;
+ }
+
+ public long getMin() {
+ return min;
+ }
+
+ public long getMax() {
+ return max;
+ }
+
+ public long getSum() {
+ return sum;
+ }
+
+ public long getCount() {
+ return count;
+ }
+
+ public TimestampedAggregation add(final TimestampedAggregation aggregation) {
+ return new TimestampedAggregation(Math.min(min, aggregation.min), Math.max(max, aggregation.max), sum + aggregation.sum, count + aggregation.count);
+ }
+ }
+}
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
index 240a212..c54f505 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
@@ -246,6 +246,10 @@ public class SequentialAccessWriteAheadLog<T> implements WriteAheadRepository<T>
return Collections.unmodifiableSet(this.recoveredSwapLocations);
}
+ public SnapshotCapture<T> captureSnapshot() {
+ return snapshot.prepareSnapshot(nextTransactionId - 1);
+ }
+
@Override
public int checkpoint() throws IOException {
return checkpoint(null);
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
index d749482..7636966 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
@@ -16,6 +16,10 @@
*/
package org.apache.nifi.controller.repository;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -23,9 +27,6 @@ import java.nio.file.Path;
import java.util.Collection;
import java.util.Set;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-
/**
* Defines the capabilities of a content repository. Append options are not
* available on the methods but a merge capability is provided which between
@@ -283,4 +284,24 @@ public interface ContentRepository {
* @throws IOException if unable to determine accessibility
*/
boolean isAccessible(ContentClaim contentClaim) throws IOException;
+
+ /**
+ * Optional operation that returns a Set of all Resource Claims that exist in the given Container that are considered "active" (i.e., not archived)
+ * @param containerName the name of the container
+ * @return a Set of all Resource Claims that exist in the given Container
+ * @throws IOException if unable to obtain a listing due to an IO failure
+ * @throws UnsupportedOperationException if this repository does not implement this capability.
+ * @see #isActiveResourceClaimsSupported()
+ */
+ default Set<ResourceClaim> getActiveResourceClaims(String containerName) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Indicates whether or not the repository supports obtaining a list of active Resource Claims via the {@link #getActiveResourceClaims(String)} method
+ * @return <code>true</code> if the operation is supported, <code>false</code> otherwise
+ */
+ default boolean isActiveResourceClaimsSupported() {
+ return false;
+ }
}
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
index ac6e68c..1da1107 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
@@ -17,12 +17,15 @@
package org.apache.nifi.controller.repository;
import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
/**
* Implementations must be thread safe
@@ -153,4 +156,18 @@ public interface FlowFileRepository extends Closeable {
* @return <code>true</code> if the swap location is known and valid, <code>false</code> otherwise
*/
boolean isValidSwapLocationSuffix(String swapLocationSuffix);
+
+ /**
+ * <p>
+ * Scans the FlowFile Repository to locate any FlowFiles that reference the given Resource Claims. If the FlowFile Repository does not implement this capability, it will return <code>null</code>.
+ * </p>
+ *
+ * @param resourceClaims the resource claims whose references should be found
+ * @param swapManager the swap manager to use for scanning swap files
+ * @return a Mapping of Resource Claim to a representation of the FlowFiles/Swap Files that reference those Resource Claims
+ * @throws IOException if an IO failure occurs when attempting to find references
+ */
+ default Map<ResourceClaim, Set<ResourceClaimReference>> findResourceClaimReferences(Set<ResourceClaim> resourceClaims, FlowFileSwapManager swapManager) throws IOException {
+ return null;
+ }
}
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
index 8d9b38f..018f274 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
@@ -16,12 +16,12 @@
*/
package org.apache.nifi.controller.repository;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+
import java.io.IOException;
import java.util.List;
import java.util.Set;
-import org.apache.nifi.controller.queue.FlowFileQueue;
-
/**
* Defines a mechanism by which FlowFiles can be moved into external storage or
* memory so that they can be removed from the Java heap and vice-versa
@@ -125,4 +125,11 @@ public interface FlowFileSwapManager {
* Purge all known Swap Files without updating FlowFileRepository or Provenance Repository
*/
void purge();
+
+ /**
+ * Returns the ID of the queue that the given swap file belongs to
+ * @param swapLocation the swap location
+ * @return the ID of the queue, or <code>null</code> if unknown
+ */
+ String getQueueIdentifier(String swapLocation);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFiServer.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ResourceClaimReference.java
similarity index 70%
copy from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFiServer.java
copy to nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ResourceClaimReference.java
index edb8f45..e578557 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFiServer.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ResourceClaimReference.java
@@ -14,23 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi;
+package org.apache.nifi.controller.repository;
-import org.apache.nifi.bundle.Bundle;
-import org.apache.nifi.nar.ExtensionMapping;
+public interface ResourceClaimReference {
+ String getQueueIdentifier();
-import java.util.Set;
+ boolean isSwappedOut();
-/**
- *
- */
-public interface NiFiServer {
-
- void start();
-
- void setExtensionMapping(ExtensionMapping extensionMapping);
-
- void setBundles(Bundle systemBundle, Set<Bundle> bundles);
+ String getFlowFileUuid();
- void stop();
+ String getSwapLocation();
}
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
index a85ddc4..b74e33c 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
@@ -151,4 +151,11 @@ public interface ResourceClaimManager {
* @param claim the resource claim to freeze
*/
void freeze(ResourceClaim claim);
+
+ /**
+ * Indicates whether or not the given Resource Claim is awaiting destruction
+ * @param claim the resource claim
+ * @return <code>true</code> if the Resource Claim is awaiting destruction, <code>false</code> otherwise
+ */
+ boolean isDestructable(ResourceClaim claim);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFiServer.java b/nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/DiagnosticTask.java
similarity index 70%
copy from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFiServer.java
copy to nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/DiagnosticTask.java
index edb8f45..65f541b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFiServer.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/DiagnosticTask.java
@@ -14,23 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi;
+package org.apache.nifi.diagnostics;
-import org.apache.nifi.bundle.Bundle;
-import org.apache.nifi.nar.ExtensionMapping;
-
-import java.util.Set;
-
-/**
- *
- */
-public interface NiFiServer {
-
- void start();
-
- void setExtensionMapping(ExtensionMapping extensionMapping);
-
- void setBundles(Bundle systemBundle, Set<Bundle> bundles);
-
- void stop();
+public interface DiagnosticTask {
+ DiagnosticsDumpElement captureDump(boolean verbose);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFiServer.java b/nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/DiagnosticsDump.java
similarity index 70%
copy from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFiServer.java
copy to nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/DiagnosticsDump.java
index edb8f45..fb60305 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFiServer.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/DiagnosticsDump.java
@@ -14,23 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi;
+package org.apache.nifi.diagnostics;
-import org.apache.nifi.bundle.Bundle;
-import org.apache.nifi.nar.ExtensionMapping;
+import java.io.IOException;
+import java.io.OutputStream;
-import java.util.Set;
-
-/**
- *
- */
-public interface NiFiServer {
-
- void start();
-
- void setExtensionMapping(ExtensionMapping extensionMapping);
-
- void setBundles(Bundle systemBundle, Set<Bundle> bundles);
-
- void stop();
+public interface DiagnosticsDump {
+ void writeTo(OutputStream out) throws IOException;
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFiServer.java b/nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/DiagnosticsDumpElement.java
similarity index 70%
copy from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFiServer.java
copy to nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/DiagnosticsDumpElement.java
index edb8f45..66c2894 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFiServer.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/DiagnosticsDumpElement.java
@@ -14,23 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi;
+package org.apache.nifi.diagnostics;
-import org.apache.nifi.bundle.Bundle;
-import org.apache.nifi.nar.ExtensionMapping;
+import java.util.List;
-import java.util.Set;
+public interface DiagnosticsDumpElement {
+ String getName();
-/**
- *
- */
-public interface NiFiServer {
-
- void start();
-
- void setExtensionMapping(ExtensionMapping extensionMapping);
-
- void setBundles(Bundle systemBundle, Set<Bundle> bundles);
-
- void stop();
+ List<String> getDetails();
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFiServer.java b/nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/DiagnosticsFactory.java
similarity index 70%
copy from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFiServer.java
copy to nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/DiagnosticsFactory.java
index edb8f45..8e92690 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFiServer.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/DiagnosticsFactory.java
@@ -14,23 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi;
+package org.apache.nifi.diagnostics;
-import org.apache.nifi.bundle.Bundle;
-import org.apache.nifi.nar.ExtensionMapping;
-
-import java.util.Set;
-
-/**
- *
- */
-public interface NiFiServer {
-
- void start();
-
- void setExtensionMapping(ExtensionMapping extensionMapping);
-
- void setBundles(Bundle systemBundle, Set<Bundle> bundles);
-
- void stop();
+public interface DiagnosticsFactory {
+ DiagnosticsDump create(boolean verbose);
}
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/StandardDiagnosticsDump.java b/nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/StandardDiagnosticsDump.java
new file mode 100644
index 0000000..d1dd982
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/StandardDiagnosticsDump.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.diagnostics;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Plain-text rendering of a diagnostics dump: a timestamp banner followed by one section per
+ * {@link DiagnosticsDumpElement}, each introduced by its name underlined with dashes.
+ */
+public class StandardDiagnosticsDump implements DiagnosticsDump {
+    private final List<DiagnosticsDumpElement> dumpElements;
+    private final long timestamp;
+
+    /**
+     * @param dumpElements the sections to render, written in iteration order
+     * @param timestamp the dump time, handed to {@link Date#Date(long)} when the banner is written
+     */
+    public StandardDiagnosticsDump(final List<DiagnosticsDumpElement> dumpElements, final long timestamp) {
+        this.dumpElements = dumpElements;
+        this.timestamp = timestamp;
+    }
+
+    /**
+     * Writes the banner and every section to the given stream, then flushes.
+     * The stream is not closed here.
+     *
+     * @param out the destination stream
+     * @throws IOException if writing or flushing fails
+     */
+    public void writeTo(final OutputStream out) throws IOException {
+        final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
+
+        writer.write("Diagnostic Dump taken at " + new Date(timestamp).toString() + "\n\n");
+
+        for (final DiagnosticsDumpElement section : dumpElements) {
+            writeHeader(writer, section.getName());
+
+            for (final String detail : section.getDetails()) {
+                writer.write(detail);
+                writer.write("\n");
+            }
+
+            // Blank lines separate this section from the next one.
+            writer.write("\n\n");
+        }
+
+        writer.flush();
+    }
+
+    /**
+     * Writes the header text followed by a row of dashes of the same length.
+     */
+    private void writeHeader(final BufferedWriter writer, final String header) throws IOException {
+        final StringBuilder underlined = new StringBuilder(header).append("\n");
+        for (int remaining = header.length(); remaining > 0; remaining--) {
+            underlined.append("-");
+        }
+        underlined.append("\n");
+
+        writer.write(underlined.toString());
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFiServer.java b/nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/StandardDiagnosticsDumpElement.java
similarity index 60%
copy from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFiServer.java
copy to nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/StandardDiagnosticsDumpElement.java
index edb8f45..306af86 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFiServer.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/StandardDiagnosticsDumpElement.java
@@ -14,23 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi;
+package org.apache.nifi.diagnostics;
-import org.apache.nifi.bundle.Bundle;
-import org.apache.nifi.nar.ExtensionMapping;
+import java.util.List;
-import java.util.Set;
+public class StandardDiagnosticsDumpElement implements DiagnosticsDumpElement {
+ private final String name;
+ private final List<String> details;
-/**
- *
- */
-public interface NiFiServer {
-
- void start();
-
- void setExtensionMapping(ExtensionMapping extensionMapping);
+ public StandardDiagnosticsDumpElement(final String name, final List<String> details) {
+ this.name = name;
+ this.details = details;
+ }
- void setBundles(Bundle systemBundle, Set<Bundle> bundles);
+ @Override
+ public String getName() {
+ return name;
+ }
- void stop();
+ @Override
+ public List<String> getDetails() {
+ return details;
+ }
}
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/ThreadDumpTask.java b/nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/ThreadDumpTask.java
new file mode 100644
index 0000000..d5765fd
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/ThreadDumpTask.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.diagnostics;
+
+import java.lang.management.LockInfo;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MonitorInfo;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * A {@link DiagnosticTask} that renders a dump of the JVM's threads, obtained from the platform
+ * {@link ThreadMXBean}, as one block of text. Threads are listed in case-insensitive name order
+ * and any thread reported as deadlocked is flagged inline and again in a trailing summary.
+ */
+public class ThreadDumpTask implements DiagnosticTask {
+
+    /**
+     * Captures the thread dump.
+     *
+     * @param verbose not consulted by this task; the same output is produced either way
+     * @return an element named "Thread Dump" whose single detail entry holds the whole dump
+     */
+    @Override
+    public DiagnosticsDumpElement captureDump(boolean verbose) {
+        final ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
+
+        final ThreadInfo[] infos = mbean.dumpAllThreads(true, true);
+        final long[] deadlockedThreadIds = mbean.findDeadlockedThreads();
+        final long[] monitorDeadlockThreadIds = mbean.findMonitorDeadlockedThreads();
+
+        final List<ThreadInfo> sortedInfos = new ArrayList<>(infos.length);
+        Collections.addAll(sortedInfos, infos);
+        // Case-insensitive ordering that does not depend on the JVM's default locale.
+        sortedInfos.sort(Comparator.comparing(ThreadInfo::getThreadName, String.CASE_INSENSITIVE_ORDER));
+
+        final StringBuilder sb = new StringBuilder();
+        for (final ThreadInfo info : sortedInfos) {
+            appendThread(sb, info, deadlockedThreadIds, monitorDeadlockThreadIds);
+        }
+
+        appendDeadlockSummary(sb, "DEADLOCK DETECTED!", deadlockedThreadIds);
+        appendDeadlockSummary(sb, "MONITOR DEADLOCK DETECTED!", monitorDeadlockThreadIds);
+
+        return new StandardDiagnosticsDumpElement("Thread Dump", Collections.singletonList(sb.toString()));
+    }
+
+    /** Appends the description, stack trace and held locks of a single thread. */
+    private static void appendThread(final StringBuilder sb, final ThreadInfo info, final long[] deadlockedThreadIds, final long[] monitorDeadlockThreadIds) {
+        sb.append("\n");
+        sb.append("\"").append(info.getThreadName()).append("\" Id=");
+        sb.append(info.getThreadId()).append(" ");
+        sb.append(info.getThreadState().toString()).append(" ");
+
+        switch (info.getThreadState()) {
+            case BLOCKED:
+            case TIMED_WAITING:
+            case WAITING:
+                // getLockInfo() is null when the thread is not waiting on a lock object
+                // (e.g. it is sleeping), so only name the lock when there is one.
+                final LockInfo waitingFor = info.getLockInfo();
+                if (waitingFor != null) {
+                    sb.append(" on ");
+                    sb.append(waitingFor);
+                }
+                break;
+            default:
+                break;
+        }
+
+        if (info.isSuspended()) {
+            sb.append(" (suspended)");
+        }
+        if (info.isInNative()) {
+            sb.append(" (in native code)");
+        }
+
+        if (contains(deadlockedThreadIds, info.getThreadId())) {
+            sb.append(" ** DEADLOCKED THREAD **");
+        }
+        if (contains(monitorDeadlockThreadIds, info.getThreadId())) {
+            sb.append(" ** MONITOR-DEADLOCKED THREAD **");
+        }
+
+        // Fetch once: the monitors do not change between stack frames.
+        final MonitorInfo[] monitors = info.getLockedMonitors();
+        for (final StackTraceElement element : info.getStackTrace()) {
+            sb.append("\n\tat ").append(element);
+
+            for (final MonitorInfo monitor : monitors) {
+                // getLockedStackFrame() may be null, so compare from the non-null side.
+                if (element.equals(monitor.getLockedStackFrame())) {
+                    // These are monitors the thread currently holds, not ones it waits for.
+                    sb.append("\n\t- locked ").append(monitor);
+                }
+            }
+        }
+
+        final LockInfo[] lockInfos = info.getLockedSynchronizers();
+        if (lockInfos.length > 0) {
+            sb.append("\n\t");
+            sb.append("Number of Locked Synchronizers: ").append(lockInfos.length);
+            for (final LockInfo lockInfo : lockInfos) {
+                sb.append("\n\t- ").append(lockInfo.toString());
+            }
+        }
+
+        sb.append("\n");
+    }
+
+    /** Appends a heading and the list of thread IDs, or nothing when {@code threadIds} is null or empty. */
+    private static void appendDeadlockSummary(final StringBuilder sb, final String heading, final long[] threadIds) {
+        if (threadIds == null || threadIds.length == 0) {
+            return;
+        }
+
+        sb.append("\n\n").append(heading);
+        sb.append("\nThe following thread IDs are deadlocked:");
+        for (final long id : threadIds) {
+            sb.append("\n").append(id);
+        }
+    }
+
+    /** Returns whether {@code threadIds} is non-null and contains {@code threadId}. */
+    private static boolean contains(final long[] threadIds, final long threadId) {
+        if (threadIds == null) {
+            return false;
+        }
+
+        for (final long id : threadIds) {
+            if (id == threadId) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
index fad51d1..7f5a8f1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
@@ -281,4 +281,14 @@ public interface ClusterCoordinator {
* @param eventListener the event listener to stop notifying
*/
void unregisterEventListener(ClusterTopologyEventListener eventListener);
+
+    /**
+     * Builds a text summary with one line per node identifier, each of the form
+     * {@code <full description> : <connection status>}.
+     *
+     * @return the summary text; empty when there are no node identifiers
+     */
+    default String summarizeClusterState() {
+        final StringBuilder summary = new StringBuilder();
+
+        for (final NodeIdentifier node : getNodeIdentifiers()) {
+            summary.append(node.getFullDescription())
+                .append(" : ")
+                .append(getConnectionStatus(node))
+                .append("\n");
+        }
+
+        return summary.toString();
+    }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index d41ded5..01cd272 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.controller;
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.FlowFileRecord;
@@ -210,8 +211,10 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
}
}
- private String getOwnerQueueIdentifier(final File swapFile) {
- final String[] splits = swapFile.getName().split("-");
+ @Override
+ public String getQueueIdentifier(final String swapLocation) {
+ final String filename = swapLocation.contains("/") ? StringUtils.substringAfterLast(swapLocation, "/") : swapLocation;
+ final String[] splits = filename.split("-");
if (splits.length > 6) {
final String queueIdentifier = splits[1] + "-" + splits[2] + "-" + splits[3] + "-" + splits[4] + "-" + splits[5];
return queueIdentifier;
@@ -220,6 +223,10 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
return null;
}
+ private String getOwnerQueueIdentifier(final File swapFile) {
+ return getQueueIdentifier(swapFile.getName());
+ }
+
private String getOwnerPartition(final File swapFile) {
final String filename = swapFile.getName();
final int indexOfDot = filename.indexOf(".");
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index c6f942b..a5c1981 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -1794,6 +1794,10 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
return repositoryContextFactory;
}
+ public ClusterCoordinator getClusterCoordinator() {
+ return clusterCoordinator;
+ }
+
/**
* Creates a connection between two Connectable objects.
*
@@ -2299,7 +2303,9 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
}
}
-
+ public LeaderElectionManager getLeaderElectionManager() {
+ return leaderElectionManager;
+ }
/**
* @return true if this instance is the primary node in the cluster; false
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
index d51c79f..d07c776 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
@@ -29,6 +29,11 @@ import org.apache.curator.retry.RetryNTimes;
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.timebuffer.CountSumMinMaxAccess;
+import org.apache.nifi.util.timebuffer.LongEntityAccess;
+import org.apache.nifi.util.timebuffer.TimedBuffer;
+import org.apache.nifi.util.timebuffer.TimestampedLong;
+import org.apache.nifi.util.timebuffer.TimestampedLongAggregation;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.common.PathUtils;
import org.slf4j.Logger;
@@ -36,6 +41,9 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
public class CuratorLeaderElectionManager implements LeaderElectionManager {
@@ -51,6 +59,10 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
private final Map<String, LeaderRole> leaderRoles = new HashMap<>();
private final Map<String, RegisteredRole> registeredRoles = new HashMap<>();
+ private final Map<String, TimedBuffer<TimestampedLong>> leaderChanges = new HashMap<>();
+ private final TimedBuffer<TimestampedLongAggregation> pollTimes = new TimedBuffer<>(TimeUnit.SECONDS, 300, new CountSumMinMaxAccess());
+ private final ConcurrentMap<String, String> lastKnownLeader = new ConcurrentHashMap<>();
+
public CuratorLeaderElectionManager(final int threadPoolSize, final NiFiProperties properties) {
leaderElectionMonitorEngine = new FlowEngine(threadPoolSize, "Leader Election Notification", true);
zkConfig = ZooKeeperClientConfig.createConfig(properties);
@@ -192,6 +204,26 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
return leaderRoles.get(roleName);
}
+    /**
+     * Records one leadership change for the given role, creating the role's
+     * 24-slot hourly buffer on first use.
+     */
+    private synchronized void onLeaderChanged(final String roleName) {
+        TimedBuffer<TimestampedLong> changes = leaderChanges.get(roleName);
+        if (changes == null) {
+            changes = new TimedBuffer<>(TimeUnit.HOURS, 24, new LongEntityAccess());
+            leaderChanges.put(roleName, changes);
+        }
+
+        changes.add(new TimestampedLong(1L));
+    }
+
+    /**
+     * Returns, for every role that has recorded a leadership change, how many changes were
+     * recorded within the given look-back window.
+     *
+     * @param duration the length of the look-back window
+     * @param unit the unit of {@code duration}
+     * @return a new map of role name to number of recorded leadership changes
+     */
+    @Override
+    public synchronized Map<String, Integer> getLeadershipChangeCount(final long duration, final TimeUnit unit) {
+        final Map<String, Integer> leadershipChangesPerRole = new HashMap<>();
+
+        // Compute the cutoff once so every role is measured against the same instant.
+        final long earliestTimestamp = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(duration, unit);
+
+        for (final Map.Entry<String, TimedBuffer<TimestampedLong>> entry : leaderChanges.entrySet()) {
+            final String roleName = entry.getKey();
+            final TimedBuffer<TimestampedLong> buffer = entry.getValue();
+
+            final TimestampedLong aggregateValue = buffer.getAggregateValue(earliestTimestamp);
+            final int leadershipChanges = aggregateValue.getValue().intValue();
+            leadershipChangesPerRole.put(roleName, leadershipChanges);
+        }
+
+        return leadershipChangesPerRole;
+    }
+
@Override
public boolean isLeader(final String roleName) {
final LeaderRole role = getLeaderRole(roleName);
@@ -213,6 +245,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
return determineLeaderExternal(roleName);
}
+ final long startNanos = System.nanoTime();
Participant participant;
try {
participant = role.getLeaderSelector().getLeader();
@@ -230,9 +263,59 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
return null;
}
+ registerPollTime(System.nanoTime() - startNanos);
+
+ final String previousLeader = lastKnownLeader.put(roleName, participantId);
+ if (previousLeader != null && !previousLeader.equals(participantId)) {
+ onLeaderChanged(roleName);
+ }
+
return participantId;
}
+    /**
+     * Adds one poll duration, in nanoseconds, to the buffer of poll times.
+     */
+    private synchronized void registerPollTime(final long nanos) {
+        final TimestampedLongAggregation sample = TimestampedLongAggregation.newValue(nanos);
+        pollTimes.add(sample);
+    }
+
+    /**
+     * Returns the mean duration of the polls currently held in the poll-time buffer.
+     *
+     * @param timeUnit the unit in which to express the result
+     * @return the sum of the recorded durations divided by their count (integer division),
+     *         converted to {@code timeUnit}; {@code 0} when nothing has been recorded
+     */
+    @Override
+    public synchronized long getAveragePollTime(final TimeUnit timeUnit) {
+        final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation();
+        if (aggregation == null || aggregation.getCount() == 0) {
+            return 0L;
+        }
+
+        final long averageNanos = aggregation.getSum() / aggregation.getCount();
+        return timeUnit.convert(averageNanos, TimeUnit.NANOSECONDS);
+    }
+
+    /**
+     * Returns the shortest poll duration currently held in the poll-time buffer.
+     *
+     * @param timeUnit the unit in which to express the result
+     * @return the minimum recorded duration converted to {@code timeUnit}, or {@code 0} when
+     *         no aggregation is available
+     */
+    @Override
+    public synchronized long getMinPollTime(final TimeUnit timeUnit) {
+        final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation();
+        if (aggregation == null) {
+            return 0L;
+        }
+
+        final long minNanos = aggregation.getMin();
+        return timeUnit.convert(minNanos, TimeUnit.NANOSECONDS);
+    }
+
+    /**
+     * Returns the longest poll duration currently held in the poll-time buffer.
+     *
+     * @param timeUnit the unit in which to express the result
+     * @return the maximum recorded duration converted to {@code timeUnit}, or {@code 0} when
+     *         no aggregation is available
+     */
+    @Override
+    public synchronized long getMaxPollTime(final TimeUnit timeUnit) {
+        final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation();
+        if (aggregation == null) {
+            return 0L;
+        }
+
+        // Must read the maximum; reading getMin() here made this method report the minimum.
+        final long maxNanos = aggregation.getMax();
+        return timeUnit.convert(maxNanos, TimeUnit.NANOSECONDS);
+    }
+
+    /**
+     * Returns how many polls are currently held in the poll-time buffer,
+     * or {@code 0} when no aggregation is available.
+     */
+    @Override
+    public synchronized long getPollCount() {
+        final TimestampedLongAggregation.TimestampedAggregation totals = pollTimes.getAggregateValue(0L).getAggregation();
+        return totals == null ? 0L : totals.getCount();
+    }
/**
* Determines whether or not leader election has already begun for the role with the given name
@@ -255,8 +338,9 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
* the leader from ZooKeeper
*/
private String determineLeaderExternal(final String roleName) {
- final CuratorFramework client = createClient();
- try {
+ final long start = System.nanoTime();
+
+ try (CuratorFramework client = createClient()) {
final LeaderSelectorListener electionListener = new LeaderSelectorListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
@@ -288,7 +372,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
return null;
}
} finally {
- client.close();
+ registerPollTime(System.nanoTime() - start);
}
}
@@ -378,7 +462,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
logger.info("{} Connection State changed to {}", this, newState.name());
if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
- if (leader == true) {
+ if (leader) {
logger.info("Because Connection State was changed to {}, will relinquish leadership for role '{}'", newState, roleName);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java
index d9d4e71..2daf9ef 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java
@@ -17,6 +17,9 @@
package org.apache.nifi.controller.leader.election;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
public interface LeaderElectionManager {
/**
* Starts managing leader elections for all registered roles
@@ -94,4 +97,41 @@ public interface LeaderElectionManager {
* @return <code>true</code> if a leader has been elected, <code>false</code> otherwise.
*/
boolean isLeaderElected(String roleName);
+
+ /**
+ * Returns a Map of Role Name to the number of times that the leader has been detected as changing in the given time period. Note that
+ * both the length of time for which these counts are retained and their precision are implementation-specific.
+ *
+ * @param duration the duration
+ * @param timeUnit the time unit
+ * @return a Mapping of role to the number of times that the leader for that role has changed
+ */
+ Map<String, Integer> getLeadershipChangeCount(long duration, TimeUnit timeUnit);
+
+ /**
+ * Returns the average amount of time it has taken to poll the leader election service in the past 5 minutes.
+ * @param timeUnit the desired time unit
+ * @return the average amount of time it has taken to poll the leader election service, or <code>-1</code> if this is not supported
+ */
+ long getAveragePollTime(TimeUnit timeUnit);
+
+ /**
+ * Returns the minimum amount of time any poll of the leader election service has taken in the past 5 minutes.
+ * @param timeUnit the desired time unit
+ * @return the minimum amount of time any poll of the leader election service has taken, or <code>-1</code> if this is not supported
+ */
+ long getMinPollTime(TimeUnit timeUnit);
+
+ /**
+ * Returns the maximum amount of time any poll of the leader election service has taken in the past 5 minutes.
+ * @param timeUnit the desired time unit
+ * @return the maximum amount of time any poll of the leader election service has taken, or <code>-1</code> if this is not supported
+ */
+ long getMaxPollTime(TimeUnit timeUnit);
+
+ /**
+ * Returns the number of times that the leader election service has been polled in the past 5 minutes
+ * @return the number of times that the leader election service has been polled in the past 5 minutes, or <code>-1</code> if this is not supported
+ */
+ long getPollCount();
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java
index 182e83a..2c5266b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java
@@ -17,6 +17,10 @@
package org.apache.nifi.controller.leader.election;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
/**
* <p>
* A LeaderElectionManager to use when running a standalone (un-clustered) NiFi instance
@@ -60,6 +64,31 @@ public class StandaloneLeaderElectionManager implements LeaderElectionManager {
}
@Override
+ public Map<String, Integer> getLeadershipChangeCount(final long duration, final TimeUnit timeUnit) {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public long getAveragePollTime(final TimeUnit timeUnit) {
+ return -1L;
+ }
+
+ @Override
+ public long getMinPollTime(final TimeUnit timeUnit) {
+ return -1L;
+ }
+
+ @Override
+ public long getMaxPollTime(final TimeUnit timeUnit) {
+ return -1L;
+ }
+
+ @Override
+ public long getPollCount() {
+ return -1L;
+ }
+
+ @Override
public boolean isLeaderElected(String roleName) {
return false;
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index be7f17d..5bd4c3c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -512,6 +512,32 @@ public class FileSystemRepository implements ContentRepository {
}
}
+ @Override
+ public boolean isActiveResourceClaimsSupported() {
+ return true;
+ }
+
+    /**
+     * Walks the directory tree of the named container and returns a resource claim for each
+     * file the scan accepts.
+     *
+     * @param containerName the name of the container to scan
+     * @return the claims found, or an empty set when no container has that name
+     * @throws IOException if walking the file tree fails
+     */
+    @Override
+    public Set<ResourceClaim> getActiveResourceClaims(final String containerName) throws IOException {
+        final Path containerRoot = containers.get(containerName);
+        if (containerRoot == null) {
+            return Collections.emptySet();
+        }
+
+        final ScanForActiveResourceClaims visitor = new ScanForActiveResourceClaims(containerRoot, containerName, resourceClaimManager, containers.keySet());
+        Files.walkFileTree(containerRoot, visitor);
+        final Set<ResourceClaim> claims = visitor.getActiveResourceClaims();
+
+        LOG.debug("Obtaining active resource claims, will return a list of {} resource claims for container {}", claims.size(), containerName);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Listing of resource claims:");
+            for (final ResourceClaim claim : claims) {
+                LOG.trace(claim.toString());
+            }
+        }
+
+        return claims;
+    }
+
private Path getPath(final ContentClaim claim) {
final ResourceClaim resourceClaim = claim.getResourceClaim();
return getPath(resourceClaim);
@@ -1789,4 +1815,81 @@ public class FileSystemRepository implements ContentRepository {
}
return cleanupInterval;
}
+
+
+    /**
+     * File visitor that collects a {@link ResourceClaim} for every file found directly inside a
+     * section directory (a directory whose name is up to four digits) beneath the given root.
+     * Directories that are neither the root, a known container name, nor a section are skipped.
+     */
+    private static class ScanForActiveResourceClaims extends SimpleFileVisitor<Path> {
+        private static final Pattern SECTION_NAME_PATTERN = Pattern.compile("\\d{0,4}");
+        private final String containerName;
+        private final ResourceClaimManager resourceClaimManager;
+        private final Set<String> containerNames;
+        private final Path rootPath;
+
+        private final Set<ResourceClaim> activeResourceClaims = new HashSet<>();
+        // Name of the most recently entered section directory; null until one has been entered.
+        private String sectionName = null;
+
+        public ScanForActiveResourceClaims(final Path rootPath, final String containerName, final ResourceClaimManager resourceClaimManager, final Set<String> containerNames) {
+            this.rootPath = rootPath;
+            this.containerName = containerName;
+            this.resourceClaimManager = resourceClaimManager;
+            this.containerNames = containerNames;
+        }
+
+        /**
+         * @return the claims collected so far (the visitor's own set, not a copy)
+         */
+        public Set<ResourceClaim> getActiveResourceClaims() {
+            return activeResourceClaims;
+        }
+
+        /** Logs the unreadable entry and moves on without descending into it. */
+        @Override
+        public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
+            LOG.warn("Content repository contains un-readable file or directory '" + file.getFileName() + "'. Skipping. ", exc);
+            return FileVisitResult.SKIP_SUBTREE;
+        }
+
+        /** Decides whether to descend into a directory, remembering the section name when it is one. */
+        @Override
+        public FileVisitResult preVisitDirectory(final Path dir, final BasicFileAttributes attrs) throws IOException {
+            if (dir.equals(rootPath)) {
+                return FileVisitResult.CONTINUE;
+            }
+
+            final String directoryName = dir.toFile().getName();
+
+            if (containerNames.contains(directoryName)) {
+                LOG.debug("Obtaining active resource claims, will traverse into Container {}", directoryName);
+                return FileVisitResult.CONTINUE;
+            }
+
+            // Anything that is not a section directory is left unexplored.
+            if (!SECTION_NAME_PATTERN.matcher(directoryName).matches()) {
+                LOG.debug("Obtaining active resource claims, will NOT traverse into sub-directory {}", directoryName);
+                return FileVisitResult.SKIP_SUBTREE;
+            }
+
+            LOG.debug("Obtaining active resource claims, will traverse into Section {}", directoryName);
+            sectionName = directoryName;
+            return FileVisitResult.CONTINUE;
+        }
+
+        /** Records a claim for a file whose parent directory is the current section. */
+        @Override
+        public FileVisitResult visitFile(final Path path, final BasicFileAttributes attrs) throws IOException {
+            if (attrs.isDirectory()) {
+                return FileVisitResult.CONTINUE;
+            }
+
+            final File file = path.toFile();
+            final boolean inCurrentSection = sectionName != null && sectionName.equals(file.getParentFile().getName());
+            if (!inCurrentSection) {
+                LOG.debug("Obtaining active resource claims, will NOT consider {} because its parent is not the current section", file);
+                return FileVisitResult.CONTINUE;
+            }
+
+            // Reuse the claim the manager already knows; otherwise ask it for a new one.
+            final String identifier = file.getName();
+            final ResourceClaim existing = resourceClaimManager.getResourceClaim(containerName, sectionName, identifier);
+            final ResourceClaim claim = existing != null
+                ? existing
+                : resourceClaimManager.newResourceClaim(containerName, sectionName, identifier, false, false);
+
+            activeResourceClaims.add(claim);
+
+            return FileVisitResult.CONTINUE;
+        }
+    }
+
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 779c29c..2ba59c7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -21,9 +21,11 @@ import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
+import org.apache.nifi.wali.SnapshotCapture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wali.MinimalLockingWriteAheadLog;
@@ -230,6 +232,158 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
}
@Override
+    public Map<ResourceClaim, Set<ResourceClaimReference>> findResourceClaimReferences(final Set<ResourceClaim> resourceClaims, final FlowFileSwapManager swapManager) throws IOException {
+        // A snapshot can only be captured from the sequential-access write-ahead log; for any other
+        // implementation the caller receives null rather than an empty map.
+        if (!(wal instanceof SequentialAccessWriteAheadLog)) {
+            return null;
+        }
+
+        final SequentialAccessWriteAheadLog<RepositoryRecord> sequentialLog = (SequentialAccessWriteAheadLog<RepositoryRecord>) wal;
+        final SnapshotCapture<RepositoryRecord> capture = sequentialLog.captureSnapshot();
+        final Map<ResourceClaim, Set<ResourceClaimReference>> referencesByClaim = new HashMap<>();
+
+        // First pass: records held directly in the snapshot. A record with no current content claim references nothing.
+        for (final RepositoryRecord record : capture.getRecords().values()) {
+            final ContentClaim currentClaim = record.getCurrentClaim();
+            if (currentClaim != null) {
+                final ResourceClaim claim = currentClaim.getResourceClaim();
+                if (resourceClaims.contains(claim)) {
+                    referencesByClaim.computeIfAbsent(claim, key -> new HashSet<>()).add(createResourceClaimReference(record));
+                }
+            }
+        }
+
+        // Second pass: swap files named by the snapshot. One reference object per swap file is shared by every claim it mentions.
+        for (final String location : capture.getSwapLocations()) {
+            final String queueId = swapManager.getQueueIdentifier(location);
+            final ResourceClaimReference swapFileReference = createResourceClaimReference(location, queueId);
+
+            try {
+                final SwapSummary summary = swapManager.getSwapSummary(location);
+                for (final ResourceClaim claim : summary.getResourceClaims()) {
+                    if (!resourceClaims.contains(claim)) {
+                        continue;
+                    }
+
+                    referencesByClaim.computeIfAbsent(claim, key -> new HashSet<>()).add(swapFileReference);
+                }
+            } catch (final Exception e) {
+                // An unreadable swap file is reported and skipped so that the remaining swap files are still examined.
+                logger.warn("Failed to read swap file " + location + " when attempting to find resource claim references", e);
+            }
+        }
+
+        return referencesByClaim;
+    }
+
+    /**
+     * Builds a reference describing a swap file that mentions a Resource Claim. The reference reports
+     * itself as swapped out and carries no FlowFile UUID; equality is based on the queue identifier
+     * and the swap location.
+     */
+    private ResourceClaimReference createResourceClaimReference(final String swapLocation, final String queueIdentifier) {
+        return new ResourceClaimReference() {
+            @Override
+            public String getQueueIdentifier() {
+                return queueIdentifier;
+            }
+
+            @Override
+            public String getSwapLocation() {
+                return swapLocation;
+            }
+
+            @Override
+            public String getFlowFileUuid() {
+                return null;
+            }
+
+            @Override
+            public boolean isSwappedOut() {
+                return true;
+            }
+
+            @Override
+            public boolean equals(final Object obj) {
+                if (this == obj) {
+                    return true;
+                }
+                if (obj == null || getClass() != obj.getClass()) {
+                    return false;
+                }
+
+                final ResourceClaimReference that = (ResourceClaimReference) obj;
+                return Objects.equals(queueIdentifier, that.getQueueIdentifier())
+                    && Objects.equals(swapLocation, that.getSwapLocation());
+            }
+
+            @Override
+            public int hashCode() {
+                return Objects.hash(queueIdentifier, swapLocation);
+            }
+
+            @Override
+            public String toString() {
+                return "Swap File[location=" + swapLocation + ", queue=" + queueIdentifier + "]";
+            }
+        };
+    }
+
+    /**
+     * Builds a reference describing a FlowFile, taken from the given repository record, that points at
+     * a Resource Claim. The queue is the record's destination, falling back to its original queue; the
+     * queue identifier is null when neither is set. Equality is based on the queue identifier and the
+     * FlowFile UUID.
+     */
+    private ResourceClaimReference createResourceClaimReference(final RepositoryRecord repositoryRecord) {
+        final FlowFileQueue destination = repositoryRecord.getDestination();
+        final FlowFileQueue queue = destination != null ? destination : repositoryRecord.getOriginalQueue();
+
+        final String queueIdentifier;
+        if (queue == null) {
+            queueIdentifier = null;
+        } else {
+            queueIdentifier = queue.getIdentifier();
+        }
+
+        final String flowFileUuid = repositoryRecord.getCurrent().getAttribute(CoreAttributes.UUID.key());
+
+        return new ResourceClaimReference() {
+            @Override
+            public String getQueueIdentifier() {
+                return queueIdentifier;
+            }
+
+            @Override
+            public String getFlowFileUuid() {
+                return flowFileUuid;
+            }
+
+            @Override
+            public String getSwapLocation() {
+                return null;
+            }
+
+            @Override
+            public boolean isSwappedOut() {
+                return false;
+            }
+
+            @Override
+            public boolean equals(final Object obj) {
+                if (this == obj) {
+                    return true;
+                }
+                if (obj == null || getClass() != obj.getClass()) {
+                    return false;
+                }
+
+                final ResourceClaimReference that = (ResourceClaimReference) obj;
+                return Objects.equals(queueIdentifier, that.getQueueIdentifier())
+                    && Objects.equals(flowFileUuid, that.getFlowFileUuid());
+            }
+
+            @Override
+            public int hashCode() {
+                return Objects.hash(queueIdentifier, flowFileUuid);
+            }
+
+            @Override
+            public String toString() {
+                return "FlowFile[uuid=" + flowFileUuid + ", queue=" + queueIdentifier + "]";
+            }
+        };
+    }
+
+ @Override
public long getStorageCapacity() throws IOException {
long capacity = 0L;
for (final File file : flowFileRepositoryPaths) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/RepositoryContextFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/RepositoryContextFactory.java
index 4cffeda..ff9df5f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/RepositoryContextFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/RepositoryContextFactory.java
@@ -16,15 +16,15 @@
*/
package org.apache.nifi.controller.scheduling;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.RepositoryContext;
-import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.ProvenanceRepository;
+
+import java.util.concurrent.atomic.AtomicLong;
public class RepositoryContextFactory {
@@ -32,11 +32,11 @@ public class RepositoryContextFactory {
private final FlowFileRepository flowFileRepo;
private final FlowFileEventRepository flowFileEventRepo;
private final CounterRepository counterRepo;
- private final ProvenanceEventRepository provenanceRepo;
+ private final ProvenanceRepository provenanceRepo;
public RepositoryContextFactory(final ContentRepository contentRepository, final FlowFileRepository flowFileRepository,
final FlowFileEventRepository flowFileEventRepository, final CounterRepository counterRepository,
- final ProvenanceEventRepository provenanceRepository) {
+ final ProvenanceRepository provenanceRepository) {
this.contentRepo = contentRepository;
this.flowFileRepo = flowFileRepository;
@@ -48,4 +48,16 @@ public class RepositoryContextFactory {
public RepositoryContext newProcessContext(final Connectable connectable, final AtomicLong connectionIndex) {
return new RepositoryContext(connectable, connectionIndex, contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo);
}
+
+ /** Returns the ContentRepository held by this factory, the same instance handed to each RepositoryContext built by newProcessContext. */
+ public ContentRepository getContentRepository() {
+ return contentRepo;
+ }
+
+ /** Returns the FlowFileRepository held by this factory, the same instance handed to each RepositoryContext built by newProcessContext. */
+ public FlowFileRepository getFlowFileRepository() {
+ return flowFileRepo;
+ }
+
+ /** Returns the ProvenanceRepository held by this factory, the same instance handed to each RepositoryContext built by newProcessContext. */
+ public ProvenanceRepository getProvenanceRepository() {
+ return provenanceRepo;
+ }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/BootstrapDiagnosticsFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/BootstrapDiagnosticsFactory.java
new file mode 100644
index 0000000..65b18a1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/BootstrapDiagnosticsFactory.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.diagnostics.bootstrap;
+
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.diagnostics.DiagnosticTask;
+import org.apache.nifi.diagnostics.DiagnosticsDump;
+import org.apache.nifi.diagnostics.DiagnosticsDumpElement;
+import org.apache.nifi.diagnostics.DiagnosticsFactory;
+import org.apache.nifi.diagnostics.StandardDiagnosticsDump;
+import org.apache.nifi.diagnostics.bootstrap.tasks.ClusterDiagnosticTask;
+import org.apache.nifi.diagnostics.bootstrap.tasks.ComponentCountTask;
+import org.apache.nifi.diagnostics.bootstrap.tasks.ContentRepositoryScanTask;
+import org.apache.nifi.diagnostics.bootstrap.tasks.DiagnosticAnalysisTask;
+import org.apache.nifi.diagnostics.bootstrap.tasks.FlowConfigurationDiagnosticTask;
+import org.apache.nifi.diagnostics.bootstrap.tasks.GarbageCollectionDiagnosticTask;
+import org.apache.nifi.diagnostics.bootstrap.tasks.JVMDiagnosticTask;
+import org.apache.nifi.diagnostics.bootstrap.tasks.LongRunningProcessorTask;
+import org.apache.nifi.diagnostics.bootstrap.tasks.MemoryPoolPeakUsageTask;
+import org.apache.nifi.diagnostics.bootstrap.tasks.NarsDiagnosticTask;
+import org.apache.nifi.diagnostics.bootstrap.tasks.NiFiPropertiesDiagnosticTask;
+import org.apache.nifi.diagnostics.bootstrap.tasks.OperatingSystemDiagnosticTask;
+import org.apache.nifi.diagnostics.bootstrap.tasks.RepositoryDiagnosticTask;
+import org.apache.nifi.diagnostics.ThreadDumpTask;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * DiagnosticsFactory that builds a diagnostics dump by running a fixed list of DiagnosticTasks
+ * against the FlowController and NiFiProperties supplied through the setters.
+ */
+public class BootstrapDiagnosticsFactory implements DiagnosticsFactory {
+    private static final Logger logger = LoggerFactory.getLogger(BootstrapDiagnosticsFactory.class);
+
+    private FlowController flowController;
+    private NiFiProperties nifiProperties;
+
+    /**
+     * Runs every diagnostic task and gathers the elements they produce.
+     *
+     * @param verbose passed unchanged to each task's captureDump; tasks decide for themselves what
+     *                verbose means (a task may return null to contribute nothing)
+     * @return a dump holding the non-null elements in task order, stamped with the time at which
+     *         collection finished
+     */
+    @Override
+    public DiagnosticsDump create(final boolean verbose) {
+        final List<DiagnosticsDumpElement> dumpElements = new ArrayList<>();
+        for (final DiagnosticTask dumpTask : getDiagnosticTasks()) {
+            try {
+                final DiagnosticsDumpElement dumpElement = dumpTask.captureDump(verbose);
+                if (dumpElement != null) {
+                    dumpElements.add(dumpElement);
+                }
+            } catch (final Exception e) {
+                // One failing task must not prevent the remaining diagnostics from being captured.
+                logger.error("Failed to obtain diagnostics information from {}", dumpTask.getClass(), e);
+            }
+        }
+
+        return new StandardDiagnosticsDump(dumpElements, System.currentTimeMillis());
+    }
+
+    /**
+     * Creates a fresh list of the diagnostic tasks, in the order in which their output appears in the dump.
+     *
+     * @return a new, mutable list of newly constructed tasks
+     */
+    public List<DiagnosticTask> getDiagnosticTasks() {
+        final List<DiagnosticTask> tasks = new ArrayList<>();
+        tasks.add(new DiagnosticAnalysisTask(flowController));
+        tasks.add(new JVMDiagnosticTask());
+        tasks.add(new OperatingSystemDiagnosticTask());
+        tasks.add(new NarsDiagnosticTask(flowController.getExtensionManager()));
+        tasks.add(new FlowConfigurationDiagnosticTask(flowController));
+        tasks.add(new LongRunningProcessorTask(flowController));
+        tasks.add(new ClusterDiagnosticTask(flowController));
+        tasks.add(new GarbageCollectionDiagnosticTask());
+        tasks.add(new MemoryPoolPeakUsageTask());
+        tasks.add(new RepositoryDiagnosticTask(flowController));
+        tasks.add(new ComponentCountTask(flowController));
+        tasks.add(new NiFiPropertiesDiagnosticTask(nifiProperties));
+        tasks.add(new ContentRepositoryScanTask(flowController));
+        tasks.add(new ThreadDumpTask());
+        return tasks;
+    }
+
+    /** Sets the FlowController that the tasks inspect; must be set before the tasks are created. */
+    public void setFlowController(final FlowController flowController) {
+        this.flowController = flowController;
+    }
+
+    /** Sets the NiFiProperties reported by the properties diagnostic task. */
+    public void setNifiProperties(final NiFiProperties nifiProperties) {
+        this.nifiProperties = nifiProperties;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ClusterDiagnosticTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ClusterDiagnosticTask.java
new file mode 100644
index 0000000..ecfb5de
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ClusterDiagnosticTask.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.diagnostics.bootstrap.tasks;
+
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.leader.election.LeaderElectionManager;
+import org.apache.nifi.diagnostics.DiagnosticTask;
+import org.apache.nifi.diagnostics.DiagnosticsDumpElement;
+import org.apache.nifi.diagnostics.StandardDiagnosticsDumpElement;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Diagnostic task that reports the connection state of every node known to the Cluster Coordinator,
+ * which nodes hold the primary, coordinator and local roles, and Leader Election statistics.
+ */
+public class ClusterDiagnosticTask implements DiagnosticTask {
+    private final FlowController flowController;
+
+    public ClusterDiagnosticTask(final FlowController flowController) {
+        this.flowController = flowController;
+    }
+
+    @Override
+    public DiagnosticsDumpElement captureDump(final boolean verbose) {
+        final List<String> details = new ArrayList<>();
+
+        if (flowController.isClustered()) {
+            final ClusterCoordinator coordinator = flowController.getClusterCoordinator();
+            for (final NodeConnectionStatus connectionStatus : coordinator.getConnectionStatuses()) {
+                details.add(describeNode(connectionStatus));
+            }
+
+            details.add("Primary Node : " + coordinator.getPrimaryNode());
+            details.add("Coordinator Node : " + coordinator.getElectedActiveCoordinatorNode());
+            details.add("Local Node : " + coordinator.getLocalNodeIdentifier());
+
+            final LeaderElectionManager electionManager = flowController.getLeaderElectionManager();
+            if (electionManager != null) {
+                addLeaderElectionDetails(electionManager, details);
+            }
+        } else {
+            details.add("This instance is not clustered");
+        }
+
+        return new StandardDiagnosticsDumpElement("Cluster Details", details);
+    }
+
+    // Renders one node as "<description>; State = <state>", followed by the offload or disconnect code and reason when applicable.
+    private String describeNode(final NodeConnectionStatus connectionStatus) {
+        final NodeIdentifier nodeId = connectionStatus.getNodeIdentifier();
+        final StringBuilder description = new StringBuilder();
+        description.append(nodeId.getFullDescription());
+
+        final NodeConnectionState state = connectionStatus.getState();
+        description.append("; State = ").append(state);
+
+        final boolean offload = state == NodeConnectionState.OFFLOADED || state == NodeConnectionState.OFFLOADING;
+        final boolean disconnect = state == NodeConnectionState.DISCONNECTED || state == NodeConnectionState.DISCONNECTING;
+        if (offload) {
+            description.append("; Offload Code = ").append(connectionStatus.getOffloadCode());
+            description.append("; Reason = ").append(connectionStatus.getReason());
+        } else if (disconnect) {
+            description.append("; Disconnection Code = ").append(connectionStatus.getDisconnectCode());
+            description.append("; Reason = ").append(connectionStatus.getReason());
+        }
+
+        return description.toString();
+    }
+
+    // Appends the leadership change counts for the last 24 hours and the poll statistics exposed by the Leader Election Manager.
+    private void addLeaderElectionDetails(final LeaderElectionManager electionManager, final List<String> details) {
+        final Map<String, Integer> changeCounts = electionManager.getLeadershipChangeCount(24, TimeUnit.HOURS);
+        for (final Map.Entry<String, Integer> entry : changeCounts.entrySet()) {
+            details.add("Leadership for Role <" + entry.getKey() + "> has changed " + entry.getValue() + " times in the last 24 hours.");
+        }
+
+        details.add("In the past 5 minutes, the Leader Election service has been polled " + electionManager.getPollCount() + " times");
+        details.add("In the past 5 minutes, the minimum time taken to communicate with the Leader Election service has been "
+            + electionManager.getMinPollTime(TimeUnit.MILLISECONDS) + " millis");
+        details.add("In the past 5 minutes, the maximum time taken to communicate with the Leader Election service has been "
+            + electionManager.getMaxPollTime(TimeUnit.MILLISECONDS) + " millis");
+        details.add("In the past 5 minutes, the average time taken to communicate with the Leader Election service has been "
+            + electionManager.getAveragePollTime(TimeUnit.MILLISECONDS) + " millis");
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ComponentCountTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ComponentCountTask.java
new file mode 100644
index 0000000..5e53e61
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ComponentCountTask.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.diagnostics.bootstrap.tasks;
+
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.diagnostics.DiagnosticTask;
+import org.apache.nifi.diagnostics.DiagnosticsDumpElement;
+import org.apache.nifi.diagnostics.StandardDiagnosticsDumpElement;
+import org.apache.nifi.groups.ProcessGroup;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * Diagnostic task that reports, for each component type in the flow, how many Processors and
+ * Controller Services exist and how those instances are distributed across their states.
+ */
+public class ComponentCountTask implements DiagnosticTask {
+    private final FlowController flowController;
+
+    public ComponentCountTask(final FlowController flowController) {
+        this.flowController = flowController;
+    }
+
+    /**
+     * Counts all Processors and Controller Services found beneath the root Process Group.
+     *
+     * @param verbose not consulted by this task
+     * @return a "Component Counts" element with one line per component type
+     */
+    @Override
+    public DiagnosticsDumpElement captureDump(final boolean verbose) {
+        final List<String> details = new ArrayList<>();
+
+        final ProcessGroup processGroup = flowController.getFlowManager().getRootGroup();
+        details.add("Processors:");
+        countProcessors(processGroup.findAllProcessors(), details);
+
+        details.add("\n\nController Services:");
+        countControllerServices(processGroup.findAllControllerServices(), details);
+
+        return new StandardDiagnosticsDumpElement("Component Counts", details);
+    }
+
+    // Tallies processors by component type and scheduled state, then appends one summary line per type.
+    private void countProcessors(final Collection<ProcessorNode> processors, final List<String> details) {
+        final Map<String, Map<ScheduledState, Integer>> typeMap = new HashMap<>();
+
+        for (final ProcessorNode procNode : processors) {
+            final String componentType = procNode.getComponentType();
+            final ScheduledState scheduledState = procNode.getScheduledState();
+            typeMap.computeIfAbsent(componentType, key -> new HashMap<>()).merge(scheduledState, 1, Integer::sum);
+        }
+
+        addCountDetails(typeMap, "No Processors", details);
+    }
+
+    // Tallies controller services by component type and service state, then appends one summary line per type.
+    private void countControllerServices(final Collection<ControllerServiceNode> services, final List<String> details) {
+        final Map<String, Map<ControllerServiceState, Integer>> typeMap = new HashMap<>();
+
+        for (final ControllerServiceNode serviceNode : services) {
+            final String componentType = serviceNode.getComponentType();
+            final ControllerServiceState serviceState = serviceNode.getState();
+            typeMap.computeIfAbsent(componentType, key -> new HashMap<>()).merge(serviceState, 1, Integer::sum);
+        }
+
+        addCountDetails(typeMap, "No Controller Services", details);
+    }
+
+    // Shared formatter so that Processors and Controller Services are reported with identical wording.
+    // Locale.ROOT keeps the lower-cased state names stable regardless of the JVM's default locale.
+    private static <S> void addCountDetails(final Map<String, Map<S, Integer>> typeMap, final String emptyMessage, final List<String> details) {
+        if (typeMap.isEmpty()) {
+            details.add(emptyMessage);
+            return;
+        }
+
+        for (final Map.Entry<String, Map<S, Integer>> typeEntry : typeMap.entrySet()) {
+            final Map<S, Integer> stateMap = typeEntry.getValue();
+            final int total = stateMap.values().stream().mapToInt(Integer::intValue).sum();
+            details.add(typeEntry.getKey() + " : " + total + " Total, " + stateMap.toString().toLowerCase(Locale.ROOT));
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ContentRepositoryScanTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ContentRepositoryScanTask.java
new file mode 100644
index 0000000..8493099
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ContentRepositoryScanTask.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.diagnostics.bootstrap.tasks;
+
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.ResourceClaimReference;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.diagnostics.DiagnosticTask;
+import org.apache.nifi.diagnostics.DiagnosticsDumpElement;
+import org.apache.nifi.diagnostics.StandardDiagnosticsDumpElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Diagnostic task that lists every active Resource Claim in the Content Repository together with its
+ * claimant count, in-use and destructable flags, and the FlowFiles or swap files that reference it.
+ */
+public class ContentRepositoryScanTask implements DiagnosticTask {
+    private static final Logger logger = LoggerFactory.getLogger(ContentRepositoryScanTask.class);
+
+    private final FlowController flowController;
+
+    public ContentRepositoryScanTask(final FlowController flowController) {
+        this.flowController = flowController;
+    }
+
+    @Override
+    public DiagnosticsDumpElement captureDump(final boolean verbose) {
+        // Scanning the Content Repository is very expensive, so nothing at all is captured unless verbose output was requested.
+        if (!verbose) {
+            return null;
+        }
+
+        final ContentRepository contentRepository = flowController.getRepositoryContextFactory().getContentRepository();
+        if (!contentRepository.isActiveResourceClaimsSupported()) {
+            final List<String> unsupported = Collections.singletonList("Current Content Repository does not support scanning for in-use content");
+            return new StandardDiagnosticsDumpElement("Content Repository Scan", unsupported);
+        }
+
+        final FlowFileRepository flowFileRepository = flowController.getRepositoryContextFactory().getFlowFileRepository();
+        final ResourceClaimManager claimManager = flowController.getResourceClaimManager();
+        final FlowFileSwapManager swapManager = flowController.createSwapManager();
+
+        final List<String> details = new ArrayList<>();
+        for (final String containerName : contentRepository.getContainerNames()) {
+            try {
+                final Set<ResourceClaim> activeClaims = contentRepository.getActiveResourceClaims(containerName);
+                final Map<ResourceClaim, Set<ResourceClaimReference>> found = flowFileRepository.findResourceClaimReferences(activeClaims, swapManager);
+
+                for (final ResourceClaim claim : activeClaims) {
+                    final int claimCount = claimManager.getClaimantCount(claim);
+                    final boolean inUse = claim.isInUse();
+                    final boolean destructable = claimManager.isDestructable(claim);
+
+                    // The FlowFile Repository may return null, in which case every claim is reported with no references.
+                    final Set<ResourceClaimReference> references;
+                    if (found == null) {
+                        references = Collections.emptySet();
+                    } else {
+                        references = found.getOrDefault(claim, Collections.emptySet());
+                    }
+
+                    final String path = claim.getContainer() + "/" + claim.getSection() + "/" + claim.getId();
+                    final String line = String.format("%1$s, Claimant Count = %2$d, In Use = %3$b, Awaiting Destruction = %4$b, References (%5$d) = %6$s",
+                        path, claimCount, inUse, destructable, references.size(), references.toString());
+                    details.add(line);
+                }
+            } catch (final Exception e) {
+                // A failure in one container is recorded in the dump and the scan moves on to the next container.
+                logger.error("Failed to obtain listing of Active Resource Claims for container {}", containerName, e);
+                details.add("Failed to obtain listing of Active Resource Claims in container " + containerName);
+            }
+        }
+
+        return new StandardDiagnosticsDumpElement("Content Repository Scan", details);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/DiagnosticAnalysisTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/DiagnosticAnalysisTask.java
new file mode 100644
index 0000000..df89e40
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/DiagnosticAnalysisTask.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.diagnostics.bootstrap.tasks;
+
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.leader.election.LeaderElectionManager;
+import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
+import org.apache.nifi.diagnostics.DiagnosticTask;
+import org.apache.nifi.diagnostics.DiagnosticsDumpElement;
+import org.apache.nifi.diagnostics.StandardDiagnosticsDumpElement;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Diagnostic task that looks for common signs of misconfiguration or resource pressure and adds one
+ * human-readable line to the dump for each concern that it finds.
+ */
+public class DiagnosticAnalysisTask implements DiagnosticTask {
+    private static final int THREAD_TO_AVAILABLE_PROCS_RATIO = 6;
+    private static final int MAX_CONCURRENT_TASKS = 15;
+
+    private final FlowController flowController;
+
+    public DiagnosticAnalysisTask(final FlowController flowController) {
+        this.flowController = flowController;
+    }
+
+    /**
+     * Runs each analysis in turn and gathers the concerns that were raised.
+     *
+     * @param verbose not consulted by this task
+     * @return an element titled "Analysis" holding one line per concern, or a single line saying none were found
+     */
+    @Override
+    public DiagnosticsDumpElement captureDump(final boolean verbose) {
+        final List<String> details = new ArrayList<>();
+
+        final List<ProcessorNode> allProcessors = flowController.getFlowManager().getRootGroup().findAllProcessors();
+
+        analyzeCpuUsage(details);
+        analyzeHighTimerDrivenThreadCount(details);
+        analyzeProcessors(allProcessors, details);
+        analyzeOpenFileHandles(details);
+        analyzeTimerDrivenThreadExhaustion(details);
+        analyzeColocatedRepos(details);
+        analyzeLeadershipChanges(details);
+
+        if (details.isEmpty()) {
+            details.add("Analysis found no concerns");
+        }
+
+        return new StandardDiagnosticsDumpElement("Analysis", details);
+    }
+
+    // Flags a system load average that exceeds, or is above 90% of, the number of available cores.
+    private void analyzeCpuUsage(final List<String> details) {
+        final OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
+
+        final double loadAverage = os.getSystemLoadAverage();
+        final int availableProcs = os.getAvailableProcessors();
+
+        if (loadAverage > availableProcs) {
+            details.add(String.format("1-minute CPU Load Average is %1$.2f, which exceeds the %2$d available cores. CPU is over-utilized.", loadAverage, availableProcs));
+        } else if (loadAverage > 0.9 * availableProcs) {
+            details.add(String.format("1-minute CPU Load Average is %1$.2f, which exceeds 90%% of the %2$d available cores. CPU may struggle to keep up.", loadAverage, availableProcs));
+        }
+    }
+
+    // Flags a timer-driven thread pool sized above THREAD_TO_AVAILABLE_PROCS_RATIO times the core count.
+    private void analyzeHighTimerDrivenThreadCount(final List<String> details) {
+        final OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
+        final int availableProcs = os.getAvailableProcessors();
+
+        if (flowController.getMaxTimerDrivenThreadCount() > THREAD_TO_AVAILABLE_PROCS_RATIO * availableProcs) {
+            details.add("Number of Timer-Driven Threads is " + flowController.getMaxTimerDrivenThreadCount() + " with " + availableProcs
+                + " available cores. Number of threads exceeds " + THREAD_TO_AVAILABLE_PROCS_RATIO + "x the number of cores available.");
+        }
+    }
+
+    // Flags processors with many concurrent tasks or Event-Driven scheduling, and tallies memory-heavy processor types.
+    private void analyzeProcessors(final Collection<ProcessorNode> processors, final List<String> details) {
+        final Map<String, Integer> highMemTypesToCounts = new HashMap<>();
+
+        for (final ProcessorNode procNode : processors) {
+            if (procNode.getMaxConcurrentTasks() > MAX_CONCURRENT_TASKS) {
+                details.add(procNode + " is configured with a Max Concurrent Tasks of " + procNode.getMaxConcurrentTasks()
+                    + ", which is very high. Under most circumstances, this value should not be set above 12-15. This processor is currently " + procNode.getScheduledState().name());
+            }
+
+            if (procNode.getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
+                details.add(procNode + " is configured with a Scheduling Strategy of Event-Driven. The Event-Driven Scheduling Strategy is experimental and may trigger unexpected behavior, such as " +
+                    "a Processor \"hanging\" or becoming unresponsive.");
+            }
+
+            if (isHighMemoryUtilizer(procNode)) {
+                highMemTypesToCounts.merge(procNode.getComponentType(), 1, Integer::sum);
+            }
+        }
+
+        for (final Map.Entry<String, Integer> entry : highMemTypesToCounts.entrySet()) {
+            final String processorType = entry.getKey();
+            final int count = entry.getValue();
+
+            details.add(count + " instances of " + processorType + " are on the canvas, and this Processor is denoted as using large amounts of heap");
+        }
+    }
+
+    // Reports whether the processor's class carries a SystemResourceConsideration naming SystemResource.MEMORY.
+    private boolean isHighMemoryUtilizer(final ProcessorNode procNode) {
+        final Processor processor = procNode.getProcessor();
+        final SystemResourceConsideration consideration = processor.getClass().getAnnotation(SystemResourceConsideration.class);
+        if (consideration != null) {
+            if (SystemResource.MEMORY == consideration.resource()) {
+                return true;
+            }
+        }
+
+        final SystemResourceConsiderations considerations = processor.getClass().getAnnotation(SystemResourceConsiderations.class);
+        if (considerations != null) {
+            for (final SystemResourceConsideration systemResourceConsideration : considerations.value()) {
+                if (SystemResource.MEMORY == systemResourceConsideration.resource()) {
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
+    // Flags an open file descriptor count at or above 80% of the limit reported by the OS MBean.
+    private void analyzeOpenFileHandles(final List<String> details) {
+        final OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
+        final ObjectName osObjectName = os.getObjectName();
+
+        try {
+            final Object openFileCount = ManagementFactory.getPlatformMBeanServer().getAttribute(osObjectName, "OpenFileDescriptorCount");
+            final Object maxOpenFileCount = ManagementFactory.getPlatformMBeanServer().getAttribute(osObjectName, "MaxFileDescriptorCount");
+
+            if (openFileCount != null && maxOpenFileCount != null) {
+                // Compare as long: narrowing a very large limit to int could wrap negative and raise a false alarm.
+                final long openCount = ((Number) openFileCount).longValue();
+                final long maxCount = ((Number) maxOpenFileCount).longValue();
+                if (openCount >= 0.8 * maxCount) {
+                    details.add("Open File Count for NiFi is " + openCount + ", which exceeds 80% of the Max Open File Count of " + maxCount
+                        + ". It may be necessary to increase the maximum number of file handles that are available to a process in the Operating System.");
+                }
+            }
+        } catch (final Exception e) {
+            details.add("Failed to determine whether or not Open File Handle Count is of concern due to " + e);
+        }
+    }
+
+    // Flags an active thread count at or above 95% of the timer-driven thread pool size.
+    private void analyzeTimerDrivenThreadExhaustion(final List<String> details) {
+        final int activethreadCount = flowController.getActiveThreadCount();
+        final int maxThreadCount = flowController.getMaxTimerDrivenThreadCount();
+
+        if (activethreadCount >= 0.95 * maxThreadCount) {
+            details.add("Active Thread Count is " + activethreadCount + ", with Max Active Thread Count of " + maxThreadCount + ". The Timer-Driven Thread Pool may be exhausted.");
+        }
+    }
+
+    // Groups every repository by the file store it resides on and flags any store holding more than one.
+    private void analyzeColocatedRepos(final List<String> details) {
+        final Map<String, List<String>> fileStoreToRepos = new HashMap<>();
+
+        final RepositoryContextFactory contextFactory = flowController.getRepositoryContextFactory();
+
+        addRepo(fileStoreToRepos, contextFactory.getFlowFileRepository().getFileStoreName(), "FlowFile Repository");
+
+        // Key each container by its own file store; using the FlowFile Repository's store would group everything together.
+        for (final String containerName : contextFactory.getContentRepository().getContainerNames()) {
+            final String storeName = contextFactory.getContentRepository().getContainerFileStoreName(containerName);
+            addRepo(fileStoreToRepos, storeName, "Content Repository <" + containerName + ">");
+        }
+
+        for (final String containerName : contextFactory.getProvenanceRepository().getContainerNames()) {
+            final String storeName = contextFactory.getProvenanceRepository().getContainerFileStoreName(containerName);
+            addRepo(fileStoreToRepos, storeName, "Provenance Repository <" + containerName + ">");
+        }
+
+        for (final List<String> repoNamesOnSameFileStore : fileStoreToRepos.values()) {
+            if (repoNamesOnSameFileStore.size() > 1) {
+                details.add("The following Repositories share the same File Store: " + repoNamesOnSameFileStore);
+            }
+        }
+    }
+
+    // Records the repository under its file store; a null store name is treated as unknown and is not grouped.
+    private static void addRepo(final Map<String, List<String>> fileStoreToRepos, final String fileStoreName, final String repoLabel) {
+        if (fileStoreName != null) {
+            fileStoreToRepos.computeIfAbsent(fileStoreName, k -> new ArrayList<>()).add(repoLabel);
+        }
+    }
+
+    // Flags any role whose leadership changed more than 4 times in the last 24 hours; skipped when no election manager exists.
+    private void analyzeLeadershipChanges(final List<String> details) {
+        final LeaderElectionManager leaderElectionManager = flowController.getLeaderElectionManager();
+        if (leaderElectionManager == null) {
+            return;
+        }
+
+        final Map<String, Integer> changeCounts = leaderElectionManager.getLeadershipChangeCount(24, TimeUnit.HOURS);
+
+        changeCounts.entrySet().stream()
+            .filter(entry -> entry.getValue() > 4)
+            .forEach(entry -> details.add("Leadership for role <" + entry.getKey() + "> has changed " + entry.getValue() + " times in the last 24 hours"));
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/FlowConfigurationDiagnosticTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/FlowConfigurationDiagnosticTask.java
new file mode 100644
index 0000000..b30f6ec
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/FlowConfigurationDiagnosticTask.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.diagnostics.bootstrap.tasks;
+
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.diagnostics.DiagnosticTask;
+import org.apache.nifi.diagnostics.DiagnosticsDumpElement;
+import org.apache.nifi.diagnostics.StandardDiagnosticsDumpElement;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.ProcessGroupCounts;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Diagnostic task that summarizes the flow: thread and queue totals, component and port counts, and the number of Process Groups.
+ */
+public class FlowConfigurationDiagnosticTask implements DiagnosticTask {
+    private final FlowController flowController;
+
+    public FlowConfigurationDiagnosticTask(final FlowController flowController) {
+        this.flowController = flowController;
+    }
+
+    /**
+     * Collects status and component counts starting from the root Process Group.
+     *
+     * @param verbose not consulted by this task
+     * @return an element titled "Flow Configuration" with one line per metric
+     */
+    @Override
+    public DiagnosticsDumpElement captureDump(final boolean verbose) {
+        final List<String> details = new ArrayList<>();
+
+        final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
+        final FlowController.GroupStatusCounts statusCounts = flowController.getGroupStatusCounts(rootGroup);
+        details.add("Active Thread Count: " + statusCounts.getActiveThreadCount());
+        details.add("Terminated Thread Count: " + statusCounts.getTerminatedThreadCount());
+        details.add("Queued FlowFiles: " + statusCounts.getQueuedCount());
+        details.add("Queued Bytes: " + statusCounts.getQueuedContentSize());
+
+        final ProcessGroupCounts counts = rootGroup.getCounts();
+        details.add("Running Components: " + counts.getRunningCount());
+        details.add("Stopped Components: " + counts.getStoppedCount());
+        details.add("Invalid Components: " + counts.getInvalidCount());
+        details.add("Disabled Components: " + counts.getDisabledCount());
+        details.add("Local Input Ports: " + counts.getLocalInputPortCount());
+        details.add("Local Output Ports: " + counts.getLocalOutputPortCount());
+        details.add("Site-to-Site Input Ports: " + counts.getPublicInputPortCount());
+        details.add("Site-to-Site Output Ports: " + counts.getPublicOutputPortCount());
+        details.add("Active RPG Ports: " + counts.getActiveRemotePortCount());
+        details.add("Inactive RPG Ports: " + counts.getInactiveRemotePortCount());
+        details.add("");
+        details.add("Total Process Groups: " + rootGroup.findAllProcessGroups().size());
+        details.add("Locally Modified and Stale Count: " + counts.getLocallyModifiedAndStaleCount());
+        details.add("Locally Modified Count: " + counts.getLocallyModifiedCount());
+        details.add("Stale Count: " + counts.getStaleCount());
+        details.add("Sync Failure Count: " + counts.getSyncFailureCount());
+        details.add("Up-to-Date Count: " + counts.getUpToDateCount());
+
+        return new StandardDiagnosticsDumpElement("Flow Configuration", details);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/GarbageCollectionDiagnosticTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/GarbageCollectionDiagnosticTask.java
new file mode 100644
index 0000000..8b8aa81
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/GarbageCollectionDiagnosticTask.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.diagnostics.bootstrap.tasks;
+
+import org.apache.nifi.diagnostics.DiagnosticsDumpElement;
+import org.apache.nifi.diagnostics.DiagnosticTask;
+import org.apache.nifi.diagnostics.StandardDiagnosticsDumpElement;
+
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Diagnostic task that reports the collection count and collection time of every garbage collector MXBean in the JVM.
+ */
+public class GarbageCollectionDiagnosticTask implements DiagnosticTask {
+    /**
+     * Emits two lines per garbage collector: its collection count and its collection time in milliseconds.
+     *
+     * @param verbose not consulted by this task
+     * @return an element titled "Garbage Collection"
+     */
+    @Override
+    public DiagnosticsDumpElement captureDump(final boolean verbose) {
+        final List<String> lines = new ArrayList<>();
+
+        for (final GarbageCollectorMXBean collector : ManagementFactory.getGarbageCollectorMXBeans()) {
+            final String collectorName = collector.getName();
+            lines.add(collectorName + " Collection Count : " + collector.getCollectionCount());
+            lines.add(collectorName + " Collection Time (ms) : " + collector.getCollectionTime());
+        }
+
+        return new StandardDiagnosticsDumpElement("Garbage Collection", lines);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/JVMDiagnosticTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/JVMDiagnosticTask.java
new file mode 100644
index 0000000..fb73600
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/JVMDiagnosticTask.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.diagnostics.bootstrap.tasks;
+
+import org.apache.nifi.diagnostics.DiagnosticTask;
+import org.apache.nifi.diagnostics.DiagnosticsDumpElement;
+import org.apache.nifi.diagnostics.StandardDiagnosticsDumpElement;
+import org.apache.nifi.util.FormatUtils;
+
+import java.io.File;
+import java.lang.management.ClassLoadingMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+import java.lang.management.RuntimeMXBean;
+import java.lang.management.ThreadMXBean;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Diagnostic task that reports JVM-level facts: thread counts, heap sizes, uptime, spec and VM identity, class loading totals and the working directory.
+ */
+public class JVMDiagnosticTask implements DiagnosticTask {
+    /**
+     * Reads the platform MXBeans and formats one line per JVM metric.
+     *
+     * @param verbose not consulted by this task
+     * @return an element titled "Java Virtual Machine"
+     */
+    @Override
+    public DiagnosticsDumpElement captureDump(final boolean verbose) {
+        final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+        final MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
+        final MemoryUsage heapUsage = memoryBean.getHeapMemoryUsage();
+        final RuntimeMXBean runtimeBean = ManagementFactory.getRuntimeMXBean();
+        final ClassLoadingMXBean classLoadingBean = ManagementFactory.getClassLoadingMXBean();
+
+        final NumberFormat countFormat = NumberFormat.getInstance();
+        final List<String> lines = new ArrayList<>();
+
+        lines.add("Total Thread Count: " + countFormat.format(threadBean.getThreadCount()));
+        lines.add("Daemon Thread Count: " + countFormat.format(threadBean.getDaemonThreadCount()));
+
+        lines.add("Max Heap: " + FormatUtils.formatDataSize(heapUsage.getMax()));
+        lines.add("Heap Used: " + FormatUtils.formatDataSize(heapUsage.getUsed()));
+        lines.add("Heap Committed: " + FormatUtils.formatDataSize(heapUsage.getCommitted()));
+
+        lines.add("JVM Uptime: " + FormatUtils.formatHoursMinutesSeconds(runtimeBean.getUptime(), TimeUnit.MILLISECONDS));
+
+        lines.add("JVM Spec Name: " + runtimeBean.getSpecName());
+        lines.add("JVM Spec Vendor: " + runtimeBean.getSpecVendor());
+        lines.add("JVM Spec Version: " + runtimeBean.getSpecVersion());
+
+        lines.add("JVM Vendor: " + runtimeBean.getVmVendor());
+        lines.add("JVM Version: " + runtimeBean.getVmVersion());
+        lines.add("Classes Loaded: " + countFormat.format(classLoadingBean.getLoadedClassCount()));
+        lines.add("Classes Loaded Since Start: " + countFormat.format(classLoadingBean.getTotalLoadedClassCount()));
+        lines.add("Working Directory: " + new File(".").getAbsolutePath());
+
+        return new StandardDiagnosticsDumpElement("Java Virtual Machine", lines);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/LongRunningProcessorTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/LongRunningProcessorTask.java
new file mode 100644
index 0000000..73b7990
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/LongRunningProcessorTask.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.diagnostics.bootstrap.tasks;
+
+import org.apache.nifi.controller.ActiveThreadInfo;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.diagnostics.DiagnosticTask;
+import org.apache.nifi.diagnostics.DiagnosticsDumpElement;
+import org.apache.nifi.diagnostics.StandardDiagnosticsDumpElement;
+import org.apache.nifi.util.FormatUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Diagnostic task that lists processor threads that have been active for longer than MIN_ACTIVE_MILLIS.
+ */
+public class LongRunningProcessorTask implements DiagnosticTask {
+    private static final long MIN_ACTIVE_MILLIS = 30_000L;
+
+    private final FlowController flowController;
+
+    public LongRunningProcessorTask(final FlowController flowController) {
+        this.flowController = flowController;
+    }
+
+    /**
+     * Walks the active threads of every processor under the root group and reports the long-running ones.
+     *
+     * @param verbose not consulted by this task
+     * @return an element titled "Long-Running Processor Tasks"; holds a single placeholder line when nothing qualifies
+     */
+    @Override
+    public DiagnosticsDumpElement captureDump(final boolean verbose) {
+        final List<String> lines = new ArrayList<>();
+
+        for (final ProcessorNode processor : flowController.getFlowManager().getRootGroup().findAllProcessors()) {
+            for (final ActiveThreadInfo threadInfo : processor.getActiveThreads()) {
+                if (threadInfo.getActiveMillis() <= MIN_ACTIVE_MILLIS) {
+                    continue;
+                }
+
+                final String label = threadInfo.isTerminated() ? threadInfo.getThreadName() + " (Terminated)" : threadInfo.getThreadName();
+                final String duration = FormatUtils.formatMinutesSeconds(threadInfo.getActiveMillis(), TimeUnit.MILLISECONDS);
+                lines.add(processor + " - " + label + " has been active for " + duration + " minutes");
+            }
+        }
+
+        if (lines.isEmpty()) {
+            lines.add("No long-running tasks identified");
+        }
+
+        return new StandardDiagnosticsDumpElement("Long-Running Processor Tasks", lines);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/MemoryPoolPeakUsageTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/MemoryPoolPeakUsageTask.java
new file mode 100644
index 0000000..d71b99d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/MemoryPoolPeakUsageTask.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.diagnostics.bootstrap.tasks;
+
+import org.apache.nifi.diagnostics.DiagnosticTask;
+import org.apache.nifi.diagnostics.DiagnosticsDumpElement;
+import org.apache.nifi.diagnostics.StandardDiagnosticsDumpElement;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.management.MemoryUsage;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Diagnostic task that reports the peak usage of every JVM memory pool, in bytes and, when the pool reports a positive maximum, as a percentage of it.
+ */
+public class MemoryPoolPeakUsageTask implements DiagnosticTask {
+    /**
+     * Emits one line per memory pool describing its peak usage.
+     *
+     * @param verbose not consulted by this task
+     * @return an element titled "Memory Pool Peak Usage"
+     */
+    @Override
+    public DiagnosticsDumpElement captureDump(final boolean verbose) {
+        final List<MemoryPoolMXBean> pools = ManagementFactory.getMemoryPoolMXBeans();
+
+        final List<String> details = new ArrayList<>();
+        for (final MemoryPoolMXBean poolBean : pools) {
+            final String poolName = poolBean.getName();
+            final MemoryUsage usage = poolBean.getPeakUsage();
+            if (usage == null) {
+                // getPeakUsage() returns null for a pool that is no longer valid
+                details.add(poolName + ": peak usage not available");
+                continue;
+            }
+
+            final long maxUsed = usage.getUsed();
+            final long maxAvailable = usage.getMax();
+
+            final String maxUsageDescription;
+            if (maxAvailable > 0) {
+                final double percentage = maxUsed * 100D / maxAvailable;
+                maxUsageDescription = String.format("%1$,d bytes, %2$.2f%%", maxUsed, percentage);
+            } else {
+                maxUsageDescription = String.format("%1$,d bytes", maxUsed);
+            }
+
+            details.add(poolName + ": " + maxUsageDescription);
+        }
+
+        return new StandardDiagnosticsDumpElement("Memory Pool Peak Usage", details);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/NarsDiagnosticTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/NarsDiagnosticTask.java
new file mode 100644
index 0000000..89bda68
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/NarsDiagnosticTask.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.diagnostics.bootstrap.tasks;
+
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.diagnostics.DiagnosticTask;
+import org.apache.nifi.diagnostics.DiagnosticsDumpElement;
+import org.apache.nifi.diagnostics.StandardDiagnosticsDumpElement;
+import org.apache.nifi.nar.ExtensionManager;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Diagnostic task that lists the coordinate of every bundle returned by the ExtensionManager.
+ */
+public class NarsDiagnosticTask implements DiagnosticTask {
+    private final ExtensionManager extensionManager;
+
+    public NarsDiagnosticTask(final ExtensionManager extensionManager) {
+        this.extensionManager = extensionManager;
+    }
+
+    /**
+     * Collects the string form of each bundle coordinate and sorts the result in natural order.
+     *
+     * @param verbose not consulted by this task
+     * @return an element titled "NARs Available"
+     */
+    @Override
+    public DiagnosticsDumpElement captureDump(final boolean verbose) {
+        final List<String> coordinates = new ArrayList<>();
+
+        for (final Bundle nar : extensionManager.getAllBundles()) {
+            final String coordinate = nar.getBundleDetails().getCoordinate().toString();
+            coordinates.add(coordinate);
+        }
+
+        coordinates.sort(Comparator.naturalOrder());
+        return new StandardDiagnosticsDumpElement("NARs Available", coordinates);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/NiFiPropertiesDiagnosticTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/NiFiPropertiesDiagnosticTask.java
new file mode 100644
index 0000000..0443cc0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/NiFiPropertiesDiagnosticTask.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.diagnostics.bootstrap.tasks;
+
+import org.apache.nifi.diagnostics.DiagnosticTask;
+import org.apache.nifi.diagnostics.DiagnosticsDumpElement;
+import org.apache.nifi.diagnostics.StandardDiagnosticsDumpElement;
+import org.apache.nifi.util.NiFiProperties;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Diagnostic task that reports the configured value of a fixed set of NiFi properties.
+ */
+public class NiFiPropertiesDiagnosticTask implements DiagnosticTask {
+    // Properties to report, in output order; each name is listed once so that no property is printed twice.
+    private static final List<String> PROPERTY_NAMES = Arrays.asList(
+        "nifi.cluster.protocol.heartbeat.interval",
+        "nifi.cluster.node.connection.timeout",
+        "nifi.cluster.node.read.timeout",
+        "nifi.zookeeper.connect.timeout",
+        "nifi.zookeeper.session.timeout",
+        "nifi.ui.autorefresh.interval",
+        "nifi.cluster.node.protocol.max.threads",
+        "nifi.cluster.node.protocol.threads",
+        "nifi.security.user.login.identity.provider",
+        "nifi.security.user.authorizer",
+        "nifi.provenance.repository.implementation",
+        "nifi.provenance.repository.index.shard.size",
+        "nifi.provenance.repository.max.storage.size",
+        "nifi.components.status.repository.buffer.size",
+        "nifi.components.status.snapshot.frequency",
+        "nifi.content.repository.archive.max.retention.period",
+        "nifi.content.repository.archive.max.usage.percentage",
+        "nifi.flowfile.repository.checkpoint.interval",
+        "nifi.flowfile.repository.always.sync",
+        "nifi.bored.yield.duration",
+        "nifi.queue.swap.threshold",
+        "nifi.security.identity.mapping.pattern.dn",
+        "nifi.security.identity.mapping.value.dn",
+        "nifi.security.identity.mapping.transform.dn",
+        "nifi.security.identity.mapping.pattern.kerb",
+        "nifi.security.identity.mapping.value.kerb",
+        "nifi.security.identity.mapping.transform.kerb",
+        "nifi.security.group.mapping.pattern.anygroup",
+        "nifi.security.group.mapping.value.anygroup",
+        "nifi.security.group.mapping.transform.anygroup"
+    );
+
+    private final NiFiProperties nifiProperties;
+
+    public NiFiPropertiesDiagnosticTask(final NiFiProperties nifiProperties) {
+        this.nifiProperties = nifiProperties;
+    }
+
+    /**
+     * Looks up each name in PROPERTY_NAMES and emits a "name : value" line for it.
+     *
+     * @param verbose not consulted by this task
+     * @return an element titled "NiFi Properties"
+     */
+    @Override
+    public DiagnosticsDumpElement captureDump(final boolean verbose) {
+        final List<String> details = new ArrayList<>();
+
+        for (final String propertyName : PROPERTY_NAMES) {
+            details.add(propertyName + " : " + nifiProperties.getProperty(propertyName));
+        }
+
+        return new StandardDiagnosticsDumpElement("NiFi Properties", details);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/OperatingSystemDiagnosticTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/OperatingSystemDiagnosticTask.java
new file mode 100644
index 0000000..194fcae
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/OperatingSystemDiagnosticTask.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.diagnostics.bootstrap.tasks;
+
+import org.apache.nifi.diagnostics.DiagnosticsDumpElement;
+import org.apache.nifi.diagnostics.DiagnosticTask;
+import org.apache.nifi.diagnostics.StandardDiagnosticsDumpElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * Diagnostic task that reports the attributes exposed by the JVM's Operating System MXBean,
+ * one {@code <attribute> : <value>} line per attribute, ordered by attribute name.
+ */
+public class OperatingSystemDiagnosticTask implements DiagnosticTask {
+    private static final Logger logger = LoggerFactory.getLogger(OperatingSystemDiagnosticTask.class);
+    private static final Set<String> IGNORABLE_ATTRIBUTE_NAMES = new HashSet<>(Arrays.asList("ObjectName", ""));
+
+    /**
+     * Reads every non-ignorable attribute of the Operating System MXBean from the platform MBean server.
+     * An attribute whose value cannot be read is reported as unavailable instead of discarding the
+     * values that were read successfully.
+     *
+     * @param verbose not consulted by this task
+     * @return the "Operating System / Hardware" dump element; if the MXBean itself cannot be inspected,
+     *         the element contains a single failure line
+     */
+    @Override
+    public DiagnosticsDumpElement captureDump(final boolean verbose) {
+        final OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
+        final List<String> details = new ArrayList<>();
+
+        final NumberFormat numberFormat = NumberFormat.getInstance();
+
+        try {
+            // A sorted map keeps the report ordered by attribute name.
+            final SortedMap<String, String> attributes = new TreeMap<>();
+
+            final ObjectName osObjectName = os.getObjectName();
+            // Look the server up once rather than on every loop iteration.
+            final javax.management.MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+            final MBeanInfo mbeanInfo = mbeanServer.getMBeanInfo(osObjectName);
+            for (final MBeanAttributeInfo attributeInfo : mbeanInfo.getAttributes()) {
+                final String attributeName = attributeInfo.getName();
+                if (IGNORABLE_ATTRIBUTE_NAMES.contains(attributeName)) {
+                    continue;
+                }
+
+                attributes.put(attributeName, readAttribute(mbeanServer, osObjectName, attributeName, numberFormat));
+            }
+
+            attributes.forEach((key, value) -> details.add(key + " : " + value));
+        } catch (final Exception e) {
+            logger.error("Failed to obtain Operating System details", e);
+            return new StandardDiagnosticsDumpElement("Operating System / Hardware", Collections.singletonList("Failed to obtain Operating System details"));
+        }
+
+        return new StandardDiagnosticsDumpElement("Operating System / Hardware", details);
+    }
+
+    /**
+     * Reads a single attribute and renders it for the report. Numbers are rendered with the given
+     * format; anything else with {@link String#valueOf(Object)}. A failure to read the attribute is
+     * logged and rendered as an "Unavailable" marker so that one bad attribute does not abort the dump.
+     */
+    private String readAttribute(final javax.management.MBeanServer mbeanServer, final ObjectName objectName,
+                                 final String attributeName, final NumberFormat numberFormat) {
+        try {
+            final Object attributeValue = mbeanServer.getAttribute(objectName, attributeName);
+            if (attributeValue instanceof Number) {
+                return numberFormat.format(attributeValue);
+            }
+
+            return String.valueOf(attributeValue);
+        } catch (final Exception e) {
+            logger.warn("Failed to obtain Operating System attribute {}", attributeName, e);
+            return "Unavailable (" + e + ")";
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/RepositoryDiagnosticTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/RepositoryDiagnosticTask.java
new file mode 100644
index 0000000..57fbc4f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/RepositoryDiagnosticTask.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.diagnostics.bootstrap.tasks;
+
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.diagnostics.DiagnosticTask;
+import org.apache.nifi.diagnostics.DiagnosticsDumpElement;
+import org.apache.nifi.diagnostics.StandardDiagnosticsDumpElement;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.util.FormatUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Diagnostic task that reports the implementation class, file store, capacity and usable space of the
+ * FlowFile, Content and Provenance repositories obtained from the FlowController.
+ */
+public class RepositoryDiagnosticTask implements DiagnosticTask {
+    private final FlowController flowController;
+
+    /**
+     * @param flowController the controller whose repositories and root group status are reported
+     */
+    public RepositoryDiagnosticTask(final FlowController flowController) {
+        this.flowController = flowController;
+    }
+
+    /**
+     * Gathers details about each repository in turn. A failure while inspecting one repository is
+     * recorded in the output, together with the exception that caused it, and does not prevent the
+     * remaining repositories from being reported.
+     *
+     * @param verbose not consulted by this task
+     * @return the "NiFi Repositories" dump element
+     */
+    @Override
+    public DiagnosticsDumpElement captureDump(final boolean verbose) {
+        final List<String> details = new ArrayList<>();
+
+        final RepositoryContextFactory contextFactory = flowController.getRepositoryContextFactory();
+
+        final ProcessGroupStatus rootGroupStatus = flowController.getEventAccess().getGroupStatus(flowController.getFlowManager().getRootGroupId());
+
+        try {
+            captureDiagnostics(contextFactory.getFlowFileRepository(), details);
+        } catch (final IOException ioe) {
+            // Keep the cause in the report; this class has no logger to send it to.
+            details.add("Failed to gather details about FlowFile Repository due to " + ioe);
+        }
+
+        try {
+            details.add(""); // blank line separating the repository sections
+            captureDiagnostics(contextFactory.getContentRepository(), rootGroupStatus, details);
+        } catch (final IOException ioe) {
+            details.add("Failed to gather details about Content Repository due to " + ioe);
+        }
+
+        try {
+            details.add("");
+            captureDiagnostics(contextFactory.getProvenanceRepository(), details);
+        } catch (final IOException ioe) {
+            details.add("Failed to gather details about Provenance Repository due to " + ioe);
+        }
+
+        return new StandardDiagnosticsDumpElement("NiFi Repositories", details);
+    }
+
+    /**
+     * Appends the FlowFile Repository's implementation class, file store name, storage capacity and usable space.
+     *
+     * @throws IOException if the repository fails while reporting its storage details
+     */
+    private void captureDiagnostics(final FlowFileRepository repository, final List<String> details) throws IOException {
+        details.add("FlowFile Repository Implementation: " + repository.getClass().getName());
+        details.add("FlowFile Repository File Store: " + repository.getFileStoreName());
+        details.add("FlowFile Repository Storage Capacity: " + FormatUtils.formatDataSize(repository.getStorageCapacity()));
+        details.add("FlowFile Repository Usable Space: " + FormatUtils.formatDataSize(repository.getUsableStorageSpace()));
+    }
+
+    /**
+     * Appends the Content Repository's implementation class, the file store, capacity and usable space of
+     * each of its containers, and the bytes read / written figures taken from the given status.
+     *
+     * @throws IOException if the repository fails while reporting its container details
+     */
+    private void captureDiagnostics(final ContentRepository repository, final ProcessGroupStatus status, final List<String> details) throws IOException {
+        details.add("Content Repository Implementation: " + repository.getClass().getName());
+        for (final String containerName : repository.getContainerNames()) {
+            details.add("Content Repository <" + containerName + "> File Store: " + repository.getContainerFileStoreName(containerName));
+            details.add("Content Repository <" + containerName + "> Storage Capacity: " + FormatUtils.formatDataSize(repository.getContainerCapacity(containerName)));
+            details.add("Content Repository <" + containerName + "> Usable Space: " + FormatUtils.formatDataSize(repository.getContainerUsableSpace(containerName)));
+        }
+
+        details.add("Bytes Read (Last 5 mins): " + FormatUtils.formatDataSize(status.getBytesRead()));
+        details.add("Bytes Written (Last 5 mins): " + FormatUtils.formatDataSize(status.getBytesWritten()));
+    }
+
+    /**
+     * Appends the Provenance Repository's implementation class and the file store, capacity and usable
+     * space of each of its containers.
+     *
+     * @throws IOException if the repository fails while reporting its container details
+     */
+    private void captureDiagnostics(final ProvenanceRepository repository, final List<String> details) throws IOException {
+        details.add("Provenance Repository Implementation: " + repository.getClass().getName());
+        for (final String containerName : repository.getContainerNames()) {
+            details.add("Provenance Repository <" + containerName + "> File Store: " + repository.getContainerFileStoreName(containerName));
+            details.add("Provenance Repository <" + containerName + "> Storage Capacity: " + FormatUtils.formatDataSize(repository.getContainerCapacity(containerName)));
+            details.add("Provenance Repository <" + containerName + "> Usable Space: " + FormatUtils.formatDataSize(repository.getContainerUsableSpace(containerName)));
+        }
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
index 2261ae8..89a7b0d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
@@ -30,7 +30,7 @@
<!-- flow file event repository -->
<bean id="flowFileEventRepository" class="org.apache.nifi.spring.RingBufferEventRepositoryBean">
</bean>
-
+
<bean id="stringEncryptor" class="org.apache.nifi.encrypt.StringEncryptor" factory-method="createEncryptor">
<constructor-arg ref="nifiProperties" />
</bean>
@@ -64,10 +64,15 @@
<property name="encryptor" ref="stringEncryptor" />
<property name="authorizer" ref="authorizer" />
</bean>
-
+
<!-- bulletin repository -->
<bean id="bulletinRepository" class="org.apache.nifi.events.VolatileBulletinRepository" />
+ <bean id="diagnosticsFactory" class="org.apache.nifi.diagnostics.bootstrap.BootstrapDiagnosticsFactory">
+ <property name="flowController" ref="flowController" />
+ <property name="nifiProperties" ref="nifiProperties" />
+ </bean>
+
<bean id="eventReporter" class="org.apache.nifi.events.StandardEventReporter">
<constructor-arg ref="bulletinRepository" />
</bean>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java
index a1206c7..7715a59 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java
@@ -168,6 +168,11 @@ public class MockSwapManager implements FlowFileSwapManager {
}
@Override
+ public String getQueueIdentifier(final String swapLocation) {
+ return null;
+ }
+
+ @Override
public Set<String> getSwappedPartitionNames(final FlowFileQueue queue) throws IOException {
return swappedOut.keySet().stream()
.filter(key -> key.contains("."))
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index abe420f..cdc0e16 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -224,6 +224,11 @@ public class TestFileSystemSwapManager {
@Override
public void freeze(ResourceClaim claim) {
}
+
+ @Override
+ public boolean isDestructable(final ResourceClaim claim) {
+ return false;
+ }
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRocksDBFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRocksDBFlowFileRepository.java
index f2cd84c..3bf38ca 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRocksDBFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRocksDBFlowFileRepository.java
@@ -793,6 +793,11 @@ public class TestRocksDBFlowFileRepository {
}
@Override
+ public String getQueueIdentifier(final String swapLocation) {
+ return null;
+ }
+
+ @Override
public Set<String> getSwappedPartitionNames(FlowFileQueue queue) {
return Collections.emptySet();
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index a18b43a..ca656d0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -769,6 +769,11 @@ public class TestWriteAheadFlowFileRepository {
}
@Override
+ public String getQueueIdentifier(final String swapLocation) {
+ return null;
+ }
+
+ @Override
public Set<String> getSwappedPartitionNames(FlowFileQueue queue) throws IOException {
return Collections.emptySet();
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
index 92b65ec..1191659 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
@@ -16,6 +16,9 @@
*/
package org.apache.nifi.controller.repository.claim;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -24,9 +27,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
public class StandardResourceClaimManager implements ResourceClaimManager {
private static final Logger logger = LoggerFactory.getLogger(StandardResourceClaimManager.class);
@@ -203,6 +203,16 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
}
}
+ /**
+  * Reports whether the given claim is currently present in {@code destructableClaims}.
+  * The lookup is performed while holding the claim's monitor.
+  *
+  * @param claim the claim to look up; may be {@code null}
+  * @return {@code true} if the claim is in {@code destructableClaims}; {@code false} otherwise,
+  *         including when {@code claim} is {@code null}
+  */
+ @Override
+ public boolean isDestructable(final ResourceClaim claim) {
+     if (claim == null) {
+         return false;
+     }
+
+     synchronized (claim) {
+         return destructableClaims.contains(claim);
+     }
+ }
+
private static final class ClaimCount {
private final ResourceClaim claim;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
index 0d8db06..b1d30af 100755
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
@@ -344,7 +344,7 @@ case "$1" in
install)
install "$@"
;;
- start|stop|run|status|dump|env)
+ start|stop|run|status|dump|diagnostics|env)
main "$@"
;;
restart)
@@ -353,6 +353,6 @@ case "$1" in
run "start"
;;
*)
- echo "Usage nifi {start|stop|run|restart|status|dump|install}"
+ echo "Usage nifi {start|stop|run|restart|status|dump|diagnostics|install}"
;;
esac
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java
index 373212a..dc5f1dd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java
@@ -16,36 +16,26 @@
*/
package org.apache.nifi;
+import org.apache.nifi.diagnostics.DiagnosticsDump;
+import org.apache.nifi.util.LimitingInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.BufferedReader;
-import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.lang.management.LockInfo;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MonitorInfo;
-import java.lang.management.ThreadInfo;
-import java.lang.management.ThreadMXBean;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import org.apache.nifi.util.LimitingInputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
public class BootstrapListener {
private static final Logger logger = LoggerFactory.getLogger(BootstrapListener.class);
@@ -195,6 +185,23 @@ public class BootstrapListener {
logger.info("Received DUMP request from Bootstrap");
writeDump(socket.getOutputStream());
break;
+ case DIAGNOSTICS:
+ logger.info("Received DIAGNOSTICS request from Bootstrap");
+ final String[] args = request.getArgs();
+ boolean verbose = false;
+ if (args == null) {
+ verbose = false;
+ } else {
+ for (final String arg : args) {
+ if ("--verbose=true".equalsIgnoreCase(arg)) {
+ verbose = true;
+ break;
+ }
+ }
+ }
+
+ writeDiagnostics(socket.getOutputStream(), verbose);
+ break;
}
} catch (final Throwable t) {
logger.error("Failed to process request from Bootstrap due to " + t.toString(), t);
@@ -214,108 +221,14 @@ public class BootstrapListener {
}
}
- private static void writeDump(final OutputStream out) throws IOException {
- final ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
- final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
-
- final ThreadInfo[] infos = mbean.dumpAllThreads(true, true);
- final long[] deadlockedThreadIds = mbean.findDeadlockedThreads();
- final long[] monitorDeadlockThreadIds = mbean.findMonitorDeadlockedThreads();
-
- final List<ThreadInfo> sortedInfos = new ArrayList<>(infos.length);
- for (final ThreadInfo info : infos) {
- sortedInfos.add(info);
- }
- Collections.sort(sortedInfos, new Comparator<ThreadInfo>() {
- @Override
- public int compare(ThreadInfo o1, ThreadInfo o2) {
- return o1.getThreadName().toLowerCase().compareTo(o2.getThreadName().toLowerCase());
- }
- });
-
- final StringBuilder sb = new StringBuilder();
- for (final ThreadInfo info : sortedInfos) {
- sb.append("\n");
- sb.append("\"").append(info.getThreadName()).append("\" Id=");
- sb.append(info.getThreadId()).append(" ");
- sb.append(info.getThreadState().toString()).append(" ");
-
- switch (info.getThreadState()) {
- case BLOCKED:
- case TIMED_WAITING:
- case WAITING:
- sb.append(" on ");
- sb.append(info.getLockInfo());
- break;
- default:
- break;
- }
-
- if (info.isSuspended()) {
- sb.append(" (suspended)");
- }
- if (info.isInNative()) {
- sb.append(" (in native code)");
- }
-
- if (deadlockedThreadIds != null && deadlockedThreadIds.length > 0) {
- for (final long id : deadlockedThreadIds) {
- if (id == info.getThreadId()) {
- sb.append(" ** DEADLOCKED THREAD **");
- }
- }
- }
-
- if (monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length > 0) {
- for (final long id : monitorDeadlockThreadIds) {
- if (id == info.getThreadId()) {
- sb.append(" ** MONITOR-DEADLOCKED THREAD **");
- }
- }
- }
-
- final StackTraceElement[] stackTraces = info.getStackTrace();
- for (final StackTraceElement element : stackTraces) {
- sb.append("\n\tat ").append(element);
-
- final MonitorInfo[] monitors = info.getLockedMonitors();
- for (final MonitorInfo monitor : monitors) {
- if (monitor.getLockedStackFrame().equals(element)) {
- sb.append("\n\t- waiting on ").append(monitor);
- }
- }
- }
-
- final LockInfo[] lockInfos = info.getLockedSynchronizers();
- if (lockInfos.length > 0) {
- sb.append("\n\t");
- sb.append("Number of Locked Synchronizers: ").append(lockInfos.length);
- for (final LockInfo lockInfo : lockInfos) {
- sb.append("\n\t- ").append(lockInfo.toString());
- }
- }
-
- sb.append("\n");
- }
-
- if (deadlockedThreadIds != null && deadlockedThreadIds.length > 0) {
- sb.append("\n\nDEADLOCK DETECTED!");
- sb.append("\nThe following thread IDs are deadlocked:");
- for (final long id : deadlockedThreadIds) {
- sb.append("\n").append(id);
- }
- }
-
- if (monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length > 0) {
- sb.append("\n\nMONITOR DEADLOCK DETECTED!");
- sb.append("\nThe following thread IDs are deadlocked:");
- for (final long id : monitorDeadlockThreadIds) {
- sb.append("\n").append(id);
- }
- }
+ /**
+  * Creates a dump from the server's thread dump factory (always requesting verbose output)
+  * and writes it to the given stream.
+  *
+  * @param out the stream the dump is written to
+  * @throws IOException if writing the dump fails
+  */
+ private void writeDump(final OutputStream out) throws IOException {
+     nifi.getServer().getThreadDumpFactory().create(true).writeTo(out);
+ }
- writer.write(sb.toString());
- writer.flush();
+ /**
+  * Creates a dump from the server's diagnostics factory and writes it to the given stream.
+  *
+  * @param out the stream the dump is written to
+  * @param verbose passed through to the factory when creating the dump
+  * @throws IOException if writing the dump fails
+  */
+ private void writeDiagnostics(final OutputStream out, final boolean verbose) throws IOException {
+     nifi.getServer().getDiagnosticsFactory().create(verbose).writeTo(out);
}
private void echoPing(final OutputStream out) throws IOException {
@@ -367,11 +280,10 @@ public class BootstrapListener {
}
private static class BootstrapRequest {
-
- public static enum RequestType {
-
+ public enum RequestType {
SHUTDOWN,
DUMP,
+ DIAGNOSTICS,
PING;
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java
index 7a44250..29578e0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java
@@ -167,6 +167,10 @@ public class NiFi {
}
}
+ /**
+  * Package-private accessor for the server held by this instance.
+  *
+  * @return the value of the {@code nifiServer} field
+  */
+ NiFiServer getServer() {
+ return nifiServer;
+ }
+
protected void setDefaultUncaughtExceptionHandler() {
Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFiServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFiServer.java
index edb8f45..2169579 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFiServer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFiServer.java
@@ -17,6 +17,7 @@
package org.apache.nifi;
import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.diagnostics.DiagnosticsFactory;
import org.apache.nifi.nar.ExtensionMapping;
import java.util.Set;
@@ -33,4 +34,8 @@ public interface NiFiServer {
void setBundles(Bundle systemBundle, Set<Bundle> bundles);
void stop();
+
+ /**
+  * @return the factory used to create diagnostics dumps for this server
+  */
+ DiagnosticsFactory getDiagnosticsFactory();
+
+ /**
+  * @return the factory used to create thread dumps for this server
+  */
+ DiagnosticsFactory getThreadDumpFactory();
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
index af1042e..e39949c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
@@ -18,34 +18,6 @@ package org.apache.nifi.web.server;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.jar.JarEntry;
-import java.util.jar.JarFile;
-import java.util.stream.Collectors;
-import javax.servlet.DispatcherType;
-import javax.servlet.Filter;
-import javax.servlet.ServletContext;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.NiFiServer;
@@ -54,6 +26,10 @@ import org.apache.nifi.bundle.BundleDetails;
import org.apache.nifi.controller.UninheritableFlowException;
import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
+import org.apache.nifi.diagnostics.DiagnosticsDump;
+import org.apache.nifi.diagnostics.DiagnosticsDumpElement;
+import org.apache.nifi.diagnostics.DiagnosticsFactory;
+import org.apache.nifi.diagnostics.ThreadDumpTask;
import org.apache.nifi.documentation.DocGenerator;
import org.apache.nifi.lifecycle.LifeCycleStartException;
import org.apache.nifi.nar.ExtensionDiscoveringManager;
@@ -110,6 +86,38 @@ import org.springframework.context.ApplicationContext;
import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;
+import javax.servlet.DispatcherType;
+import javax.servlet.Filter;
+import javax.servlet.ServletContext;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+import java.util.stream.Collectors;
+
/**
* Encapsulates the Jetty instance.
*/
@@ -136,6 +144,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
private Set<Bundle> bundles;
private ExtensionMapping extensionMapping;
private NarAutoLoader narAutoLoader;
+ private DiagnosticsFactory diagnosticsFactory;
private WebAppContext webApiContext;
private WebAppContext webDocsContext;
@@ -986,6 +995,8 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
webContentViewerContext.addFilter(securityFilter, "/*", EnumSet.allOf(DispatcherType.class));
}
}
+
+ diagnosticsFactory = webApplicationContext.getBean("diagnosticsFactory", DiagnosticsFactory.class);
}
// ensure the web document war was loaded and provide the extension mapping
@@ -1046,6 +1057,19 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
}
}
+ /**
+  * Returns the diagnostics factory obtained during server startup, or the thread dump factory
+  * when no diagnostics factory has been set.
+  *
+  * @return the diagnostics factory if available, otherwise a thread dump factory
+  */
+ @Override
+ public DiagnosticsFactory getDiagnosticsFactory() {
+     if (diagnosticsFactory != null) {
+         return diagnosticsFactory;
+     }
+
+     // The field is only populated during server startup. When a dump is requested before startup
+     // has completed, or after startup has failed, the full factory is not available; a thread dump
+     // is still useful, so fall back to that.
+     return getThreadDumpFactory();
+ }
+
+ /**
+  * Creates a new {@code ThreadDumpDiagnosticsFactory} on every call.
+  *
+  * @return a factory whose dumps contain the details captured by a {@code ThreadDumpTask}
+  */
+ @Override
+ public DiagnosticsFactory getThreadDumpFactory() {
+ return new ThreadDumpDiagnosticsFactory();
+ }
+
private void performInjectionForComponentUis(final Collection<WebAppContext> componentUiExtensionWebContexts,
final NiFiWebConfigurationContext configurationContext, final FilterHolder securityFilter) {
if (CollectionUtils.isNotEmpty(componentUiExtensionWebContexts)) {
@@ -1201,6 +1225,25 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
return componentUiExtensionsByType;
}
}
+
+ /**
+  * Factory whose dumps consist solely of the details captured by a {@link ThreadDumpTask}.
+  */
+ private static class ThreadDumpDiagnosticsFactory implements DiagnosticsFactory {
+     /**
+      * @param verbose passed through to the thread dump task when the dump is written
+      * @return a dump that captures the thread details at the time {@code writeTo} is invoked
+      */
+     @Override
+     public DiagnosticsDump create(final boolean verbose) {
+         return new DiagnosticsDump() {
+             /**
+              * Writes each captured detail followed by a newline, then flushes.
+              * The writer uses the platform default charset and is not closed here.
+              */
+             @Override
+             public void writeTo(final OutputStream out) throws IOException {
+                 final ThreadDumpTask threadDumpTask = new ThreadDumpTask();
+                 final DiagnosticsDumpElement dumpElement = threadDumpTask.captureDump(verbose);
+
+                 final BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(out));
+                 for (final String line : dumpElement.getDetails()) {
+                     bufferedWriter.write(line);
+                     bufferedWriter.write('\n');
+                 }
+
+                 bufferedWriter.flush();
+             }
+         };
+     }
+ }
}
@FunctionalInterface