You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2014/12/09 20:06:56 UTC
[1/4] incubator-nifi git commit: NIFI-145: Initial commit of
bootstrap module
Repository: incubator-nifi
Updated Branches:
refs/heads/bootstrap [created] eed4a9bb8
NIFI-145: Initial commit of bootstrap module
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/567dfc79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/567dfc79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/567dfc79
Branch: refs/heads/bootstrap
Commit: 567dfc794c565854850b7e223601528f2a3fd814
Parents: 203e83e
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Dec 9 08:48:07 2014 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Dec 9 08:48:07 2014 -0500
----------------------------------------------------------------------
.gitignore | 1 +
nifi-bootstrap/pom.xml | 18 ++
.../java/org/apache/nifi/bootstrap/RunNiFi.java | 176 +++++++++++++++++++
.../org/apache/nifi/bootstrap/ShutdownHook.java | 14 ++
4 files changed, 209 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/567dfc79/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index f026df6..6d4eca9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,7 @@
target
.project
.settings
+.classpath
nbactions.xml
.DS_Store
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/567dfc79/nifi-bootstrap/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-bootstrap/pom.xml b/nifi-bootstrap/pom.xml
new file mode 100644
index 0000000..b620c84
--- /dev/null
+++ b/nifi-bootstrap/pom.xml
@@ -0,0 +1,18 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-parent</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-bootstrap</artifactId>
+ <packaging>jar</packaging>
+
+ <name>nifi-bootstrap</name>
+
+ <dependencies>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/567dfc79/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
----------------------------------------------------------------------
diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
new file mode 100644
index 0000000..afa1f47
--- /dev/null
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
@@ -0,0 +1,176 @@
+package org.apache.nifi.bootstrap;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bootstrap class to run Apache NiFi.
+ *
+ * This class looks for the bootstrap.conf file by looking in the following places (in order):
+ * <ol>
+ * <li>First argument to the program</li>
+ * <li>Java System Property named {@code org.apache.nifi.bootstrap.config.file}</li>
+ * <li>${NIFI_HOME}/./conf/bootstrap.conf, where ${NIFI_HOME} references an environment variable {@code NIFI_HOME}</li>
+ * <li>./conf/bootstrap.conf, where {@code .} represents the working directory.</li>
+ * </ol>
+ *
+ * If the {@code bootstrap.conf} file cannot be found, throws a {@code FileNotFoundException}.
+ */
+public class RunNiFi {
+    /** Location of the bootstrap configuration file, relative to NIFI_HOME or the working directory. */
+    public static final String DEFAULT_CONFIG_FILE = "./conf/bootstrap.conf";
+    /** Location of nifi.properties that is used when neither 'props.file' nor the conf directory is available. */
+    public static final String DEFAULT_NIFI_PROPS_FILE = "./conf/nifi.properties";
+
+    /**
+     * Locates and reads bootstrap.conf, assembles the java command line for NiFi from it,
+     * launches NiFi as a child process and blocks until that process exits.
+     *
+     * @param args optional; when present, {@code args[0]} is the path of the bootstrap.conf file
+     * @throws FileNotFoundException if the resolved bootstrap.conf file does not exist
+     * @throws IOException if bootstrap.conf cannot be read or the child process cannot be started
+     * @throws InterruptedException if interrupted while waiting for the child process to exit
+     */
+    public static void main(final String[] args) throws IOException, InterruptedException {
+        final File configFile = locateConfigFile(args);
+        final Map<String, String> props = loadProperties(configFile);
+
+        final ProcessBuilder builder = new ProcessBuilder();
+        final String specifiedWorkingDir = props.get("working.dir");
+        if ( specifiedWorkingDir != null ) {
+            builder.directory(new File(specifiedWorkingDir));
+        }
+
+        // ProcessBuilder.directory() returns null when no directory was set (the child then inherits
+        // our working directory). Resolve to an absolute directory either way so that the paths
+        // derived from it below are all of the same kind.
+        final File configuredDir = builder.directory();
+        final File workingDir = ((configuredDir == null) ? new File("") : configuredDir).getAbsoluteFile();
+
+        final String libFilename = replaceNull(props.get("lib.dir"), "./lib").trim();
+        final File libDir = getFile(libFilename, workingDir);
+
+        final String confFilename = replaceNull(props.get("conf.dir"), "./conf").trim();
+        final File confDir = getFile(confFilename, workingDir);
+
+        String nifiPropsFilename = props.get("props.file");
+        if ( nifiPropsFilename == null ) {
+            if ( confDir.exists() ) {
+                nifiPropsFilename = new File(confDir, "nifi.properties").getAbsolutePath();
+            } else {
+                nifiPropsFilename = DEFAULT_NIFI_PROPS_FILE;
+            }
+        }
+
+        nifiPropsFilename = nifiPropsFilename.trim();
+
+        // every property whose name starts with "java.arg" contributes one JVM argument
+        final List<String> javaAdditionalArgs = new ArrayList<>();
+        for ( final Map.Entry<String, String> entry : props.entrySet() ) {
+            if ( entry.getKey().startsWith("java.arg") ) {
+                javaAdditionalArgs.add(entry.getValue());
+            }
+        }
+
+        final String classPath = buildClassPath(libDir, confDir, workingDir);
+        final String javaCmd = replaceNull(props.get("java"), "java");
+
+        final List<String> cmd = new ArrayList<>();
+        cmd.add(javaCmd);
+        cmd.add("-classpath");
+        cmd.add(classPath);
+        cmd.addAll(javaAdditionalArgs);
+        cmd.add("-Dnifi.properties.file.path=" + nifiPropsFilename);
+        cmd.add("org.apache.nifi.NiFi");
+
+        builder.command(cmd).inheritIO();
+
+        final StringBuilder cmdBuilder = new StringBuilder();
+        for ( final String s : cmd ) {
+            cmdBuilder.append(s).append(" ");
+        }
+        System.out.println("Starting Apache NiFi...");
+        System.out.println("Working Directory: " + workingDir.getAbsolutePath());
+        System.out.println("Command: " + cmdBuilder.toString());
+
+        final Process proc = builder.start();
+        Runtime.getRuntime().addShutdownHook(new ShutdownHook(proc));
+        final int statusCode = proc.waitFor();
+        System.out.println("Apache NiFi exited with Status Code " + statusCode);
+    }
+
+
+    /**
+     * Resolves the bootstrap.conf file: first program argument, then the system property
+     * {@code org.apache.nifi.bootstrap.config.file}, then {@code ${NIFI_HOME}/./conf/bootstrap.conf},
+     * then {@code ./conf/bootstrap.conf}.
+     *
+     * @throws FileNotFoundException naming the path that was checked, if that file does not exist
+     */
+    private static File locateConfigFile(final String[] args) throws FileNotFoundException {
+        String configFilename = (args.length > 0) ? args[0] : System.getProperty("org.apache.nifi.bootstrap.config.file");
+
+        if ( configFilename == null ) {
+            final String nifiHome = System.getenv("NIFI_HOME");
+            if ( nifiHome != null ) {
+                final File nifiHomeFile = new File(nifiHome.trim());
+                configFilename = new File(nifiHomeFile, DEFAULT_CONFIG_FILE).getAbsolutePath();
+            }
+        }
+
+        if ( configFilename == null ) {
+            configFilename = DEFAULT_CONFIG_FILE;
+        }
+
+        final File configFile = new File(configFilename);
+        if ( !configFile.exists() ) {
+            // report the file that was actually looked for, not the default location
+            throw new FileNotFoundException(configFile.getAbsolutePath());
+        }
+
+        return configFile;
+    }
+
+    /** Loads the given properties file into a String-to-String map. */
+    private static Map<String, String> loadProperties(final File configFile) throws IOException {
+        final Properties properties = new Properties();
+        try (final FileInputStream fis = new FileInputStream(configFile)) {
+            properties.load(fis);
+        }
+
+        final Map<String, String> props = new HashMap<>();
+        for ( final String name : properties.stringPropertyNames() ) {
+            props.put(name, properties.getProperty(name));
+        }
+        return props;
+    }
+
+    /**
+     * Builds the classpath for the child process: the conf directory followed by every .jar file
+     * found directly in the lib directory.
+     *
+     * @throws RuntimeException if either directory cannot be listed or contains no (matching) files
+     */
+    private static String buildClassPath(final File libDir, final File confDir, final File workingDir) {
+        final File[] libFiles = libDir.listFiles(new FilenameFilter() {
+            @Override
+            public boolean accept(final File dir, final String filename) {
+                return filename.toLowerCase().endsWith(".jar");
+            }
+        });
+
+        if ( libFiles == null || libFiles.length == 0 ) {
+            throw new RuntimeException("Could not find lib directory at " + libDir.getAbsolutePath());
+        }
+
+        final File[] confFiles = confDir.listFiles();
+        if ( confFiles == null || confFiles.length == 0 ) {
+            throw new RuntimeException("Could not find conf directory at " + confDir.getAbsolutePath());
+        }
+
+        final Path workingDirPath = workingDir.toPath().toAbsolutePath().normalize();
+        final StringBuilder classPathBuilder = new StringBuilder(confDir.getAbsolutePath());
+        for ( final File file : libFiles ) {
+            classPathBuilder.append(File.pathSeparatorChar).append(toClassPathEntry(workingDirPath, file));
+        }
+
+        return classPathBuilder.toString();
+    }
+
+    /**
+     * Expresses the given file relative to the working directory, falling back to its absolute path
+     * when no relative path exists (for example, a different filesystem root).
+     */
+    private static String toClassPathEntry(final Path workingDirPath, final File file) {
+        final Path absolutePath = file.toPath().toAbsolutePath().normalize();
+        try {
+            return workingDirPath.relativize(absolutePath).toString();
+        } catch (final IllegalArgumentException iae) {
+            return absolutePath.toString();
+        }
+    }
+
+    /** Returns the named file as-is when absolute, otherwise resolved against the working directory. */
+    private static File getFile(final String filename, final File workingDir) {
+        File libDir = new File(filename);
+        if ( !libDir.isAbsolute() ) {
+            libDir = new File(workingDir, filename);
+        }
+
+        return libDir;
+    }
+
+    /** Returns {@code replacement} when {@code value} is null, otherwise {@code value}. */
+    private static String replaceNull(final String value, final String replacement) {
+        return (value == null) ? replacement : value;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/567dfc79/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java
----------------------------------------------------------------------
diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java
new file mode 100644
index 0000000..55e1f45
--- /dev/null
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java
@@ -0,0 +1,14 @@
+package org.apache.nifi.bootstrap;
+
+/**
+ * Thread intended for registration as a JVM shutdown hook: when run, it destroys
+ * the process it was constructed with.
+ */
+public class ShutdownHook extends Thread {
+    private final Process process;
+
+    /**
+     * @param nifiProcess the process to destroy when this hook runs
+     */
+    public ShutdownHook(final Process nifiProcess) {
+        super();
+        process = nifiProcess;
+    }
+
+    /** Destroys the wrapped process. */
+    @Override
+    public void run() {
+        this.process.destroy();
+    }
+}
[2/4] incubator-nifi git commit: Merge branch 'develop' of
https://git-wip-us.apache.org/repos/asf/incubator-nifi into develop
Posted by ma...@apache.org.
Merge branch 'develop' of https://git-wip-us.apache.org/repos/asf/incubator-nifi into develop
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/cb63c666
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/cb63c666
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/cb63c666
Branch: refs/heads/bootstrap
Commit: cb63c66602a398b0c14cc78732db5ca63e36a94c
Parents: 567dfc7 0125f3d
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Dec 9 11:20:13 2014 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Dec 9 11:20:13 2014 -0500
----------------------------------------------------------------------
nar-bundles/framework-bundle/framework/site-to-site/pom.xml | 4 ++++
nar-bundles/framework-bundle/pom.xml | 5 +++++
2 files changed, 9 insertions(+)
----------------------------------------------------------------------
[3/4] incubator-nifi git commit: NIFI-145
Posted by ma...@apache.org.
NIFI-145
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/64657049
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/64657049
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/64657049
Branch: refs/heads/bootstrap
Commit: 646570490c530d0c076c9bd9b7d1170946a9dae8
Parents: cb63c66
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Dec 9 12:18:35 2014 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Dec 9 12:18:35 2014 -0500
----------------------------------------------------------------------
.../java/org/apache/nifi/BootstrapListener.java | 229 ++++++++++++++++
.../src/main/java/org/apache/nifi/NiFi.java | 25 ++
nifi-bootstrap/pom.xml | 17 +-
.../apache/nifi/bootstrap/BootstrapCodec.java | 89 ++++++
.../org/apache/nifi/bootstrap/NiFiListener.java | 116 ++++++++
.../java/org/apache/nifi/bootstrap/RunNiFi.java | 270 +++++++++++++++++--
.../org/apache/nifi/bootstrap/ShutdownHook.java | 59 +++-
.../exception/InvalidCommandException.java | 37 +++
8 files changed, 824 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64657049/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java
new file mode 100644
index 0000000..3bcbeb3
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Opens a server socket on an ephemeral localhost port, reports that port to the Bootstrap
+ * process (listening on the given bootstrap port) and then serves PING and SHUTDOWN
+ * requests arriving on the server socket.
+ */
+public class BootstrapListener {
+    private static final Logger logger = LoggerFactory.getLogger(BootstrapListener.class);
+
+    // Milliseconds to wait when connecting to the Bootstrap and when reading its reply
+    private static final int BOOTSTRAP_TIMEOUT_MILLIS = 60000;
+
+    private final NiFi nifi;
+    private final int bootstrapPort;
+
+    private Listener listener;
+    private ServerSocket serverSocket;
+
+
+    /**
+     * @param nifi the NiFi instance whose {@code shutdownHook()} is invoked on a SHUTDOWN request
+     * @param port the port on which the Bootstrap is listening
+     */
+    public BootstrapListener(final NiFi nifi, final int port) {
+        this.nifi = nifi;
+        this.bootstrapPort = port;
+    }
+
+    /**
+     * Binds the server socket, starts the listening thread and sends {@code PORT <localPort>}
+     * to the Bootstrap, logging whether the Bootstrap answered {@code OK}.
+     *
+     * @throws IOException if the server socket cannot be bound or the Bootstrap cannot be reached
+     */
+    public void start() throws IOException {
+        logger.debug("Starting Bootstrap Listener to communicate with Bootstrap Port {}", bootstrapPort);
+
+        serverSocket = new ServerSocket();
+        serverSocket.bind(new InetSocketAddress("localhost", 0));
+
+        final int localPort = serverSocket.getLocalPort();
+        logger.info("Started Bootstrap Listener, Listening for incoming requests on port {}", localPort);
+
+        listener = new Listener(serverSocket);
+        final Thread listenThread = new Thread(listener);
+        listenThread.setName("Listen to Bootstrap");
+        listenThread.start();
+
+        logger.debug("Notifying Bootstrap that local port is {}", localPort);
+        try (final Socket socket = new Socket()) {
+            // bound the connect as well as the read; SO_TIMEOUT alone only applies to reads
+            socket.connect(new InetSocketAddress("localhost", bootstrapPort), BOOTSTRAP_TIMEOUT_MILLIS);
+            socket.setSoTimeout(BOOTSTRAP_TIMEOUT_MILLIS);
+
+            final OutputStream out = socket.getOutputStream();
+            out.write(("PORT " + localPort + "\n").getBytes(StandardCharsets.UTF_8));
+            out.flush();
+
+            logger.debug("Awaiting response from Bootstrap...");
+            final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
+            final String response = reader.readLine();
+            if ("OK".equals(response)) {
+                logger.info("Successfully initiated communication with Bootstrap");
+            } else {
+                logger.error("Failed to communicate with Bootstrap. Bootstrap may be unable to issue or receive commands from NiFi");
+            }
+        }
+    }
+
+
+    /** Stops the listening thread, if it was started. */
+    public void stop() {
+        if (listener != null) {
+            listener.stop();
+        }
+    }
+
+    /** Accept loop that hands each incoming connection to a small thread pool. */
+    private class Listener implements Runnable {
+        private final ServerSocket serverSocket;
+        private final ExecutorService executor;
+        private volatile boolean stopped = false;
+
+        public Listener(final ServerSocket serverSocket) {
+            this.serverSocket = serverSocket;
+            this.executor = Executors.newFixedThreadPool(2);
+        }
+
+        public void stop() {
+            stopped = true;
+
+            executor.shutdownNow();
+
+            try {
+                serverSocket.close();
+            } catch (final IOException ioe) {
+                // nothing to really do here. we could log this, but it would just become
+                // confusing in the logs, as we're shutting down and there's no real benefit
+            }
+        }
+
+        @Override
+        public void run() {
+            while (!serverSocket.isClosed()) {
+                try {
+                    if ( stopped ) {
+                        return;
+                    }
+
+                    final Socket socket;
+                    try {
+                        socket = serverSocket.accept();
+                    } catch (final IOException ioe) {
+                        // closing the server socket in stop() makes accept() fail; that is expected
+                        if ( stopped ) {
+                            return;
+                        }
+
+                        throw ioe;
+                    }
+
+                    executor.submit(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                final BootstrapRequest request = readRequest(socket.getInputStream());
+                                final BootstrapRequest.RequestType requestType = request.getRequestType();
+
+                                switch (requestType) {
+                                    case PING:
+                                        logger.debug("Received PING request from Bootstrap; responding");
+                                        echoPing(socket.getOutputStream());
+                                        logger.debug("Responded to PING request from Bootstrap");
+                                        break;
+                                    case SHUTDOWN:
+                                        logger.info("Received SHUTDOWN request from Bootstrap");
+                                        echoShutdown(socket.getOutputStream());
+                                        nifi.shutdownHook();
+                                        return;
+                                }
+                            } catch (final Throwable t) {
+                                logger.error("Failed to process request from Bootstrap due to " + t.toString(), t);
+                            } finally {
+                                try {
+                                    socket.close();
+                                } catch (final IOException ioe) {
+                                    logger.warn("Failed to close socket to Bootstrap due to {}", ioe.toString());
+                                }
+                            }
+                        }
+                    });
+                } catch (final Throwable t) {
+                    logger.error("Failed to process request from Bootstrap due to " + t.toString(), t);
+                }
+            }
+        }
+    }
+
+
+    private void echoPing(final OutputStream out) throws IOException {
+        out.write("PING\n".getBytes(StandardCharsets.UTF_8));
+        out.flush();
+    }
+
+    private void echoShutdown(final OutputStream out) throws IOException {
+        out.write("SHUTDOWN\n".getBytes(StandardCharsets.UTF_8));
+        out.flush();
+    }
+
+    /**
+     * Reads one line from the stream and parses it as {@code <REQUEST_TYPE> [arg ...]}.
+     *
+     * @throws IOException if the stream ends before a line is read, the line holds no request
+     *             type, or the request type is not a known {@link BootstrapRequest.RequestType}
+     */
+    private BootstrapRequest readRequest(final InputStream in) throws IOException {
+        final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
+
+        final String line = reader.readLine();
+        if ( line == null ) {
+            throw new IOException("Received invalid request from Bootstrap: connection was closed before a request was sent");
+        }
+
+        // a line consisting solely of spaces splits into an empty array
+        final String[] splits = line.split(" ");
+        if ( splits.length == 0 ) {
+            throw new IOException("Received invalid request from Bootstrap: " + line);
+        }
+
+        final String requestType = splits[0];
+        final String[] args;
+        if ( splits.length == 1 ) {
+            args = new String[0];
+        } else {
+            args = Arrays.copyOfRange(splits, 1, splits.length);
+        }
+
+        try {
+            return new BootstrapRequest(requestType, args);
+        } catch (final Exception e) {
+            throw new IOException("Received invalid request from bootstrap; request type = " + requestType, e);
+        }
+    }
+
+
+    /** A parsed request line: its type plus any remaining space-separated arguments. */
+    private static class BootstrapRequest {
+        public static enum RequestType {
+            SHUTDOWN,
+            PING;
+        }
+
+        private final RequestType requestType;
+        private final String[] args;
+
+        /**
+         * @throws IllegalArgumentException if {@code request} does not name a {@link RequestType}
+         */
+        public BootstrapRequest(final String request, final String[] args) {
+            this.requestType = RequestType.valueOf(request);
+            this.args = args;
+        }
+
+        public RequestType getRequestType() {
+            return requestType;
+        }
+
+        public String[] getArgs() {
+            return args;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64657049/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java
index 5fd1a13..bf50a21 100644
--- a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java
+++ b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java
@@ -45,6 +45,9 @@ public class NiFi {
private static final Logger logger = LoggerFactory.getLogger(NiFi.class);
private final NiFiServer nifiServer;
+ private final BootstrapListener bootstrapListener;
+
+ public static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port";
public NiFi(final NiFiProperties properties) throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@@ -65,6 +68,25 @@ public class NiFi {
}
}));
+ final String bootstrapPort = System.getProperty(BOOTSTRAP_PORT_PROPERTY);
+ if ( bootstrapPort != null ) {
+ try {
+ final int port = Integer.parseInt(bootstrapPort);
+
+ if (port < 1 || port > 65535) {
+ throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
+ }
+
+ bootstrapListener = new BootstrapListener(this, port);
+ bootstrapListener.start();
+ } catch (final NumberFormatException nfe) {
+ throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
+ }
+ } else {
+ logger.info("NiFi started without Bootstrap Port information provided; will not listen for requests from Bootstrap");
+ bootstrapListener = null;
+ }
+
// delete the web working dir - if the application does not start successfully
// the web app directories might be in an invalid state. when this happens
// jetty will not attempt to re-extract the war into the directory. by removing
@@ -115,6 +137,9 @@ public class NiFi {
if (nifiServer != null) {
nifiServer.stop();
}
+ if (bootstrapListener != null) {
+ bootstrapListener.stop();
+ }
logger.info("Jetty web server shutdown completed (nicely or otherwise).");
} catch (final Throwable t) {
logger.warn("Problem occured ensuring Jetty web server was properly terminated due to " + t);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64657049/nifi-bootstrap/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-bootstrap/pom.xml b/nifi-bootstrap/pom.xml
index b620c84..a992018 100644
--- a/nifi-bootstrap/pom.xml
+++ b/nifi-bootstrap/pom.xml
@@ -1,5 +1,18 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
<modelVersion>4.0.0</modelVersion>
<parent>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64657049/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java
----------------------------------------------------------------------
diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java
new file mode 100644
index 0000000..8138c02
--- /dev/null
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.bootstrap;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.Arrays;
+
+import org.apache.nifi.bootstrap.exception.InvalidCommandException;
+
+/**
+ * Reads a single line-oriented command sent by NiFi over the given streams and acts on it.
+ * The only command handled is {@code PORT <port>}, which is passed on to the {@link RunNiFi}
+ * runner and acknowledged with {@code OK}.
+ */
+public class BootstrapCodec {
+    private final RunNiFi runner;
+    private final BufferedReader reader;
+    private final BufferedWriter writer;
+
+    /**
+     * @param runner receives the command-control port announced by NiFi
+     * @param in stream from which the command is read
+     * @param out stream to which the acknowledgement is written
+     */
+    public BootstrapCodec(final RunNiFi runner, final InputStream in, final OutputStream out) {
+        this.runner = runner;
+        // the NiFi side of this exchange encodes its messages as UTF-8, so do the same here
+        this.reader = new BufferedReader(new InputStreamReader(in, java.nio.charset.StandardCharsets.UTF_8));
+        this.writer = new BufferedWriter(new OutputStreamWriter(out, java.nio.charset.StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Reads one command line and processes it.
+     *
+     * @throws IOException if the stream ends before a line is read, the line holds no command,
+     *             the command is invalid, or the response cannot be written
+     */
+    public void communicate() throws IOException {
+        final String line = reader.readLine();
+        if ( line == null ) {
+            throw new IOException("Received invalid command from NiFi: connection was closed before a command was sent");
+        }
+
+        // a line consisting solely of spaces splits into an empty array
+        final String[] splits = line.split(" ");
+        if ( splits.length == 0 ) {
+            throw new IOException("Received invalid command from NiFi: " + line);
+        }
+
+        final String cmd = splits[0];
+        final String[] args;
+        if ( splits.length == 1 ) {
+            args = new String[0];
+        } else {
+            args = Arrays.copyOfRange(splits, 1, splits.length);
+        }
+
+        try {
+            processRequest(cmd, args);
+        } catch (final InvalidCommandException ice) {
+            // parenthesized: without this the whole concatenation was compared against null
+            final String details = (ice.getMessage() == null) ? "" : "Details: " + ice.toString();
+            throw new IOException("Received invalid command from NiFi: " + line + " : " + details, ice);
+        }
+    }
+
+    /**
+     * Handles the {@code PORT} command; any other command falls through the switch and is ignored.
+     *
+     * @throws InvalidCommandException if PORT does not have exactly one argument that is an
+     *             integer between 1 and 65535
+     * @throws IOException if the acknowledgement cannot be written
+     */
+    private void processRequest(final String cmd, final String[] args) throws InvalidCommandException, IOException {
+        switch (cmd) {
+            case "PORT": {
+                if ( args.length != 1 ) {
+                    throw new InvalidCommandException();
+                }
+
+                final int port;
+                try {
+                    port = Integer.parseInt( args[0] );
+                } catch (final NumberFormatException nfe) {
+                    throw new InvalidCommandException("Invalid Port number; should be integer between 1 and 65535");
+                }
+
+                if ( port < 1 || port > 65535 ) {
+                    throw new InvalidCommandException("Invalid Port number; should be integer between 1 and 65535");
+                }
+
+                runner.setNiFiCommandControlPort(port);
+                writer.write("OK");
+                writer.newLine();
+                writer.flush();
+            }
+            break;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64657049/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java
----------------------------------------------------------------------
diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java
new file mode 100644
index 0000000..c831351
--- /dev/null
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.bootstrap;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Listens on an ephemeral localhost port for connections from NiFi and hands each
+ * connection to a {@link BootstrapCodec}.
+ */
+public class NiFiListener {
+    private ServerSocket serverSocket;
+    private volatile Listener listener;
+
+    /**
+     * Binds the server socket and starts the listening thread.
+     *
+     * @param runner passed to the {@link BootstrapCodec} created for each connection
+     * @return the local port that was bound
+     * @throws IOException if the server socket cannot be bound
+     */
+    int start(final RunNiFi runner) throws IOException {
+        serverSocket = new ServerSocket();
+        serverSocket.bind(new InetSocketAddress("localhost", 0));
+
+        final int localPort = serverSocket.getLocalPort();
+        listener = new Listener(serverSocket, runner);
+        final Thread listenThread = new Thread(listener);
+        listenThread.setName("Listen to NiFi");
+        listenThread.start();
+        return localPort;
+    }
+
+    /**
+     * Stops the listening thread, if it was started.
+     *
+     * @throws IOException if the server socket cannot be closed
+     */
+    public void stop() throws IOException {
+        final Listener listener = this.listener;
+        if ( listener == null ) {
+            return;
+        }
+
+        listener.stop();
+    }
+
+    /** Accept loop that hands each incoming connection to a small thread pool. */
+    private static class Listener implements Runnable {
+        private final ServerSocket serverSocket;
+        private final ExecutorService executor;
+        private final RunNiFi runner;
+        private volatile boolean stopped = false;
+
+        public Listener(final ServerSocket serverSocket, final RunNiFi runner) {
+            this.serverSocket = serverSocket;
+            this.executor = Executors.newFixedThreadPool(2);
+            this.runner = runner;
+        }
+
+        public void stop() throws IOException {
+            stopped = true;
+
+            // give in-flight conversations a few seconds to finish before closing the socket
+            executor.shutdown();
+            try {
+                executor.awaitTermination(3, TimeUnit.SECONDS);
+            } catch (final InterruptedException ie) {
+                // keep the interrupt visible to the caller, but still close the socket below
+                Thread.currentThread().interrupt();
+            }
+
+            serverSocket.close();
+        }
+
+        @Override
+        public void run() {
+            while (!serverSocket.isClosed()) {
+                try {
+                    if ( stopped ) {
+                        return;
+                    }
+
+                    final Socket socket;
+                    try {
+                        socket = serverSocket.accept();
+                    } catch (final IOException ioe) {
+                        // closing the server socket in stop() makes accept() fail; that is expected
+                        if ( stopped ) {
+                            return;
+                        }
+
+                        throw ioe;
+                    }
+
+
+                    executor.submit(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                final BootstrapCodec codec = new BootstrapCodec(runner, socket.getInputStream(), socket.getOutputStream());
+                                codec.communicate();
+                            } catch (final Throwable t) {
+                                System.err.println("Failed to communicate with NiFi due to " + t);
+                                t.printStackTrace();
+                            } finally {
+                                // close on every path so a failed conversation does not leak the socket
+                                try {
+                                    socket.close();
+                                } catch (final IOException ioe) {
+                                    System.err.println("Failed to close socket to NiFi due to " + ioe);
+                                }
+                            }
+                        }
+                    });
+                } catch (final Throwable t) {
+                    System.err.println("Failed to receive information from NiFi due to " + t);
+                    t.printStackTrace();
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64657049/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
----------------------------------------------------------------------
diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
index afa1f47..ea3e566 100644
--- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
@@ -1,16 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.nifi.bootstrap;
+import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
/**
@@ -18,7 +47,6 @@ import java.util.Properties;
*
* This class looks for the bootstrap.conf file by looking in the following places (in order):
* <ol>
- * <li>First argument to the program</li>
* <li>Java System Property named {@code org.apache.nifi.bootstrap.config.file}</li>
* <li>${NIFI_HOME}/./conf/bootstrap.conf, where ${NIFI_HOME} references an environment variable {@code NIFI_HOME}</li>
* <li>./conf/bootstrap.conf, where {@code .} represents the working directory.
@@ -29,12 +57,57 @@ import java.util.Properties;
public class RunNiFi {
public static final String DEFAULT_CONFIG_FILE = "./conf/boostrap.conf";
public static final String DEFAULT_NIFI_PROPS_FILE = "./conf/nifi.properties";
+
+ public static final int MAX_RESTART_ATTEMPTS = 5;
+ public static final int STARTUP_WAIT_SECONDS = 60;
+
+ public static final String SHUTDOWN_CMD = "SHUTDOWN";
+ public static final String PING_CMD = "PING";
+
+ private volatile boolean autoRestartNiFi = true;
+ private volatile int ccPort = -1;
+
+ private final Lock lock = new ReentrantLock();
+ private final Condition startupCondition = lock.newCondition();
+
+ private final File bootstrapConfigFile;
+
+ /**
+ * Creates a runner bound to the given bootstrap configuration file.
+ *
+ * @param bootstrapConfigFile the bootstrap configuration file; it is stored as-is and is not
+ * read or checked for existence here
+ */
+ public RunNiFi(final File bootstrapConfigFile) {
+ this.bootstrapConfigFile = bootstrapConfigFile;
+ }
+
+    /**
+     * Writes the command-line usage summary, including the supported commands, to standard out.
+     */
+    private static void printUsage() {
+        final String[] usageLines = {
+            "Usage:",
+            "",
+            "java org.apache.nifi.bootstrap.RunNiFi <command>",
+            "",
+            "Valid commands include:",
+            "",
+            "Start : Start a new instance of Apache NiFi",
+            "Stop : Stop a running instance of Apache NiFi",
+            "Status : Determine if there is a running instance of Apache NiFi",
+            ""
+        };
+
+        // An empty entry prints a bare line separator, exactly like println() with no argument.
+        for (final String usageLine : usageLines) {
+            System.out.println(usageLine);
+        }
+    }
- @SuppressWarnings({ "rawtypes", "unchecked" })
public static void main(final String[] args) throws IOException, InterruptedException {
- final ProcessBuilder builder = new ProcessBuilder();
-
- String configFilename = (args.length > 0) ? args[0] : System.getProperty("org.apache.nifi.boostrap.config.file");
+ if ( args.length != 1 ) {
+ printUsage();
+ return;
+ }
+
+ switch (args[0].toLowerCase()) {
+ case "start":
+ case "stop":
+ case "status":
+ break;
+ default:
+ System.out.println("Invalid argument: " + args[0]);
+ System.out.println();
+ printUsage();
+ return;
+ }
+
+ String configFilename = System.getProperty("org.apache.nifi.boostrap.config.file");
if ( configFilename == null ) {
final String nifiHome = System.getenv("NIFI_HOME");
@@ -50,12 +123,122 @@ public class RunNiFi {
}
final File configFile = new File(configFilename);
- if ( !configFile.exists() ) {
+
+ final RunNiFi runNiFi = new RunNiFi(configFile);
+
+ switch (args[0].toLowerCase()) {
+ case "start":
+ runNiFi.start();
+ break;
+ case "stop":
+ runNiFi.stop();
+ break;
+ case "status":
+ runNiFi.status();
+ break;
+ }
+ }
+
+
+    /**
+     * Locates the file used to record the port of the running NiFi instance.
+     *
+     * @return the file named {@code nifi.port} in the directory that contains the bootstrap
+     * configuration file
+     */
+    public File getStatusFile() {
+        return new File(bootstrapConfigFile.getParentFile(), "nifi.port");
+    }
+
+    /**
+     * Determines the port of a running NiFi instance by reading the port recorded in the status
+     * file and sending a PING command to it.
+     *
+     * If connecting to, or exchanging data with, the recorded port fails with an
+     * {@link IOException}, the status file is considered stale and an attempt is made to delete it.
+     *
+     * @return the recorded port if the instance echoed the PING command; {@code null} if the status
+     * file could not be read or parsed, the port could not be contacted, or the reply was not the
+     * expected one
+     * @throws IOException declared by the signature; failures inside this method are caught and
+     * reported as a {@code null} result
+     */
+    private Integer getCurrentPort() throws IOException {
+        try {
+            final File statusFile = getStatusFile();
+            final byte[] info = Files.readAllBytes(statusFile.toPath());
+            // setNiFiCommandControlPort writes the port as UTF-8 text; decode it the same way and
+            // tolerate surrounding whitespace such as a trailing newline.
+            final String text = new String(info, StandardCharsets.UTF_8).trim();
+
+            final int port = Integer.parseInt(text);
+
+            try (final Socket socket = new Socket("localhost", port)) {
+                final OutputStream out = socket.getOutputStream();
+                out.write((PING_CMD + "\n").getBytes(StandardCharsets.UTF_8));
+                out.flush();
+
+                final InputStream in = socket.getInputStream();
+                final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
+                // readLine() returns null when the peer closes the connection without replying, so
+                // compare from the constant's side instead of dereferencing the response.
+                final String response = reader.readLine();
+                if ( PING_CMD.equals(response) ) {
+                    return port;
+                }
+            } catch (final IOException ioe) {
+                System.out.println("Found NiFi instance info at " + statusFile + " but information appears to be stale. Removing file.");
+                if ( !statusFile.delete() ) {
+                    System.err.println("Unable to remove status file");
+                }
+
+                throw ioe;
+            }
+        } catch (final Exception e) {
+            // Best effort: a missing or malformed status file, or an unreachable port, all mean
+            // that no running instance could be confirmed.
+            return null;
+        }
+
+        // Something answered on the port, but not with the expected PING reply.
+        return null;
+    }
+
+
+    /**
+     * Prints to standard out whether a running NiFi instance could be found and, if so, the port
+     * it is listening on.
+     *
+     * @throws IOException if looking up the current port fails
+     */
+    public void status() throws IOException {
+        final Integer currentPort = getCurrentPort();
+        final String message = (currentPort != null)
+            ? "Apache NiFi is currently running, listening on port " + currentPort
+            : "Apache NiFi does not appear to be running";
+        System.out.println(message);
+    }
+
+
+    /**
+     * Asks the running NiFi instance to shut down by sending it the SHUTDOWN command, then reports
+     * whether the instance acknowledged the command.
+     *
+     * Nothing is sent if no running instance is found. Failures while talking to the instance
+     * are reported on standard error rather than thrown.
+     *
+     * @throws IOException if looking up the current port fails
+     */
+    public void stop() throws IOException {
+        final Integer port = getCurrentPort();
+        if ( port == null ) {
+            System.out.println("Apache NiFi is not currently running");
+            return;
+        }
+
+        final int timeoutMillis = 60000;
+        try (final Socket socket = new Socket()) {
+            // Bound the connection attempt itself as well as the wait for the reply; SO_TIMEOUT
+            // alone only applies to reads.
+            socket.connect(new InetSocketAddress("localhost", port), timeoutMillis);
+            socket.setSoTimeout(timeoutMillis);
+
+            final OutputStream out = socket.getOutputStream();
+            out.write((SHUTDOWN_CMD + "\n").getBytes(StandardCharsets.UTF_8));
+            out.flush();
+
+            final InputStream in = socket.getInputStream();
+            final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
+            // A null response (connection closed without a reply) falls into the unexpected branch.
+            final String response = reader.readLine();
+            if ( SHUTDOWN_CMD.equals(response) ) {
+                System.out.println("Apache NiFi has accepted the Shutdown Command and is shutting down now");
+            } else {
+                System.err.println("When sending SHUTDOWN command to NiFi, got unexpected response " + response);
+            }
+        } catch (final IOException ioe) {
+            // Include the cause so the operator can tell a refused connection from a timeout.
+            System.err.println("Failed to communicate with Apache NiFi due to " + ioe);
+        }
+    }
+
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public void start() throws IOException, InterruptedException {
+ final Integer port = getCurrentPort();
+ if ( port != null ) {
+ System.out.println("Apache NiFi is already running, listening on port " + port);
+ return;
+ }
+
+ final ProcessBuilder builder = new ProcessBuilder();
+
+ if ( !bootstrapConfigFile.exists() ) {
throw new FileNotFoundException(DEFAULT_CONFIG_FILE);
}
final Properties properties = new Properties();
- try (final FileInputStream fis = new FileInputStream(configFile)) {
+ try (final FileInputStream fis = new FileInputStream(bootstrapConfigFile)) {
properties.load(fis);
}
@@ -136,32 +319,67 @@ public class RunNiFi {
javaCmd = "java";
}
+ final NiFiListener listener = new NiFiListener();
+ final int listenPort = listener.start(this);
+
final List<String> cmd = new ArrayList<>();
cmd.add(javaCmd);
cmd.add("-classpath");
cmd.add(classPath);
cmd.addAll(javaAdditionalArgs);
cmd.add("-Dnifi.properties.file.path=" + nifiPropsFilename);
+ cmd.add("-Dnifi.bootstrap.listen.port=" + listenPort);
cmd.add("org.apache.nifi.NiFi");
- builder.command(cmd).inheritIO();
+ builder.command(cmd);
final StringBuilder cmdBuilder = new StringBuilder();
for ( final String s : cmd ) {
cmdBuilder.append(s).append(" ");
}
+
System.out.println("Starting Apache NiFi...");
System.out.println("Working Directory: " + workingDir.getAbsolutePath());
System.out.println("Command: " + cmdBuilder.toString());
-
- final Process proc = builder.start();
- Runtime.getRuntime().addShutdownHook(new ShutdownHook(proc));
- final int statusCode = proc.waitFor();
- System.out.println("Apache NiFi exited with Status Code " + statusCode);
+
+ builder.start();
+ boolean started = waitForStart();
+
+ if ( started ) {
+ System.out.println("Successfully started Apache NiFi");
+ } else {
+ System.err.println("Apache NiFi does not appear to have started");
+ }
+
+ listener.stop();
}
- private static File getFile(final String filename, final File workingDir) {
+    /**
+     * Blocks until the NiFi command/control port has been recorded or the startup window elapses.
+     *
+     * The port is re-checked after each timed wait of up to one second on the startup condition,
+     * so progress does not depend on the condition being signalled.
+     *
+     * @return {@code true} once the recorded port is greater than zero; {@code false} if more than
+     * {@code STARTUP_WAIT_SECONDS} seconds pass first or the waiting thread is interrupted
+     */
+    private boolean waitForStart() {
+        lock.lock();
+        try {
+            final long startTime = System.nanoTime();
+
+            while ( ccPort < 1 ) {
+                try {
+                    startupCondition.await(1, TimeUnit.SECONDS);
+                } catch (final InterruptedException ie) {
+                    // Restore the interrupt status so code further up the stack can still see it.
+                    Thread.currentThread().interrupt();
+                    return false;
+                }
+
+                final long waitNanos = System.nanoTime() - startTime;
+                final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos);
+                if (waitSeconds > STARTUP_WAIT_SECONDS) {
+                    return false;
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+        return true;
+    }
+
+ private File getFile(final String filename, final File workingDir) {
File libDir = new File(filename);
if ( !libDir.isAbsolute() ) {
libDir = new File(workingDir, filename);
@@ -170,7 +388,29 @@ public class RunNiFi {
return libDir;
}
- private static String replaceNull(final String value, final String replacement) {
+ private String replaceNull(final String value, final String replacement) {
return (value == null) ? replacement : value;
}
+
+    /**
+     * Records whether NiFi should be restarted automatically.
+     *
+     * @param restart the new value of the auto-restart flag
+     */
+    void setAutoRestartNiFi(final boolean restart) {
+        autoRestartNiFi = restart;
+    }
+
+    /**
+     * Records the port on which NiFi listens for commands and writes it to the status file.
+     *
+     * A failure to write the status file is reported on standard error; the port is still
+     * recorded in memory and the confirmation message is still printed.
+     *
+     * @param port the command/control port reported by NiFi
+     */
+    void setNiFiCommandControlPort(final int port) {
+        ccPort = port;
+
+        final File portFile = getStatusFile();
+        final byte[] portBytes = Integer.toString(port).getBytes(StandardCharsets.UTF_8);
+        try (final FileOutputStream portOut = new FileOutputStream(portFile)) {
+            portOut.write(portBytes);
+            // Force the bytes to the device before the stream is closed.
+            portOut.getFD().sync();
+        } catch (final IOException ioe) {
+            System.err.println("Apache NiFi has started but failed to persist NiFi Port information to " + portFile.getAbsolutePath() + " due to " + ioe);
+        }
+
+        System.out.println("Apache NiFi now running and listening for requests on port " + port);
+    }
+
+    /**
+     * Returns the command/control port most recently recorded for NiFi.
+     *
+     * @return the recorded port; the field starts out as -1 before any port has been set
+     */
+    int getNiFiCommandControlPort() {
+        return ccPort;
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64657049/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java
----------------------------------------------------------------------
diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java
index 55e1f45..142d984 100644
--- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java
@@ -1,14 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.nifi.bootstrap;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+
public class ShutdownHook extends Thread {
private final Process nifiProcess;
+ private final RunNiFi runner;
+
+ public static final int WAIT_SECONDS = 10;
- public ShutdownHook(final Process nifiProcess) {
+ public ShutdownHook(final Process nifiProcess, final RunNiFi runner) {
this.nifiProcess = nifiProcess;
+ this.runner = runner;
}
@Override
public void run() {
+ runner.setAutoRestartNiFi(false);
+ final int ccPort = runner.getNiFiCommandControlPort();
+ if ( ccPort > 0 ) {
+ System.out.println("Initiating Shutdown of NiFi...");
+
+ try {
+ final Socket socket = new Socket("localhost", ccPort);
+ final OutputStream out = socket.getOutputStream();
+ out.write("SHUTDOWN\n".getBytes(StandardCharsets.UTF_8));
+ out.flush();
+
+ socket.close();
+ } catch (final IOException ioe) {
+ System.out.println("Failed to Shutdown NiFi due to " + ioe);
+ }
+ }
+
+ try {
+ nifiProcess.waitFor(WAIT_SECONDS, TimeUnit.SECONDS);
+ } catch (final InterruptedException ie) {
+ }
+
+ if ( nifiProcess.isAlive() ) {
+ System.out.println("NiFi has not finished shutting down after " + WAIT_SECONDS + " seconds. Killing process.");
+ }
nifiProcess.destroy();
+
+ final File statusFile = runner.getStatusFile();
+ if ( !statusFile.delete() ) {
+ System.err.println("Failed to delete status file " + statusFile.getAbsolutePath());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64657049/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/exception/InvalidCommandException.java
----------------------------------------------------------------------
diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/exception/InvalidCommandException.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/exception/InvalidCommandException.java
new file mode 100644
index 0000000..962aa1c
--- /dev/null
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/exception/InvalidCommandException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.bootstrap.exception;
+
+/**
+ * Checked exception for an invalid command in the bootstrap module.
+ *
+ * The constructors mirror those of {@link Exception}.
+ */
+public class InvalidCommandException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates an exception with neither a detail message nor a cause.
+     */
+    public InvalidCommandException() {
+    }
+
+    /**
+     * Creates an exception with a detail message.
+     *
+     * @param message the detail message
+     */
+    public InvalidCommandException(final String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception that wraps an underlying cause.
+     *
+     * @param t the cause
+     */
+    public InvalidCommandException(final Throwable t) {
+        super(t);
+    }
+
+    /**
+     * Creates an exception with a detail message and an underlying cause.
+     *
+     * @param message the detail message
+     * @param t the cause
+     */
+    public InvalidCommandException(final String message, final Throwable t) {
+        super(message, t);
+    }
+}
[4/4] incubator-nifi git commit: NIFI-145: Added batch scripts and a
simple bootstrap.conf file and fixed ShutdownHook to be Java 7 compliant
Posted by ma...@apache.org.
NIFI-145: Added batch scripts and a simple bootstrap.conf file and fixed ShutdownHook to be Java 7 compliant
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/eed4a9bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/eed4a9bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/eed4a9bb
Branch: refs/heads/bootstrap
Commit: eed4a9bb810ceb5a366009c8379b06ed80b833f3
Parents: 6465704
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Dec 9 13:51:15 2014 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Dec 9 13:51:15 2014 -0500
----------------------------------------------------------------------
.../src/main/resources/bin/nifi-status.bat | 15 +++++++++
.../src/main/resources/bin/start-nifi.bat | 15 +++++++++
.../src/main/resources/bin/stop-nifi.bat | 15 +++++++++
.../src/main/resources/conf/bootstrap.conf | 9 ++++++
.../org/apache/nifi/bootstrap/ShutdownHook.java | 33 +++++++++++++++-----
5 files changed, 79 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/eed4a9bb/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi-status.bat
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi-status.bat b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi-status.bat
new file mode 100644
index 0000000..9b88349
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi-status.bat
@@ -0,0 +1,15 @@
+@echo off
+
+rem Runs the NiFi bootstrap class with the "status" command.
+
+rem Use JAVA_HOME if it's set; otherwise, just use java.
+rem The assignments are quoted so that a ")" inside JAVA_HOME (for example a path under
+rem "Program Files (x86)") does not close the parenthesized block early.
+IF "%JAVA_HOME%"=="" (SET "JAVA_EXE=java") ELSE (SET "JAVA_EXE=%JAVA_HOME%\bin\java.exe")
+
+rem %~dp0 is the directory containing this script; lib and conf are resolved relative to it.
+SET LIB_DIR=%~dp0..\lib
+SET CONF_DIR=%~dp0..\conf
+
+SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
+SET JAVA_ARGS=-Dorg.apache.nifi.boostrap.config.file=%BOOTSTRAP_CONF_FILE%
+
+rem NOTE(review): confirm that the JVM accepts a partial wildcard (nifi-bootstrap*.jar) as a class path entry.
+SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
+SET BOOTSTRAP_ACTION=status
+
+cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION%
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/eed4a9bb/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/start-nifi.bat
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/start-nifi.bat b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/start-nifi.bat
new file mode 100644
index 0000000..c088672
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/start-nifi.bat
@@ -0,0 +1,15 @@
+@echo off
+
+rem Runs the NiFi bootstrap class with the "start" command.
+
+rem Use JAVA_HOME if it's set; otherwise, just use java.
+rem The assignments are quoted so that a ")" inside JAVA_HOME (for example a path under
+rem "Program Files (x86)") does not close the parenthesized block early.
+IF "%JAVA_HOME%"=="" (SET "JAVA_EXE=java") ELSE (SET "JAVA_EXE=%JAVA_HOME%\bin\java.exe")
+
+rem %~dp0 is the directory containing this script; lib and conf are resolved relative to it.
+SET LIB_DIR=%~dp0..\lib
+SET CONF_DIR=%~dp0..\conf
+
+SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
+SET JAVA_ARGS=-Dorg.apache.nifi.boostrap.config.file=%BOOTSTRAP_CONF_FILE%
+
+rem NOTE(review): confirm that the JVM accepts a partial wildcard (nifi-bootstrap*.jar) as a class path entry.
+SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
+SET BOOTSTRAP_ACTION=start
+
+cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION%
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/eed4a9bb/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/stop-nifi.bat
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/stop-nifi.bat b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/stop-nifi.bat
new file mode 100644
index 0000000..753b09f
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/stop-nifi.bat
@@ -0,0 +1,15 @@
+@echo off
+
+rem Runs the NiFi bootstrap class with the "stop" command.
+
+rem Use JAVA_HOME if it's set; otherwise, just use java.
+rem The assignments are quoted so that a ")" inside JAVA_HOME (for example a path under
+rem "Program Files (x86)") does not close the parenthesized block early.
+IF "%JAVA_HOME%"=="" (SET "JAVA_EXE=java") ELSE (SET "JAVA_EXE=%JAVA_HOME%\bin\java.exe")
+
+rem %~dp0 is the directory containing this script; lib and conf are resolved relative to it.
+SET LIB_DIR=%~dp0..\lib
+SET CONF_DIR=%~dp0..\conf
+
+SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
+SET JAVA_ARGS=-Dorg.apache.nifi.boostrap.config.file=%BOOTSTRAP_CONF_FILE%
+
+rem NOTE(review): confirm that the JVM accepts a partial wildcard (nifi-bootstrap*.jar) as a class path entry.
+SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
+SET BOOTSTRAP_ACTION=stop
+
+cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION%
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/eed4a9bb/nar-bundles/framework-bundle/framework/resources/src/main/resources/conf/bootstrap.conf
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/conf/bootstrap.conf b/nar-bundles/framework-bundle/framework/resources/src/main/resources/conf/bootstrap.conf
new file mode 100644
index 0000000..97d48f8
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/conf/bootstrap.conf
@@ -0,0 +1,9 @@
+lib.dir=./lib
+conf.dir=./conf
+java.arg.1=-Dorg.apache.jasper.compiler.disablejsr199=true
+
+java.arg.2=-Xms256m
+java.arg.3=-Xmx512m
+
+# Enable Remote Debugging. This example uses its own key because java.arg.2 is already assigned to -Xms256m above.
+#java.arg.4=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/eed4a9bb/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java
----------------------------------------------------------------------
diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java
index 142d984..f804c7c 100644
--- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java
@@ -53,19 +53,36 @@ public class ShutdownHook extends Thread {
}
}
- try {
- nifiProcess.waitFor(WAIT_SECONDS, TimeUnit.SECONDS);
- } catch (final InterruptedException ie) {
- }
-
- if ( nifiProcess.isAlive() ) {
- System.out.println("NiFi has not finished shutting down after " + WAIT_SECONDS + " seconds. Killing process.");
+ System.out.println("Waiting for Apache NiFi to finish shutting down...");
+ final long startWait = System.nanoTime();
+ while ( isAlive(nifiProcess) ) {
+ final long waitNanos = System.nanoTime() - startWait;
+ final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos);
+ if ( waitSeconds >= WAIT_SECONDS ) {
+ if ( isAlive(nifiProcess) ) {
+ System.out.println("NiFi has not finished shutting down after " + WAIT_SECONDS + " seconds. Killing process.");
+ nifiProcess.destroy();
+ }
+ break;
+ } else {
+ try {
+ Thread.sleep(1000L);
+ } catch (final InterruptedException ie) {}
+ }
}
- nifiProcess.destroy();
final File statusFile = runner.getStatusFile();
if ( !statusFile.delete() ) {
System.err.println("Failed to delete status file " + statusFile.getAbsolutePath());
}
}
+
+    /**
+     * Reports whether the given process is still running.
+     *
+     * {@code exitValue()} throws {@link IllegalThreadStateException} while the process has not yet
+     * terminated, which is used here as the liveness signal.
+     *
+     * @param process the process to check
+     * @return {@code true} if the process has not yet exited; {@code false} if an exit value is available
+     */
+    private boolean isAlive(final Process process) {
+        boolean running;
+        try {
+            process.exitValue();
+            running = false;
+        } catch (final IllegalThreadStateException stillRunning) {
+            running = true;
+        }
+        return running;
+    }
}