You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/01/22 20:00:37 UTC
nifi git commit: NIFI-4424 Added functionality to allow NiFi to run
in "embedded" mode for eventual integration test access.
Repository: nifi
Updated Branches:
refs/heads/master 05e9e6eaa -> 3ebfcd5ae
NIFI-4424 Added functionality to allow NiFi to run in "embedded" mode for eventual integration test access.
This closes #2251.
Signed-off-by: Andy LoPresto <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3ebfcd5a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3ebfcd5a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3ebfcd5a
Branch: refs/heads/master
Commit: 3ebfcd5ae5132e29482d571e767142c806e30469
Parents: 05e9e6e
Author: Peter G. Horvath <pe...@gmail.com>
Authored: Sun Nov 5 12:25:15 2017 +0100
Committer: Andy LoPresto <al...@apache.org>
Committed: Mon Jan 22 12:00:33 2018 -0800
----------------------------------------------------------------------
.../org/apache/nifi/nar/NarClassLoaders.java | 31 ++++--
.../main/java/org/apache/nifi/EmbeddedNiFi.java | 57 +++++++++++
.../src/main/java/org/apache/nifi/NiFi.java | 99 +++++++++++++-------
3 files changed, 147 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/3ebfcd5a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoaders.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoaders.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoaders.java
index 005a8fa..8921b25 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoaders.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoaders.java
@@ -100,7 +100,26 @@ public final class NarClassLoaders {
* @throws IllegalStateException already initialized with a given pair of
* directories cannot reinitialize or use a different pair of directories.
*/
- public void init(final File frameworkWorkingDir, final File extensionsWorkingDir) throws IOException, ClassNotFoundException {
+ public void init(File frameworkWorkingDir, File extensionsWorkingDir) throws IOException, ClassNotFoundException {
+ init(ClassLoader.getSystemClassLoader(), frameworkWorkingDir, extensionsWorkingDir);
+ }
+
+ /**
+ * Initializes and loads the NarClassLoaders. This method must be called
+ * before the rest of the methods to access the classloaders are called and
+ * it can be safely called any number of times provided the same framework
+ * and extension working dirs are used.
+ *
+ * @param rootClassloader the root classloader to use for booting Jetty
+ * @param frameworkWorkingDir where to find framework artifacts
+ * @param extensionsWorkingDir where to find extension artifacts
+ * @throws java.io.IOException if any issue occurs while exploding nar working directories.
+ * @throws java.lang.ClassNotFoundException if unable to load class definition
+ * @throws IllegalStateException already initialized with a given pair of
+ * directories cannot reinitialize or use a different pair of directories.
+ */
+ public void init(final ClassLoader rootClassloader,
+ final File frameworkWorkingDir, final File extensionsWorkingDir) throws IOException, ClassNotFoundException {
if (frameworkWorkingDir == null || extensionsWorkingDir == null) {
throw new NullPointerException("cannot have empty arguments");
}
@@ -109,7 +128,7 @@ public final class NarClassLoaders {
synchronized (this) {
ic = initContext;
if (ic == null) {
- initContext = ic = load(frameworkWorkingDir, extensionsWorkingDir);
+ initContext = ic = load(rootClassloader, frameworkWorkingDir, extensionsWorkingDir);
}
}
}
@@ -123,9 +142,9 @@ public final class NarClassLoaders {
/**
* Should be called at most once.
*/
- private InitContext load(final File frameworkWorkingDir, final File extensionsWorkingDir) throws IOException, ClassNotFoundException {
- // get the system classloader
- final ClassLoader systemClassLoader = ClassLoader.getSystemClassLoader();
+ private InitContext load(final ClassLoader rootClassloader,
+ final File frameworkWorkingDir, final File extensionsWorkingDir)
+ throws IOException, ClassNotFoundException {
// find all nar files and create class loaders for them.
final Map<String, Bundle> narDirectoryBundleLookup = new LinkedHashMap<>();
@@ -181,7 +200,7 @@ public final class NarClassLoaders {
// look for the jetty nar
if (JETTY_NAR_ID.equals(narDetail.getCoordinate().getId())) {
// create the jetty classloader
- jettyClassLoader = createNarClassLoader(narDetail.getWorkingDirectory(), systemClassLoader);
+ jettyClassLoader = createNarClassLoader(narDetail.getWorkingDirectory(), rootClassloader);
// remove the jetty nar since its already loaded
narDirectoryBundleLookup.put(narDetail.getWorkingDirectory().getCanonicalPath(), new Bundle(narDetail, jettyClassLoader));
http://git-wip-us.apache.org/repos/asf/nifi/blob/3ebfcd5a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/EmbeddedNiFi.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/EmbeddedNiFi.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/EmbeddedNiFi.java
new file mode 100644
index 0000000..790947c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/EmbeddedNiFi.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * <p>
+ * Starts an instance of NiFi within the <b>same JVM</b>, which can later properly be shut down.
+ * Intended to be used for testing purposes.</p>
+ *
+ */
+public class EmbeddedNiFi extends NiFi {
+
+ public EmbeddedNiFi(String[] args, ClassLoader rootClassLoader)
+ throws ClassNotFoundException, IOException, NoSuchMethodException,
+ InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
+
+ super(convertArgumentsToValidatedNiFiProperties(args), rootClassLoader);
+ }
+
+ @Override
+ protected void initLogging() {
+ // do nothing when running in embedded mode
+ }
+
+ @Override
+ protected void setDefaultUncaughtExceptionHandler() {
+ // do nothing when running in embedded mode
+ }
+
+ @Override
+ protected void addShutdownHook() {
+ // do nothing when running in embedded mode
+ }
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/3ebfcd5a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java
index 58d17e4..68ee8c7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java
@@ -68,6 +68,13 @@ public class NiFi {
public NiFi(final NiFiProperties properties)
throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
+ this(properties, ClassLoader.getSystemClassLoader());
+
+ }
+
+ public NiFi(final NiFiProperties properties, ClassLoader rootClassLoader)
+ throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
+
// There can only be one krb5.conf for the overall Java process so set this globally during
// start up so that processors and our Kerberos authentication code don't have to set this
final File kerberosConfigFile = properties.getKerberosConfigurationFile();
@@ -77,22 +84,10 @@ public class NiFi {
System.setProperty("java.security.krb5.conf", kerberosConfigFilePath);
}
- Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(final Thread t, final Throwable e) {
- LOGGER.error("An Unknown Error Occurred in Thread {}: {}", t, e.toString());
- LOGGER.error("", e);
- }
- });
+ setDefaultUncaughtExceptionHandler();
// register the shutdown hook
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- @Override
- public void run() {
- // shutdown the jetty server
- shutdownHook();
- }
- }));
+ addShutdownHook();
final String bootstrapPort = System.getProperty(BOOTSTRAP_PORT_PROPERTY);
if (bootstrapPort != null) {
@@ -125,8 +120,7 @@ public class NiFi {
detectTimingIssues();
// redirect JUL log events
- SLF4JBridgeHandler.removeHandlersForRootLogger();
- SLF4JBridgeHandler.install();
+ initLogging();
final Bundle systemBundle = SystemBundle.create(properties);
@@ -134,15 +128,18 @@ public class NiFi {
final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties, systemBundle);
// load the extensions classloaders
- NarClassLoaders.getInstance().init(properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory());
+ NarClassLoaders narClassLoaders = NarClassLoaders.getInstance();
+
+ narClassLoaders.init(rootClassLoader,
+ properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory());
// load the framework classloader
- final ClassLoader frameworkClassLoader = NarClassLoaders.getInstance().getFrameworkBundle().getClassLoader();
+ final ClassLoader frameworkClassLoader = narClassLoaders.getFrameworkBundle().getClassLoader();
if (frameworkClassLoader == null) {
throw new IllegalStateException("Unable to find the framework NAR ClassLoader.");
}
- final Set<Bundle> narBundles = NarClassLoaders.getInstance().getBundles();
+ final Set<Bundle> narBundles = narClassLoaders.getBundles();
// load the server from the framework classloader
Thread.currentThread().setContextClassLoader(frameworkClassLoader);
@@ -169,6 +166,31 @@ public class NiFi {
}
}
+ protected void setDefaultUncaughtExceptionHandler() {
+ Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(final Thread t, final Throwable e) {
+ LOGGER.error("An Unknown Error Occurred in Thread {}: {}", t, e.toString());
+ LOGGER.error("", e);
+ }
+ });
+ }
+
+ protected void addShutdownHook() {
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ // shutdown the jetty server
+ shutdownHook();
+ }
+ }));
+ }
+
+ protected void initLogging() {
+ SLF4JBridgeHandler.removeHandlersForRootLogger();
+ SLF4JBridgeHandler.install();
+ }
+
private static ClassLoader createBootstrapClassLoader() throws IOException {
//Get list of files in bootstrap folder
final List<URL> urls = new ArrayList<>();
@@ -185,21 +207,25 @@ public class NiFi {
protected void shutdownHook() {
try {
- this.shutdown = true;
-
- LOGGER.info("Initiating shutdown of Jetty web server...");
- if (nifiServer != null) {
- nifiServer.stop();
- }
- if (bootstrapListener != null) {
- bootstrapListener.stop();
- }
- LOGGER.info("Jetty web server shutdown completed (nicely or otherwise).");
+ shutdown();
} catch (final Throwable t) {
LOGGER.warn("Problem occurred ensuring Jetty web server was properly terminated due to " + t);
}
}
+ protected void shutdown() {
+ this.shutdown = true;
+
+ LOGGER.info("Initiating shutdown of Jetty web server...");
+ if (nifiServer != null) {
+ nifiServer.stop();
+ }
+ if (bootstrapListener != null) {
+ bootstrapListener.stop();
+ }
+ LOGGER.info("Jetty web server shutdown completed (nicely or otherwise).");
+ }
+
/**
* Determine if the machine we're running on has timing issues.
*/
@@ -262,15 +288,20 @@ public class NiFi {
public static void main(String[] args) {
LOGGER.info("Launching NiFi...");
try {
- final ClassLoader bootstrap = createBootstrapClassLoader();
- NiFiProperties properties = initializeProperties(args, bootstrap);
- properties.validate();
+ NiFiProperties properties = convertArgumentsToValidatedNiFiProperties(args);
new NiFi(properties);
} catch (final Throwable t) {
LOGGER.error("Failure to launch NiFi due to " + t, t);
}
}
+ protected static NiFiProperties convertArgumentsToValidatedNiFiProperties(String[] args) throws IOException {
+ final ClassLoader bootstrap = createBootstrapClassLoader();
+ NiFiProperties properties = initializeProperties(args, bootstrap);
+ properties.validate();
+ return properties;
+ }
+
private static NiFiProperties initializeProperties(final String[] args, final ClassLoader boostrapLoader) {
// Try to get key
// If key doesn't exist, instantiate without
@@ -321,9 +352,9 @@ public class NiFi {
if (null == key) {
return "";
} else if (!isHexKeyValid(key)) {
- throw new IllegalArgumentException("The key was not provided in valid hex format and of the correct length");
+ throw new IllegalArgumentException("The key was not provided in valid hex format and of the correct length");
} else {
- return key;
+ return key;
}
}