You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/01/14 19:35:36 UTC

[12/17] incubator-nifi git commit: NIFI-262, NIFI-263: Added 'restart' and 'dump' options to nifi.sh script

NIFI-262, NIFI-263: Added 'restart' and 'dump' options to nifi.sh script


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/7737fbd8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/7737fbd8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/7737fbd8

Branch: refs/heads/NIFI-250
Commit: 7737fbd84d955520e2f6e23bbbfa8a5ebe43b3ed
Parents: c62aba1
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Jan 14 12:24:09 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Jan 14 12:24:09 2015 -0500

----------------------------------------------------------------------
 .../src/main/resources/bin/dump-nifi.bat        |  33 +++++
 .../resources/src/main/resources/bin/nifi.sh    |   4 +-
 .../java/org/apache/nifi/BootstrapListener.java | 122 ++++++++++++++++++-
 .../java/org/apache/nifi/bootstrap/RunNiFi.java |  89 ++++++++++++--
 4 files changed, 234 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7737fbd8/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/dump-nifi.bat
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/dump-nifi.bat b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/dump-nifi.bat
new file mode 100644
index 0000000..71e5a1a
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/dump-nifi.bat
@@ -0,0 +1,33 @@
+@echo off
+rem
+rem    Licensed to the Apache Software Foundation (ASF) under one or more
+rem    contributor license agreements.  See the NOTICE file distributed with
+rem    this work for additional information regarding copyright ownership.
+rem    The ASF licenses this file to You under the Apache License, Version 2.0
+rem    (the "License"); you may not use this file except in compliance with
+rem    the License.  You may obtain a copy of the License at
+rem
+rem       http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem    Unless required by applicable law or agreed to in writing, software
+rem    distributed under the License is distributed on an "AS IS" BASIS,
+rem    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem    See the License for the specific language governing permissions and
+rem    limitations under the License.
+rem
+
+rem Use JAVA_HOME if it's set; otherwise, just use java
+IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe)
+
+rem NIFI_ROOT is the parent of the directory containing this script (%~dp0 expands to
+rem the script's drive and path). Switch to it so the relative paths below resolve.
+SET NIFI_ROOT=%~dp0..\
+CD /d "%NIFI_ROOT%"
+SET LIB_DIR=lib\bootstrap
+SET CONF_DIR=conf
+
+rem Pass the location of bootstrap.conf to the bootstrap JVM as a system property
+SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
+SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
+
+rem Run the RunNiFi bootstrap class with a small heap (12-24 MB) and the "dump" command
+SET JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
+SET BOOTSTRAP_ACTION=dump
+
+rem NOTE(review): no script arguments (%1 / %*) are forwarded on this command line, so
+rem an output file cannot be passed through this script - confirm this is intended.
+cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION%

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7737fbd8/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh
index 163f8e2..fb0d22e 100644
--- a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh
+++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh
@@ -172,10 +172,10 @@ case "$1" in
     install)
         install "$@"
         ;;
-    start|stop|run|status)
+    start|stop|run|restart|status|dump)
         main "$@"
         ;;
     *)
-        echo "Usage nifi {start|stop|run|status|install}"
+        echo "Usage nifi {start|stop|run|restart|status|dump|install}"
         ;;
 esac

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7737fbd8/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java
index 3393952..590797c 100644
--- a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java
+++ b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java
@@ -17,16 +17,27 @@
 package org.apache.nifi;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.lang.management.LockInfo;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MonitorInfo;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -162,6 +173,10 @@ public class BootstrapListener {
 										echoShutdown(socket.getOutputStream());
 										nifi.shutdownHook();
 										return;
+									case DUMP:
+									    logger.info("Received DUMP request from Bootstrap");
+									    writeDump(socket.getOutputStream());
+									    break;
 								}
 							} catch (final Throwable t) {
 								logger.error("Failed to process request from Bootstrap due to " + t.toString(), t);
@@ -182,6 +197,110 @@ public class BootstrapListener {
 	}
 	
 	
+	/**
+	 * Writes a human-readable dump of all live JVM threads to the given stream.
+	 * Threads are listed in case-insensitive order of thread name; each entry shows
+	 * the thread's id, state, the lock it is waiting on (if any), its stack trace with
+	 * the monitors locked at each frame, and the ownable synchronizers it holds.
+	 * Threads that take part in a deadlock are flagged inline and their ids are
+	 * summarized at the end of the dump.
+	 * <p>
+	 * The stream is flushed but not closed by this method.
+	 *
+	 * @param out the stream to write the dump to
+	 * @throws IOException if the dump cannot be written to the stream
+	 */
+	private static void writeDump(final OutputStream out) throws IOException {
+        final ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
+        // NOTE(review): no charset is given, so the platform default is used; whatever
+        // reads this stream has to decode it the same way.
+        final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
+
+        // Only ask for lock details the JVM can provide: requesting unsupported details
+        // makes dumpAllThreads / findDeadlockedThreads throw UnsupportedOperationException.
+        final boolean monitorsSupported = mbean.isObjectMonitorUsageSupported();
+        final boolean synchronizersSupported = mbean.isSynchronizerUsageSupported();
+
+        final ThreadInfo[] infos = mbean.dumpAllThreads(monitorsSupported, synchronizersSupported);
+        final long[] deadlockedThreadIds = synchronizersSupported ? mbean.findDeadlockedThreads() : null;
+        final long[] monitorDeadlockThreadIds = mbean.findMonitorDeadlockedThreads();
+
+        final List<ThreadInfo> sortedInfos = new ArrayList<>(Arrays.asList(infos));
+        Collections.sort(sortedInfos, new Comparator<ThreadInfo>() {
+            @Override
+            public int compare(ThreadInfo o1, ThreadInfo o2) {
+                return o1.getThreadName().toLowerCase().compareTo(o2.getThreadName().toLowerCase());
+            }
+        });
+
+        final StringBuilder sb = new StringBuilder();
+        for ( final ThreadInfo info : sortedInfos ) {
+            sb.append("\n");
+            sb.append("\"").append(info.getThreadName()).append("\" Id=");
+            sb.append(info.getThreadId()).append(" ");
+            sb.append(info.getThreadState().toString()).append(" ");
+
+            // A waiting thread does not necessarily wait on a lock (e.g. Thread.sleep),
+            // in which case there is no LockInfo to report.
+            final LockInfo awaitedLock = info.getLockInfo();
+            switch (info.getThreadState()) {
+                case BLOCKED:
+                case TIMED_WAITING:
+                case WAITING:
+                    if ( awaitedLock != null ) {
+                        sb.append(" on ");
+                        sb.append(awaitedLock);
+                    }
+                    break;
+                default:
+                    break;
+            }
+
+            if (info.isSuspended()) {
+                sb.append(" (suspended)");
+            }
+            if ( info.isInNative() ) {
+                sb.append(" (in native code)");
+            }
+
+            if ( containsThreadId(deadlockedThreadIds, info.getThreadId()) ) {
+                sb.append(" ** DEADLOCKED THREAD **");
+            }
+            if ( containsThreadId(monitorDeadlockThreadIds, info.getThreadId()) ) {
+                sb.append(" ** MONITOR-DEADLOCKED THREAD **");
+            }
+
+            // The locked monitors do not change per frame, so fetch them once per thread.
+            final MonitorInfo[] monitors = info.getLockedMonitors();
+            for ( final StackTraceElement element : info.getStackTrace() ) {
+                sb.append("\n\tat ").append(element);
+
+                for ( final MonitorInfo monitor : monitors ) {
+                    // getLockedStackFrame() may be null when the frame is not available,
+                    // so compare from the (non-null) stack trace element.
+                    if ( element.equals(monitor.getLockedStackFrame()) ) {
+                        // These are monitors the thread holds, not ones it is waiting for.
+                        sb.append("\n\t- locked ").append(monitor);
+                    }
+                }
+            }
+
+            final LockInfo[] lockInfos = info.getLockedSynchronizers();
+            if ( lockInfos.length > 0 ) {
+                sb.append("\n\t");
+                sb.append("Number of Locked Synchronizers: ").append(lockInfos.length);
+                for ( final LockInfo lockInfo : lockInfos ) {
+                    sb.append("\n\t- ").append(lockInfo.toString());
+                }
+            }
+
+            sb.append("\n");
+        }
+
+        if (deadlockedThreadIds != null && deadlockedThreadIds.length > 0) {
+            sb.append("\n\nDEADLOCK DETECTED!");
+            sb.append("\nThe following thread IDs are deadlocked:");
+            for ( final long id : deadlockedThreadIds ) {
+                sb.append("\n").append(id);
+            }
+        }
+
+        if (monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length > 0) {
+            sb.append("\n\nMONITOR DEADLOCK DETECTED!");
+            sb.append("\nThe following thread IDs are deadlocked:");
+            for ( final long id : monitorDeadlockThreadIds ) {
+                sb.append("\n").append(id);
+            }
+        }
+
+        writer.write(sb.toString());
+        writer.flush();
+    }
+
+	/**
+	 * Indicates whether the given thread id appears in the given array of ids.
+	 *
+	 * @param threadIds the ids to search; may be <code>null</code>
+	 * @param threadId the id to look for
+	 * @return <code>true</code> if threadIds is non-null and contains threadId
+	 */
+	private static boolean containsThreadId(final long[] threadIds, final long threadId) {
+        if ( threadIds == null ) {
+            return false;
+        }
+        for ( final long id : threadIds ) {
+            if ( id == threadId ) {
+                return true;
+            }
+        }
+        return false;
+    }
+	
+	
 	private void echoPing(final OutputStream out) throws IOException {
 		out.write("PING\n".getBytes(StandardCharsets.UTF_8));
 		out.flush();
@@ -205,7 +324,7 @@ public class BootstrapListener {
 		
 		final String line = reader.readLine();
 		final String[] splits = line.split(" ");
-		if ( splits.length < 0 ) {
+		if ( splits.length < 1 ) {
 			throw new IOException("Received invalid request from Bootstrap: " + line);
 		}
 		
@@ -235,6 +354,7 @@ public class BootstrapListener {
 	private static class BootstrapRequest {
 		public static enum RequestType {
 			SHUTDOWN,
+			DUMP,
 			PING;
 		}
 		

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7737fbd8/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
----------------------------------------------------------------------
diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
index e8f6439..f920860 100644
--- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
@@ -34,6 +34,7 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.attribute.PosixFilePermission;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -75,6 +76,7 @@ public class RunNiFi {
 	
 	public static final String SHUTDOWN_CMD = "SHUTDOWN";
 	public static final String PING_CMD = "PING";
+	public static final String DUMP_CMD = "DUMP";
 	
 	private volatile boolean autoRestartNiFi = true;
 	private volatile int ccPort = -1;
@@ -105,41 +107,52 @@ public class RunNiFi {
 	private static void printUsage() {
 		System.out.println("Usage:");
 		System.out.println();
-		System.out.println("java org.apache.nifi.bootstrap.RunNiFi [<-verbose>] <command>");
+		System.out.println("java org.apache.nifi.bootstrap.RunNiFi [<-verbose>] <command> [options]");
 		System.out.println();
 		System.out.println("Valid commands include:");
 		System.out.println("");
 		System.out.println("Start : Start a new instance of Apache NiFi");
 		System.out.println("Stop : Stop a running instance of Apache NiFi");
+		System.out.println("Restart : Stop Apache NiFi, if it is running, and then start a new instance");
 		System.out.println("Status : Determine if there is a running instance of Apache NiFi");
+		System.out.println("Dump : Write a Thread Dump to the file specified by [options], or to the log if no file is given");
 		System.out.println("Run : Start a new instance of Apache NiFi and monitor the Process, restarting if the instance dies");
 		System.out.println();
 	}
 
+	private static String[] shift(final String[] orig) {
+	    return Arrays.copyOfRange(orig, 1, orig.length);
+	}
 	
-	public static void main(final String[] args) throws IOException, InterruptedException {
-		if ( args.length < 1 || args.length > 2 ) {
+	public static void main(String[] args) throws IOException, InterruptedException {
+		if ( args.length < 1 || args.length > 3 ) {
 			printUsage();
 			return;
 		}
 		
+		File dumpFile = null;
 		boolean verbose = false;
-		if ( args.length == 2 ) {
-		    if ( args[0].equals("-verbose") ) {
-		        verbose = true;
-		    } else {
-		        printUsage();
-		        return;
-		    }
+		if ( args[0].equals("-verbose") ) {
+		    verbose = true;
+		    args = shift(args);
 		}
 		
-		final String cmd = args.length == 1 ? args[0] : args[1];
+		final String cmd = args[0];
+	    if (cmd.equals("dump") ) {
+	        if ( args.length > 1 ) {
+	            dumpFile = new File(args[1]);
+	        } else {
+	            dumpFile = null;
+	        }
+	    }
 		
 		switch (cmd.toLowerCase()) {
 			case "start":
 			case "run":
 			case "stop":
 			case "status":
+			case "dump":
+			case "restart":
 				break;
 			default:
 				printUsage();
@@ -178,6 +191,13 @@ public class RunNiFi {
 			case "status":
 				runNiFi.status();
 				break;
+			case "restart":
+			    runNiFi.stop();
+			    runNiFi.start(false);
+			    break;
+			case "dump":
+			    runNiFi.dump(dumpFile);
+			    break;
 		}
 	}
 	
@@ -391,6 +411,53 @@ public class RunNiFi {
 	}
 	
 	
+	/**
+	 * Requests a thread dump from the running NiFi instance and either writes it to the
+	 * given file or, if no file is given, logs it at INFO level. If no running instance
+	 * is found, a message is printed to standard out and nothing else is done.
+	 *
+	 * @param dumpFile the file to write the thread dump to (encoded as UTF-8), or
+	 *            <code>null</code> to log the dump instead
+	 * @throws IOException if communication with NiFi fails or the dump file cannot be written
+	 */
+	public void dump(final File dumpFile) throws IOException {
+        final Integer port = getCurrentPort();
+        if ( port == null ) {
+            System.out.println("Apache NiFi is not currently running");
+            // There is nothing to connect to; carrying on would fail when the null
+            // port is unboxed for the socket address below.
+            return;
+        }
+
+        final Properties nifiProps = loadProperties();
+        final String secretKey = nifiProps.getProperty("secret.key");
+
+        final StringBuilder sb = new StringBuilder();
+        try (final Socket socket = new Socket()) {
+            logger.fine("Connecting to NiFi instance");
+            socket.setSoTimeout(60000);
+            socket.connect(new InetSocketAddress("localhost", port));
+            logger.fine("Established connection to NiFi instance.");
+
+            logger.fine("Sending DUMP Command to port " + port);
+            final OutputStream out = socket.getOutputStream();
+            out.write((DUMP_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
+            out.flush();
+
+            // NOTE(review): the response is decoded with the platform default charset;
+            // confirm this matches the encoding used by the NiFi side when writing the dump.
+            final InputStream in = socket.getInputStream();
+            final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+            // The dump is read until end-of-stream, i.e. until the peer closes the connection
+            String line;
+            while ((line = reader.readLine()) != null ) {
+                sb.append(line).append("\n");
+            }
+        }
+
+        final String dump = sb.toString();
+        if ( dumpFile == null ) {
+            logger.info(dump);
+        } else {
+            try (final FileOutputStream fos = new FileOutputStream(dumpFile)) {
+                fos.write(dump.getBytes(StandardCharsets.UTF_8));
+            }
+            logger.info("Successfully wrote thread dump to " + dumpFile.getAbsolutePath());
+        }
+	}
+	
 	public void stop() throws IOException {
 		final Integer port = getCurrentPort();
 		if ( port == null ) {