You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/01/14 19:35:36 UTC
[12/17] incubator-nifi git commit: NIFI-262, NIFI-263: Added 'restart' and 'dump' options to nifi.sh script
NIFI-262, NIFI-263: Added 'restart' and 'dump' options to nifi.sh script
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/7737fbd8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/7737fbd8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/7737fbd8
Branch: refs/heads/NIFI-250
Commit: 7737fbd84d955520e2f6e23bbbfa8a5ebe43b3ed
Parents: c62aba1
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Jan 14 12:24:09 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Jan 14 12:24:09 2015 -0500
----------------------------------------------------------------------
.../src/main/resources/bin/dump-nifi.bat | 33 +++++
.../resources/src/main/resources/bin/nifi.sh | 4 +-
.../java/org/apache/nifi/BootstrapListener.java | 122 ++++++++++++++++++-
.../java/org/apache/nifi/bootstrap/RunNiFi.java | 89 ++++++++++++--
4 files changed, 234 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7737fbd8/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/dump-nifi.bat
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/dump-nifi.bat b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/dump-nifi.bat
new file mode 100644
index 0000000..71e5a1a
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/dump-nifi.bat
@@ -0,0 +1,33 @@
+@echo off
+rem
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements. See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License. You may obtain a copy of the License at
+rem
+rem http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+rem
+
+rem Runs org.apache.nifi.bootstrap.RunNiFi with the "dump" action.
+rem Arguments given to this script are not forwarded to RunNiFi.
+
+rem Use JAVA_HOME if it's set; otherwise, just use java
+IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe)
+
+rem NIFI_ROOT is the parent of the directory containing this script; make it the
+rem working directory so that the relative paths below resolve against it
+SET NIFI_ROOT=%~dp0..\
+CD /d "%NIFI_ROOT%"
+rem Directory placed on the classpath and directory holding bootstrap.conf, both relative to NIFI_ROOT
+SET LIB_DIR=lib\bootstrap
+SET CONF_DIR=conf
+
+rem Pass the location of bootstrap.conf to RunNiFi as a system property
+SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
+SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
+
+rem The JVM started here is given a small heap (12 MB initial, 24 MB maximum)
+SET JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
+SET BOOTSTRAP_ACTION=dump
+
+cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION%
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7737fbd8/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh
index 163f8e2..fb0d22e 100644
--- a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh
+++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh
@@ -172,10 +172,10 @@ case "$1" in
install)
install "$@"
;;
- start|stop|run|status)
+ start|stop|run|restart|status|dump)
main "$@"
;;
*)
- echo "Usage nifi {start|stop|run|status|install}"
+ echo "Usage nifi {start|stop|run|restart|status|dump|install}"
;;
esac
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7737fbd8/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java
index 3393952..590797c 100644
--- a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java
+++ b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java
@@ -17,16 +17,27 @@
package org.apache.nifi;
import java.io.BufferedReader;
+import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.lang.management.LockInfo;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MonitorInfo;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -162,6 +173,10 @@ public class BootstrapListener {
echoShutdown(socket.getOutputStream());
nifi.shutdownHook();
return;
+ case DUMP:
+ logger.info("Received DUMP request from Bootstrap");
+ writeDump(socket.getOutputStream());
+ break;
}
} catch (final Throwable t) {
logger.error("Failed to process request from Bootstrap due to " + t.toString(), t);
@@ -182,6 +197,110 @@ public class BootstrapListener {
}
+    /**
+     * Writes a thread dump of this JVM to the given stream.
+     *
+     * <p>Threads are listed in case-insensitive order of their names. For each thread the dump holds its
+     * name, id and state, the lock it is blocked or waiting on (if any), markers for suspended, in-native
+     * and deadlocked threads, its stack trace annotated with the monitors locked in each frame, and the
+     * ownable synchronizers it holds. If any deadlock was detected, the ids of the deadlocked threads are
+     * listed again at the end of the dump.</p>
+     *
+     * <p>The text is encoded with the platform default charset, so whatever reads the stream must decode
+     * it with the same charset. The stream is flushed but not closed.</p>
+     *
+     * @param out the stream to write the thread dump to
+     * @throws IOException if the dump cannot be written to {@code out}
+     */
+    private static void writeDump(final OutputStream out) throws IOException {
+        final ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
+        final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
+
+        // Only ask for monitor / synchronizer details when the JVM supports them; requesting
+        // unsupported details makes the MXBean throw UnsupportedOperationException.
+        final boolean monitorsSupported = mbean.isObjectMonitorUsageSupported();
+        final boolean synchronizersSupported = mbean.isSynchronizerUsageSupported();
+
+        final ThreadInfo[] infos = mbean.dumpAllThreads(monitorsSupported, synchronizersSupported);
+        final long[] deadlockedThreadIds = synchronizersSupported ? mbean.findDeadlockedThreads() : null;
+        final long[] monitorDeadlockThreadIds = monitorsSupported ? mbean.findMonitorDeadlockedThreads() : null;
+
+        final List<ThreadInfo> sortedInfos = new ArrayList<>(Arrays.asList(infos));
+        Collections.sort(sortedInfos, new Comparator<ThreadInfo>() {
+            @Override
+            public int compare(final ThreadInfo o1, final ThreadInfo o2) {
+                // compareToIgnoreCase does not depend on the default locale, unlike toLowerCase()
+                return o1.getThreadName().compareToIgnoreCase(o2.getThreadName());
+            }
+        });
+
+        final StringBuilder sb = new StringBuilder();
+        for ( final ThreadInfo info : sortedInfos ) {
+            sb.append("\n");
+            sb.append("\"").append(info.getThreadName()).append("\" Id=");
+            sb.append(info.getThreadId()).append(" ");
+            sb.append(info.getThreadState().toString()).append(" ");
+
+            switch (info.getThreadState()) {
+                case BLOCKED:
+                case TIMED_WAITING:
+                case WAITING: {
+                    // A waiting thread is not necessarily waiting on a lock (e.g. a sleeping thread),
+                    // in which case there is no LockInfo to report.
+                    final LockInfo blockedOn = info.getLockInfo();
+                    if ( blockedOn != null ) {
+                        sb.append(" on ");
+                        sb.append(blockedOn);
+                    }
+                    break;
+                }
+                default:
+                    break;
+            }
+
+            if ( info.isSuspended() ) {
+                sb.append(" (suspended)");
+            }
+            if ( info.isInNative() ) {
+                sb.append(" (in native code)");
+            }
+
+            if ( containsThreadId(deadlockedThreadIds, info.getThreadId()) ) {
+                sb.append(" ** DEADLOCKED THREAD **");
+            }
+            if ( containsThreadId(monitorDeadlockThreadIds, info.getThreadId()) ) {
+                sb.append(" ** MONITOR-DEADLOCKED THREAD **");
+            }
+
+            // Fetched once per thread rather than once per stack frame
+            final MonitorInfo[] monitors = info.getLockedMonitors();
+            for ( final StackTraceElement element : info.getStackTrace() ) {
+                sb.append("\n\tat ").append(element);
+
+                for ( final MonitorInfo monitor : monitors ) {
+                    // getLockedStackFrame() is null when the frame is not available, so compare
+                    // from the non-null stack trace element.
+                    if ( element.equals(monitor.getLockedStackFrame()) ) {
+                        // These are monitors the thread holds, not ones it is waiting for
+                        sb.append("\n\t- locked ").append(monitor);
+                    }
+                }
+            }
+
+            final LockInfo[] lockInfos = info.getLockedSynchronizers();
+            if ( lockInfos.length > 0 ) {
+                sb.append("\n\t");
+                sb.append("Number of Locked Synchronizers: ").append(lockInfos.length);
+                for ( final LockInfo lockInfo : lockInfos ) {
+                    sb.append("\n\t- ").append(lockInfo.toString());
+                }
+            }
+
+            sb.append("\n");
+        }
+
+        if ( deadlockedThreadIds != null && deadlockedThreadIds.length > 0 ) {
+            sb.append("\n\nDEADLOCK DETECTED!");
+            sb.append("\nThe following thread IDs are deadlocked:");
+            for ( final long id : deadlockedThreadIds ) {
+                sb.append("\n").append(id);
+            }
+        }
+
+        if ( monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length > 0 ) {
+            sb.append("\n\nMONITOR DEADLOCK DETECTED!");
+            sb.append("\nThe following thread IDs are deadlocked:");
+            for ( final long id : monitorDeadlockThreadIds ) {
+                sb.append("\n").append(id);
+            }
+        }
+
+        writer.write(sb.toString());
+        writer.flush();
+    }
+
+    /**
+     * Returns whether the given array of thread ids contains the given id.
+     *
+     * @param ids the thread ids to search; may be {@code null}, in which case nothing matches
+     * @param id the thread id to look for
+     * @return {@code true} if {@code ids} is non-null and contains {@code id}
+     */
+    private static boolean containsThreadId(final long[] ids, final long id) {
+        if ( ids == null ) {
+            return false;
+        }
+        for ( final long candidate : ids ) {
+            if ( candidate == id ) {
+                return true;
+            }
+        }
+        return false;
+    }
+
private void echoPing(final OutputStream out) throws IOException {
out.write("PING\n".getBytes(StandardCharsets.UTF_8));
out.flush();
@@ -205,7 +324,7 @@ public class BootstrapListener {
final String line = reader.readLine();
final String[] splits = line.split(" ");
- if ( splits.length < 0 ) {
+ if ( splits.length < 1 ) {
throw new IOException("Received invalid request from Bootstrap: " + line);
}
@@ -235,6 +354,7 @@ public class BootstrapListener {
private static class BootstrapRequest {
public static enum RequestType {
SHUTDOWN,
+ DUMP,
PING;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7737fbd8/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
----------------------------------------------------------------------
diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
index e8f6439..f920860 100644
--- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
@@ -34,6 +34,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.attribute.PosixFilePermission;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -75,6 +76,7 @@ public class RunNiFi {
public static final String SHUTDOWN_CMD = "SHUTDOWN";
public static final String PING_CMD = "PING";
+ public static final String DUMP_CMD = "DUMP";
private volatile boolean autoRestartNiFi = true;
private volatile int ccPort = -1;
@@ -105,41 +107,52 @@ public class RunNiFi {
private static void printUsage() {
System.out.println("Usage:");
System.out.println();
- System.out.println("java org.apache.nifi.bootstrap.RunNiFi [<-verbose>] <command>");
+ System.out.println("java org.apache.nifi.bootstrap.RunNiFi [<-verbose>] <command> [options]");
System.out.println();
System.out.println("Valid commands include:");
System.out.println("");
System.out.println("Start : Start a new instance of Apache NiFi");
System.out.println("Stop : Stop a running instance of Apache NiFi");
+ System.out.println("Restart : Stop Apache NiFi, if it is running, and then start a new instance");
System.out.println("Status : Determine if there is a running instance of Apache NiFi");
+ System.out.println("Dump : Write a Thread Dump to the file specified by [options], or to the log if no file is given");
System.out.println("Run : Start a new instance of Apache NiFi and monitor the Process, restarting if the instance dies");
System.out.println();
}
+    /**
+     * Returns a copy of the given array without its first element.
+     *
+     * @param orig the array to copy; it is not modified
+     * @return a new array holding every element of {@code orig} except the first; an empty array if
+     *         {@code orig} is empty
+     */
+    private static String[] shift(final String[] orig) {
+        if ( orig.length == 0 ) {
+            // Arrays.copyOfRange rejects a start index beyond the end of the array
+            return new String[0];
+        }
+        return Arrays.copyOfRange(orig, 1, orig.length);
+    }
- public static void main(final String[] args) throws IOException, InterruptedException {
- if ( args.length < 1 || args.length > 2 ) {
+ public static void main(String[] args) throws IOException, InterruptedException {
+ if ( args.length < 1 || args.length > 3 ) {
printUsage();
return;
}
+ File dumpFile = null;
boolean verbose = false;
- if ( args.length == 2 ) {
- if ( args[0].equals("-verbose") ) {
- verbose = true;
- } else {
- printUsage();
- return;
- }
+ if ( args[0].equals("-verbose") ) {
+ verbose = true;
+ args = shift(args);
}
- final String cmd = args.length == 1 ? args[0] : args[1];
+ final String cmd = args[0];
+ if (cmd.equals("dump") ) {
+ if ( args.length > 1 ) {
+ dumpFile = new File(args[1]);
+ } else {
+ dumpFile = null;
+ }
+ }
switch (cmd.toLowerCase()) {
case "start":
case "run":
case "stop":
case "status":
+ case "dump":
+ case "restart":
break;
default:
printUsage();
@@ -178,6 +191,13 @@ public class RunNiFi {
case "status":
runNiFi.status();
break;
+ case "restart":
+ runNiFi.stop();
+ runNiFi.start(false);
+ break;
+ case "dump":
+ runNiFi.dump(dumpFile);
+ break;
}
}
@@ -391,6 +411,53 @@ public class RunNiFi {
}
+    /**
+     * Requests a thread dump from the running NiFi instance and writes it to the given file, or logs it
+     * at INFO level if no file is given. If NiFi is not currently running, a message saying so is
+     * printed to standard out and nothing else is done.
+     *
+     * @param dumpFile the file to write the thread dump to, or {@code null} to log the dump instead
+     * @throws IOException if unable to communicate with NiFi or unable to write the dump to the file
+     */
+    public void dump(final File dumpFile) throws IOException {
+        final Integer port = getCurrentPort();
+        if ( port == null ) {
+            System.out.println("Apache NiFi is not currently running");
+            // Without a port there is nothing to connect to; continuing would fail when unboxing null
+            return;
+        }
+
+        final Properties nifiProps = loadProperties();
+        final String secretKey = nifiProps.getProperty("secret.key");
+
+        final StringBuilder sb = new StringBuilder();
+        try (final Socket socket = new Socket()) {
+            logger.fine("Connecting to NiFi instance");
+            socket.setSoTimeout(60000);
+            socket.connect(new InetSocketAddress("localhost", port));
+            logger.fine("Established connection to NiFi instance.");
+
+            logger.fine("Sending DUMP Command to port " + port);
+            final OutputStream out = socket.getOutputStream();
+            out.write((DUMP_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
+            out.flush();
+
+            // Read until end of stream and collect everything that was sent back
+            final InputStream in = socket.getInputStream();
+            final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+            String line;
+            while ((line = reader.readLine()) != null ) {
+                sb.append(line).append("\n");
+            }
+        }
+
+        final String dump = sb.toString();
+        if ( dumpFile == null ) {
+            logger.info(dump);
+        } else {
+            try (final FileOutputStream fos = new FileOutputStream(dumpFile)) {
+                fos.write(dump.getBytes(StandardCharsets.UTF_8));
+            }
+            logger.info("Successfully wrote thread dump to " + dumpFile.getAbsolutePath());
+        }
+    }
+
public void stop() throws IOException {
final Integer port = getCurrentPort();
if ( port == null ) {