You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/04/25 17:54:47 UTC
[1/3] nifi-minifi git commit: MINIFI-17 Adding error handling of
configurations that fail to start and a couple other small changes
Repository: nifi-minifi
Updated Branches:
refs/heads/master 0c04fbb61 -> 66dbda90c
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/resources/default.yml
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/resources/default.yml b/minifi-bootstrap/src/test/resources/default.yml
new file mode 100644
index 0000000..064a746
--- /dev/null
+++ b/minifi-bootstrap/src/test/resources/default.yml
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Flow Controller:
+ name: MiNiFi Flow
+ comment:
+
+Core Properties:
+ flow controller graceful shutdown period: 10 sec
+ flow service write delay interval: 500 ms
+ administrative yield duration: 30 sec
+ bored yield duration: 10 millis
+
+FlowFile Repository:
+ partitions: 256
+ checkpoint interval: 2 mins
+ always sync: false
+ Swap:
+ threshold: 20000
+ in period: 5 sec
+ in threads: 1
+ out period: 5 sec
+ out threads: 4
+
+Content Repository:
+ content claim max appendable size: 10 MB
+ content claim max flow files: 100
+ always sync: false
+
+Component Status Repository:
+ buffer size: 1440
+ snapshot frequency: 1 min
+
+Security Properties:
+ keystore:
+ keystore type:
+ keystore password:
+ key password:
+ truststore:
+ truststore type:
+ truststore password:
+ ssl protocol:
+ Sensitive Props:
+ key:
+ algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
+ provider: BC
+
+Processor Configuration:
+ name:
+ class:
+ max concurrent tasks:
+ scheduling strategy:
+ scheduling period:
+ penalization period:
+ yield period:
+ run duration nanos:
+ auto-terminated relationships list:
+ Properties:
+
+Connection Properties:
+ name:
+ max work queue size: 0
+ max work queue data size: 0 MB
+ flowfile expiration: 0 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer
+
+Remote Processing Group:
+ name:
+ comment:
+ url:
+ timeout:
+ yield period:
+ Input Port:
+ id:
+ name:
+ comments:
+ max concurrent tasks:
+ use compression:
+
+Provenance Reporting:
+ comment:
+ scheduling strategy:
+ scheduling period:
+ destination url:
+ port name:
+ originating url:
+ use compression:
+ timeout:
+ batch size:
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
index 107d9cc..a365c90 100644
--- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
@@ -32,8 +32,8 @@ graceful.shutdown.seconds=20
nifi.minifi.config=./conf/config.yml
# Notifiers to use for the associated agent, comma separated list of class names
-#nifi.minifi.notifier.components=org.apache.nifi.minifi.bootstrap.configuration.FileChangeNotifier
-#nifi.minifi.notifier.components=org.apache.nifi.minifi.bootstrap.configuration.RestChangeNotifier
+#nifi.minifi.notifier.components=org.apache.nifi.minifi.bootstrap.configuration.notifiers.FileChangeNotifier
+#nifi.minifi.notifier.components=org.apache.nifi.minifi.bootstrap.configuration.notifiers.RestChangeNotifier
# File change notifier configuration
[3/3] nifi-minifi git commit: MINIFI-17 Adding error handling of
configurations that fail to start and a couple other small changes
Posted by jp...@apache.org.
MINIFI-17 Adding error handling of configurations that fail to start and a couple other small changes
This closes #15
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/66dbda90
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/66dbda90
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/66dbda90
Branch: refs/heads/master
Commit: 66dbda90c904a679337633a3631ea09554887ec7
Parents: 0c04fbb
Author: Joseph Percivall <jo...@yahoo.com>
Authored: Thu Apr 21 16:50:19 2016 -0400
Committer: Joseph Percivall <jo...@yahoo.com>
Committed: Mon Apr 25 11:53:32 2016 -0400
----------------------------------------------------------------------
minifi-assembly/pom.xml | 2 +-
.../apache/nifi/minifi/bootstrap/RunMiNiFi.java | 270 +++++++----
.../nifi/minifi/bootstrap/ShutdownHook.java | 2 +
.../ConfigurationChangeException.java | 42 ++
.../ConfigurationChangeListener.java | 6 +-
.../ConfigurationChangeNotifier.java | 4 +-
.../configuration/FileChangeNotifier.java | 183 --------
.../configuration/ListenerHandleResult.java | 55 +++
.../configuration/RestChangeNotifier.java | 259 -----------
.../notifiers/FileChangeNotifier.java | 202 ++++++++
.../notifiers/RestChangeNotifier.java | 289 ++++++++++++
.../bootstrap/util/ConfigTransformer.java | 459 +++++++++++--------
.../configuration/TestFileChangeNotifier.java | 206 ---------
.../configuration/TestRestChangeNotifier.java | 51 ---
.../TestRestChangeNotifierSSL.java | 96 ----
.../notifiers/TestFileChangeNotifier.java | 208 +++++++++
.../notifiers/TestRestChangeNotifier.java | 51 +++
.../notifiers/TestRestChangeNotifierSSL.java | 96 ++++
.../notifiers/util/MockChangeListener.java | 51 +++
.../util/TestRestChangeNotifierCommon.java | 89 ++++
.../configuration/util/MockChangeListener.java | 46 --
.../util/TestRestChangeNotifierCommon.java | 89 ----
.../bootstrap/util/TestConfigTransformer.java | 27 ++
.../src/test/resources/config-empty.yml | 18 +
minifi-bootstrap/src/test/resources/default.yml | 101 ++++
.../src/main/resources/conf/bootstrap.conf | 4 +-
26 files changed, 1699 insertions(+), 1207 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-assembly/pom.xml b/minifi-assembly/pom.xml
index 7a0e6b5..7c5266d 100644
--- a/minifi-assembly/pom.xml
+++ b/minifi-assembly/pom.xml
@@ -261,7 +261,7 @@ limitations under the License.
<!-- nifi.properties: web properties -->
<nifi.web.war.directory>./lib</nifi.web.war.directory>
<nifi.web.http.host />
- <nifi.web.http.port>8080</nifi.web.http.port>
+ <nifi.web.http.port>8081</nifi.web.http.port>
<nifi.web.https.host />
<nifi.web.https.port />
<nifi.jetty.work.dir>./work/jetty</nifi.jetty.work.dir>
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
index 98d06f3..82b583f 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
@@ -34,6 +34,7 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
+import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermission;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -51,10 +52,12 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
@@ -64,6 +67,8 @@ import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+
/**
* <p>
* The class which bootstraps Apache MiNiFi. This class looks for the
@@ -116,7 +121,6 @@ public class RunMiNiFi {
private final Lock startedLock = new ReentrantLock();
private final Lock lock = new ReentrantLock();
private final Condition startupCondition = lock.newCondition();
-
private final File bootstrapConfigFile;
// used for logging initial info; these will be logged to console by default when the app is started
@@ -130,7 +134,10 @@ public class RunMiNiFi {
private volatile int gracefulShutdownSeconds;
private Set<ConfigurationChangeNotifier> changeNotifiers;
- private ConfigurationChangeListener changeListener;
+ private MiNiFiConfigurationChangeListener changeListener;
+
+ // Is set to true after the MiNiFi instance shuts down in preparation to be reloaded. Will be set to false after MiNiFi is successfully started again.
+ private AtomicBoolean reloading = new AtomicBoolean(false);
private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
@@ -273,12 +280,21 @@ public class RunMiNiFi {
final File confDir = bootstrapConfigFile.getParentFile();
final File nifiHome = confDir.getParentFile();
final File bin = new File(nifiHome, "bin");
- final File lockFile = new File(bin, "minifi.reload.lock");
+ final File reloadFile = new File(bin, "minifi.reload.lock");
- logger.debug("Reload File: {}", lockFile);
- return lockFile;
+ logger.debug("Reload File: {}", reloadFile);
+ return reloadFile;
+ }
+
+ public File getSwapFile(final Logger logger) {
+ final File confDir = bootstrapConfigFile.getParentFile();
+ final File swapFile = new File(confDir, "swap.yml");
+
+ logger.debug("Swap File: {}", swapFile);
+ return swapFile;
}
+
private Properties loadProperties(final Logger logger) throws IOException {
final Properties props = new Properties();
final File statusFile = getStatusFile(logger);
@@ -663,7 +679,8 @@ public class RunMiNiFi {
}
}
- logger.info("MiNiFi has finished shutting down.");
+ reloading.set(true);
+ logger.info("MiNiFi has finished shutting down and will be reloaded.");
}
} else {
logger.error("When sending RELOAD command to MiNiFi, got unexpected response {}", response);
@@ -1035,6 +1052,15 @@ public class RunMiNiFi {
@SuppressWarnings({"rawtypes", "unchecked"})
public void start() throws IOException, InterruptedException {
+ final String confDir = getBootstrapProperties().getProperty(CONF_DIR_KEY);
+ final File configFile = new File(getBootstrapProperties().getProperty(MINIFI_CONFIG_FILE_KEY));
+ try {
+ performTransformation(new FileInputStream(configFile), confDir);
+ } catch (ConfigurationChangeException e) {
+ defaultLogger.error("The config file is malformed, unable to start.", e);
+ return;
+ }
+
Tuple<ProcessBuilder, Process> tuple = startMiNiFi();
if (tuple == null) {
cmdLogger.info("Start method returned null, ending start command.");
@@ -1048,76 +1074,115 @@ public class RunMiNiFi {
ProcessBuilder builder = tuple.getKey();
Process process = tuple.getValue();
- while (true) {
- final boolean alive = isAlive(process);
+ try {
+ while (true) {
+ final boolean alive = isAlive(process);
- if (alive) {
- try {
- Thread.sleep(1000L);
- } catch (final InterruptedException ie) {
- }
- } else {
- final Runtime runtime = Runtime.getRuntime();
- try {
- runtime.removeShutdownHook(shutdownHook);
- } catch (final IllegalStateException ise) {
- // happens when already shutting down
- }
+ if (alive) {
+ try {
+ Thread.sleep(1000L);
- String now = sdf.format(System.currentTimeMillis());
- if (autoRestartNiFi) {
- final File statusFile = getStatusFile(defaultLogger);
- if (!statusFile.exists()) {
- defaultLogger.info("Status File no longer exists. Will not restart MiNiFi");
- return;
- }
+ if (reloading.get() && getNifiStarted()) {
+ final File swapConfigFile = getSwapFile(defaultLogger);
+ if (swapConfigFile.exists()) {
+ defaultLogger.info("MiNiFi has finished reloading successfully and swap file exists. Deleting old configuration.");
- final File lockFile = getLockFile(defaultLogger);
- if (lockFile.exists()) {
- defaultLogger.info("A shutdown was initiated. Will not restart MiNiFi");
- return;
- }
+ if (swapConfigFile.delete()) {
+ defaultLogger.info("Swap file was successfully deleted.");
+ } else {
+ defaultLogger.info("Swap file was not deleted.");
+ }
+ }
- final File reloadFile = getReloadFile(defaultLogger);
- if (reloadFile.exists()) {
- defaultLogger.info("Currently reloading configuration. Will not restart MiNiFi.");
- Thread.sleep(5000L);
- continue;
- }
+ reloading.set(false);
+ }
- final boolean previouslyStarted = getNifiStarted();
- if (!previouslyStarted) {
- defaultLogger.info("MiNiFi never started. Will not restart MiNiFi");
- return;
- } else {
- setNiFiStarted(false);
+ } catch (final InterruptedException ie) {
+ }
+ } else {
+ final Runtime runtime = Runtime.getRuntime();
+ try {
+ runtime.removeShutdownHook(shutdownHook);
+ } catch (final IllegalStateException ise) {
+ // happens when already shutting down
}
- process = builder.start();
- handleLogging(process);
+ if (autoRestartNiFi) {
+ final File statusFile = getStatusFile(defaultLogger);
+ if (!statusFile.exists()) {
+ defaultLogger.info("Status File no longer exists. Will not restart MiNiFi");
+ return;
+ }
- Long pid = getPid(process, defaultLogger);
- if (pid != null) {
- nifiPid = pid;
- final Properties nifiProps = new Properties();
- nifiProps.setProperty("pid", String.valueOf(nifiPid));
- saveProperties(nifiProps, defaultLogger);
- }
+ final File lockFile = getLockFile(defaultLogger);
+ if (lockFile.exists()) {
+ defaultLogger.info("A shutdown was initiated. Will not restart MiNiFi");
+ return;
+ }
- shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds, loggingExecutor);
- runtime.addShutdownHook(shutdownHook);
+ final File reloadFile = getReloadFile(defaultLogger);
+ if (reloadFile.exists()) {
+ defaultLogger.info("Currently reloading configuration. Will wait to restart MiNiFi.");
+ Thread.sleep(5000L);
+ continue;
+ }
+
+ final boolean previouslyStarted = getNifiStarted();
+ if (!previouslyStarted) {
+ final File swapConfigFile = getSwapFile(defaultLogger);
+ if (swapConfigFile.exists()) {
+ defaultLogger.info("Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration.");
+
+ try {
+ performTransformation(new FileInputStream(swapConfigFile), confDir);
+ } catch (ConfigurationChangeException e) {
+ defaultLogger.error("The swap file is malformed, unable to restart from prior state. Will not attempt to restart MiNiFi. Swap File should be cleaned up manually.");
+ return;
+ }
- final boolean started = waitForStart();
+ Files.copy(swapConfigFile.toPath(), Paths.get(getBootstrapProperties().getProperty(MINIFI_CONFIG_FILE_KEY)), REPLACE_EXISTING);
+
+ defaultLogger.info("Replacing config file with swap file and deleting swap file");
+ if (!swapConfigFile.delete()) {
+ defaultLogger.warn("The swap file failed to delete after replacing using it to revert to the old configuration. It should be cleaned up manually.");
+ }
+ reloading.set(false);
+ } else {
+ defaultLogger.info("MiNiFi either never started or failed to restart. Will not attempt to restart MiNiFi");
+ return;
+ }
+ } else {
+ setNiFiStarted(false);
+ }
- if (started) {
- defaultLogger.info("Successfully started Apache MiNiFi{}", (pid == null ? "" : " with PID " + pid));
+ process = builder.start();
+ handleLogging(process);
+
+ Long pid = getPid(process, defaultLogger);
+ if (pid != null) {
+ nifiPid = pid;
+ final Properties nifiProps = new Properties();
+ nifiProps.setProperty("pid", String.valueOf(nifiPid));
+ saveProperties(nifiProps, defaultLogger);
+ }
+
+ shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds, loggingExecutor);
+ runtime.addShutdownHook(shutdownHook);
+
+ final boolean started = waitForStart();
+
+ if (started) {
+ defaultLogger.info("Successfully spawned the thread to start Apache MiNiFi{}", (pid == null ? "" : " with PID " + pid));
+ } else {
+ defaultLogger.error("Apache MiNiFi does not appear to have started");
+ }
} else {
- defaultLogger.error("Apache MiNiFi does not appear to have started");
+ return;
}
- } else {
- return;
}
}
+ } finally {
+ shutdownChangeNotifiers();
}
}
@@ -1255,7 +1320,7 @@ public class RunMiNiFi {
defaultLogger.warn("Apache MiNiFi has started but failed to persist MiNiFi Port information to {} due to {}", new Object[]{statusFile.getAbsolutePath(), ioe});
}
- defaultLogger.info("Apache MiNiFi now running and listening for Bootstrap requests on port {}", port);
+ defaultLogger.info("The thread to run Apache MiNiFi is now running and listening for Bootstrap requests on port {}", port);
}
int getNiFiCommandControlPort() {
@@ -1328,7 +1393,7 @@ public class RunMiNiFi {
}
@Override
- public void handleChange(InputStream configInputStream) {
+ public void handleChange(InputStream configInputStream) throws ConfigurationChangeException {
logger.info("Received notification of a change");
try {
final Properties bootstrapProperties = runner.getBootstrapProperties();
@@ -1346,26 +1411,51 @@ public class RunMiNiFi {
final ByteArrayInputStream newConfigBais = new ByteArrayInputStream(bufferedConfigOs.toByteArray());
newConfigBais.mark(-1);
- logger.info("Persisting changes to {}", configFile.getAbsolutePath());
- saveFile(newConfigBais, configFile);
-
- // Reset the input stream to provide to the transformer
- newConfigBais.reset();
-
- final String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
- logger.info("Performing transformation for input and saving outputs to {}", configFile);
- performTransformation(newConfigBais, confDir);
-
- logger.info("Reloading instance with new configuration");
- restartInstance();
+ final File swapConfigFile = runner.getSwapFile(logger);
+ logger.info("Persisting old configuration to {}", swapConfigFile.getAbsolutePath());
+ Files.copy(new FileInputStream(configFile), swapConfigFile.toPath(), REPLACE_EXISTING);
+ try {
+ logger.info("Persisting changes to {}", configFile.getAbsolutePath());
+ saveFile(newConfigBais, configFile);
+
+ try {
+ // Reset the input stream to provide to the transformer
+ newConfigBais.reset();
+
+ final String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
+ logger.info("Performing transformation for input and saving outputs to {}", confDir);
+ performTransformation(newConfigBais, confDir);
+
+ logger.info("Reloading instance with new configuration");
+ restartInstance();
+ } catch (Exception e){
+ logger.debug("Transformation of new config file failed after replacing original with the swap file, reverting.");
+ Files.copy(new FileInputStream(swapConfigFile), configFile.toPath(), REPLACE_EXISTING);
+ throw e;
+ }
+ } catch (Exception e){
+ logger.debug("Transformation of new config file failed after swap file was created, deleting it.");
+ if(!swapConfigFile.delete()){
+ logger.warn("The swap file failed to delete after a failed handling of a change. It should be cleaned up manually.");
+ }
+ throw e;
+ }
+ } catch (ConfigurationChangeException e){
+ logger.error("Unable to carry out reloading of configuration on receipt of notification event", e);
+ throw e;
} catch (IOException ioe) {
logger.error("Unable to carry out reloading of configuration on receipt of notification event", ioe);
- throw new IllegalStateException("Unable to perform reload of received configuration change", ioe);
+ throw new ConfigurationChangeException("Unable to perform reload of received configuration change", ioe);
}
}
- private void saveFile(final InputStream configInputStream, File configFile) {
+ @Override
+ public String getDescriptor() {
+ return "MiNiFiConfigurationChangeListener";
+ }
+
+ private void saveFile(final InputStream configInputStream, File configFile) throws IOException {
try {
try (final FileOutputStream configFileOutputStream = new FileOutputStream(configFile)) {
byte[] copyArray = new byte[1024];
@@ -1375,29 +1465,31 @@ public class RunMiNiFi {
}
}
} catch (IOException ioe) {
- throw new IllegalStateException("Unable to save updated configuration to the configured config file location", ioe);
+ throw new IOException("Unable to save updated configuration to the configured config file location", ioe);
}
}
- private void performTransformation(InputStream configIs, String configDestinationPath) {
- try {
- ConfigTransformer.transformConfigFile(configIs, configDestinationPath);
- } catch (Exception e) {
- logger.error("Unable to successfully transform the provided configuration", e);
- throw new IllegalStateException("Unable to successfully transform the provided configuration", e);
- }
- }
- private void restartInstance() {
- logger.info("Restarting MiNiFi with new configuration");
+
+ private void restartInstance() throws IOException {
try {
runner.reload();
} catch (IOException e) {
- throw new IllegalStateException("Unable to successfully restart MiNiFi instance after configuration change.", e);
+ throw new IOException("Unable to successfully restart MiNiFi instance after configuration change.", e);
}
}
}
+ private static void performTransformation(InputStream configIs, String configDestinationPath) throws ConfigurationChangeException, IOException {
+ try {
+ ConfigTransformer.transformConfigFile(configIs, configDestinationPath);
+ } catch (ConfigurationChangeException e){
+ throw e;
+ } catch (Exception e) {
+ throw new IOException("Unable to successfully transform the provided configuration", e);
+ }
+ }
+
private static class Status {
private final Integer port;
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
index 13f0d16..ad3a2df 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
@@ -101,5 +101,7 @@ public class ShutdownHook extends Thread {
if (!statusFile.delete()) {
System.err.println("Failed to delete status file " + statusFile.getAbsolutePath() + "; this file should be cleaned up manually");
}
+
+ System.out.println("MiNiFi is done shutting down");
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeException.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeException.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeException.java
new file mode 100644
index 0000000..04bbb02
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration;
+
+/**
+ * Exception to indicate there was a problem handling a change to the configuration
+ */
+
+public class ConfigurationChangeException extends Exception {
+
+ public ConfigurationChangeException() {
+ super();
+ }
+
+ public ConfigurationChangeException(String message) {
+ super(message);
+ }
+
+ public ConfigurationChangeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ConfigurationChangeException(Throwable cause) {
+ super(cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
index 7d9183a..756b051 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
@@ -29,6 +29,10 @@ public interface ConfigurationChangeListener {
*
* @param is stream of the detected content received from the change notifier
*/
- void handleChange(InputStream is);
+ void handleChange(InputStream is) throws ConfigurationChangeException;
+ /**
+ * Returns a succinct string identifying this particular listener
+ */
+ String getDescriptor();
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
index 7ad32f1..745ce6c 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
@@ -17,6 +17,7 @@
package org.apache.nifi.minifi.bootstrap.configuration;
import java.io.Closeable;
+import java.util.Collection;
import java.util.Properties;
import java.util.Set;
@@ -52,6 +53,5 @@ public interface ConfigurationChangeNotifier extends Closeable {
/**
* Provide the mechanism by which listeners are notified
*/
- void notifyListeners();
-
+ Collection<ListenerHandleResult> notifyListeners();
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/FileChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/FileChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/FileChangeNotifier.java
deleted file mode 100644
index d3f51f7..0000000
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/FileChangeNotifier.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.minifi.bootstrap.configuration;
-
-import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
-import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.NOTIFIER_PROPERTY_PREFIX;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.file.FileSystems;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.WatchEvent;
-import java.nio.file.WatchKey;
-import java.nio.file.WatchService;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-/**
- * FileChangeNotifier provides a simple FileSystem monitor for detecting changes for a specified file as generated from its corresponding {@link Path}. Upon modifications to the associated file,
- * associated listeners receive notification of a change allowing configuration logic to be reanalyzed. The backing implementation is associated with a {@link ScheduledExecutorService} that
- * ensures continuity of monitoring.
- */
-public class FileChangeNotifier implements Runnable, ConfigurationChangeNotifier {
-
- private Path configFile;
- private WatchService watchService;
- private long pollingSeconds;
-
- private ScheduledExecutorService executorService;
- private final Set<ConfigurationChangeListener> configurationChangeListeners = new HashSet<>();
-
- protected static final String CONFIG_FILE_PATH_KEY = NOTIFIER_PROPERTY_PREFIX + ".file.config.path";
- protected static final String POLLING_PERIOD_INTERVAL_KEY = NOTIFIER_PROPERTY_PREFIX + ".file.polling.period.seconds";
-
- protected static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
- protected static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = TimeUnit.SECONDS;
-
- @Override
- public Set<ConfigurationChangeListener> getChangeListeners() {
- return Collections.unmodifiableSet(configurationChangeListeners);
- }
-
- @Override
- public void notifyListeners() {
- final File fileToRead = configFile.toFile();
- for (final ConfigurationChangeListener listener : getChangeListeners()) {
- try (final FileInputStream fis = new FileInputStream(fileToRead);) {
- listener.handleChange(fis);
- } catch (IOException ex) {
- throw new IllegalStateException("Unable to read the changed file " + configFile, ex);
- }
- }
- }
-
- @Override
- public boolean registerListener(ConfigurationChangeListener listener) {
- return this.configurationChangeListeners.add(listener);
- }
-
- protected boolean targetChanged() {
- boolean targetChanged = false;
-
- final WatchKey watchKey = this.watchService.poll();
-
- if (watchKey == null) {
- return targetChanged;
- }
-
- for (WatchEvent<?> watchEvt : watchKey.pollEvents()) {
- final WatchEvent.Kind<?> evtKind = watchEvt.kind();
-
- final WatchEvent<Path> pathEvent = (WatchEvent<Path>) watchEvt;
- final Path changedFile = pathEvent.context();
-
- // determine target change by verifying if the changed file corresponds to the config file monitored for this path
- targetChanged = (evtKind == ENTRY_MODIFY && changedFile.equals(configFile.getName(configFile.getNameCount() - 1)));
- }
-
- // After completing inspection, reset for detection of subsequent change events
- boolean valid = watchKey.reset();
- if (!valid) {
- throw new IllegalStateException("Unable to reinitialize file system watcher.");
- }
-
- return targetChanged;
- }
-
- protected static WatchService initializeWatcher(Path filePath) {
- try {
- final WatchService fsWatcher = FileSystems.getDefault().newWatchService();
- final Path watchDirectory = filePath.getParent();
- watchDirectory.register(fsWatcher, ENTRY_MODIFY);
-
- return fsWatcher;
- } catch (IOException ioe) {
- throw new IllegalStateException("Unable to initialize a file system watcher for the path " + filePath, ioe);
- }
- }
-
- @Override
- public void run() {
- if (targetChanged()) {
- notifyListeners();
- }
- }
-
- @Override
- public void initialize(Properties properties) {
- final String rawPath = properties.getProperty(CONFIG_FILE_PATH_KEY);
- final String rawPollingDuration = properties.getProperty(POLLING_PERIOD_INTERVAL_KEY, Long.toString(DEFAULT_POLLING_PERIOD_INTERVAL));
-
- if (rawPath == null || rawPath.isEmpty()) {
- throw new IllegalArgumentException("Property, " + CONFIG_FILE_PATH_KEY + ", for the path of the config file must be specified.");
- }
-
- try {
- setConfigFile(Paths.get(rawPath));
- setPollingPeriod(Long.parseLong(rawPollingDuration), DEFAULT_POLLING_PERIOD_UNIT);
- setWatchService(initializeWatcher(configFile));
- } catch (Exception e) {
- throw new IllegalStateException("Could not successfully initialize file change notifier.", e);
- }
- }
-
- protected void setConfigFile(Path configFile) {
- this.configFile = configFile;
- }
-
- protected void setWatchService(WatchService watchService) {
- this.watchService = watchService;
- }
-
- protected void setPollingPeriod(long duration, TimeUnit unit) {
- if (duration < 0) {
- throw new IllegalArgumentException("Cannot specify a polling period with duration <=0");
- }
- this.pollingSeconds = TimeUnit.SECONDS.convert(duration, unit);
- }
-
- @Override
- public void start() {
- executorService = Executors.newScheduledThreadPool(1, new ThreadFactory() {
- @Override
- public Thread newThread(final Runnable r) {
- final Thread t = Executors.defaultThreadFactory().newThread(r);
- t.setName("File Change Notifier Thread");
- t.setDaemon(true);
- return t;
- }
- });
- this.executorService.scheduleWithFixedDelay(this, 0, pollingSeconds, DEFAULT_POLLING_PERIOD_UNIT);
- }
-
- @Override
- public void close() {
- if (this.executorService != null) {
- this.executorService.shutdownNow();
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java
new file mode 100644
index 0000000..8ac4cea
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration;
+
+public class ListenerHandleResult {
+
+ private final ConfigurationChangeListener configurationChangeListener;
+ private final Exception failureCause;
+
+ public ListenerHandleResult(ConfigurationChangeListener configurationChangeListener){
+ this.configurationChangeListener = configurationChangeListener;
+ failureCause = null;
+ }
+
+ public ListenerHandleResult(ConfigurationChangeListener configurationChangeListener, Exception failureCause){
+ this.configurationChangeListener = configurationChangeListener;
+ this.failureCause = failureCause;
+ }
+
+ public boolean succeeded(){
+ return failureCause == null;
+ }
+
+ public String getDescriptor(){
+ return configurationChangeListener.getDescriptor();
+ }
+
+ public Exception getFailureCause(){
+ return failureCause;
+ }
+
+ @Override
+ public String toString() {
+ if(failureCause == null){
+ return getDescriptor() + " successfully handled the configuration change";
+ } else {
+ return getDescriptor() + " FAILED to handle the configuration change due to: '" + failureCause.getMessage() + "'";
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/RestChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/RestChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/RestChangeNotifier.java
deleted file mode 100644
index 5807f89..0000000
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/RestChangeNotifier.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration;
-
-import org.eclipse.jetty.server.Request;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ServerConnector;
-import org.eclipse.jetty.server.handler.AbstractHandler;
-import org.eclipse.jetty.server.handler.HandlerCollection;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.net.URI;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
-
-import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.NOTIFIER_PROPERTY_PREFIX;
-
-
-public class RestChangeNotifier implements ConfigurationChangeNotifier {
-
- private final Set<ConfigurationChangeListener> configurationChangeListeners = new HashSet<>();
- private final static Logger logger = LoggerFactory.getLogger(RestChangeNotifier.class);
- private String configFile = null;
- private final Server jetty;
- public static final String GET_TEXT = "This is a config change listener for an Apache NiFi - MiNiFi instance.\n" +
- "Use this rest server to upload a conf.yml to configure the MiNiFi instance.\n" +
- "Send a POST http request to '/' to upload the file.";
- public static final String POST_TEXT ="Configuration received, notifying listeners.\n";
- public static final String OTHER_TEXT ="This is not a support HTTP operation. Please use GET to get more information or POST to upload a new config.yml file.\n";
-
-
- public static final String POST = "POST";
- public static final String GET = "GET";
-
- public static final String PORT_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.port";
- public static final String HOST_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.host";
- public static final String TRUSTSTORE_LOCATION_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.location";
- public static final String TRUSTSTORE_PASSWORD_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.password";
- public static final String TRUSTSTORE_TYPE_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.type";
- public static final String KEYSTORE_LOCATION_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.location";
- public static final String KEYSTORE_PASSWORD_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.password";
- public static final String KEYSTORE_TYPE_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.type";
- public static final String NEED_CLIENT_AUTH_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.need.client.auth";
-
- public RestChangeNotifier(){
- QueuedThreadPool queuedThreadPool = new QueuedThreadPool();
- queuedThreadPool.setDaemon(true);
- jetty = new Server(queuedThreadPool);
- }
-
- @Override
- public void initialize(Properties properties) {
- logger.info("Initializing");
-
- // create the secure connector if keystore location is specified
- if (properties.getProperty(KEYSTORE_LOCATION_KEY) != null) {
- createSecureConnector(properties);
- } else {
- // create the unsecure connector otherwise
- createConnector(properties);
- }
-
- HandlerCollection handlerCollection = new HandlerCollection(true);
- handlerCollection.addHandler(new JettyHandler());
- jetty.setHandler(handlerCollection);
- }
-
-
- @Override
- public Set<ConfigurationChangeListener> getChangeListeners() {
- return configurationChangeListeners;
- }
-
- @Override
- public boolean registerListener(ConfigurationChangeListener listener) {
- return configurationChangeListeners.add(listener);
- }
-
- @Override
- public void notifyListeners() {
- if (configFile == null){
- throw new IllegalStateException("Attempting to notify listeners when there is no new config file.");
- }
-
- for (final ConfigurationChangeListener listener : getChangeListeners()) {
- try (final ByteArrayInputStream fis = new ByteArrayInputStream(configFile.getBytes());) {
- listener.handleChange(fis);
- } catch (IOException ex) {
- throw new IllegalStateException("Unable to read the changed file " + configFile, ex);
- }
- }
-
- configFile = null;
- }
-
- @Override
- public void start(){
- try {
- jetty.start();
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
-
- @Override
- public void close() throws IOException {
- logger.warn("Shutting down the jetty server");
- try {
- jetty.stop();
- jetty.destroy();
- } catch (Exception e) {
- throw new IOException(e);
- }
- logger.warn("Done shutting down the jetty server");
- }
-
- public URI getURI(){
- return jetty.getURI();
- }
-
- public int getPort(){
- if (!jetty.isStarted()) {
- throw new IllegalStateException("Jetty server not started");
- }
- return ((ServerConnector) jetty.getConnectors()[0]).getLocalPort();
- }
-
- public String getConfigString(){
- return configFile;
- }
-
- private void setConfigFile(String configFile){
- this.configFile = configFile;
- }
-
- private void createConnector(Properties properties) {
- final ServerConnector http = new ServerConnector(jetty);
-
- http.setPort(Integer.parseInt(properties.getProperty(PORT_KEY, "0")));
- http.setHost(properties.getProperty(HOST_KEY, "localhost"));
-
- // Severely taxed or distant environments may have significant delays when executing.
- http.setIdleTimeout(30000L);
- jetty.addConnector(http);
-
- logger.info("Added an http connector on the host '{}' and port '{}'", new Object[]{http.getHost(), http.getPort()});
- }
-
- private void createSecureConnector(Properties properties) {
- SslContextFactory ssl = new SslContextFactory();
-
- if (properties.getProperty(KEYSTORE_LOCATION_KEY) != null) {
- ssl.setKeyStorePath(properties.getProperty(KEYSTORE_LOCATION_KEY));
- ssl.setKeyStorePassword(properties.getProperty(KEYSTORE_PASSWORD_KEY));
- ssl.setKeyStoreType(properties.getProperty(KEYSTORE_TYPE_KEY));
- }
-
- if (properties.getProperty(TRUSTSTORE_LOCATION_KEY) != null) {
- ssl.setTrustStorePath(properties.getProperty(TRUSTSTORE_LOCATION_KEY));
- ssl.setTrustStorePassword(properties.getProperty(TRUSTSTORE_PASSWORD_KEY));
- ssl.setTrustStoreType(properties.getProperty(TRUSTSTORE_TYPE_KEY));
- ssl.setNeedClientAuth(Boolean.parseBoolean(properties.getProperty(NEED_CLIENT_AUTH_KEY, "true")));
- }
-
- // build the connector
- final ServerConnector https = new ServerConnector(jetty, ssl);
-
- // set host and port
- https.setPort(Integer.parseInt(properties.getProperty(PORT_KEY,"0")));
- https.setHost(properties.getProperty(HOST_KEY, "localhost"));
-
- // Severely taxed environments may have significant delays when executing.
- https.setIdleTimeout(30000L);
-
- // add the connector
- jetty.addConnector(https);
-
- logger.info("Added an https connector on the host '{}' and port '{}'", new Object[]{https.getHost(), https.getPort()});
- }
-
-
- public class JettyHandler extends AbstractHandler {
-
- @Override
- public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
- throws IOException, ServletException {
-
- logRequest(request);
-
- baseRequest.setHandled(true);
-
- if(POST.equals(request.getMethod())) {
- final StringBuilder configBuilder = new StringBuilder();
- BufferedReader reader = request.getReader();
- if(reader != null && reader.ready()){
- String line;
- while ((line = reader.readLine()) != null) {
- configBuilder.append(line);
- configBuilder.append(System.getProperty("line.separator"));
- }
- }
- setConfigFile(configBuilder.substring(0,configBuilder.length()-1));
- notifyListeners();
- writeOutput(response, POST_TEXT, 200);
- } else if(GET.equals(request.getMethod())) {
- writeOutput(response, GET_TEXT, 200);
- } else {
- writeOutput(response, OTHER_TEXT, 404);
- }
- }
-
- private void writeOutput(HttpServletResponse response, String responseText, int responseCode) throws IOException {
- response.setStatus(responseCode);
- response.setContentType("text/plain");
- response.setContentLength(responseText.length());
- try (PrintWriter writer = response.getWriter()) {
- writer.print(responseText);
- writer.flush();
- }
- }
-
- private void logRequest(HttpServletRequest request){
- logger.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
- logger.info("request method = " + request.getMethod());
- logger.info("request url = " + request.getRequestURL());
- logger.info("context path = " + request.getContextPath());
- logger.info("request content type = " + request.getContentType());
- logger.info("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/FileChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/FileChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/FileChangeNotifier.java
new file mode 100644
index 0000000..faba2f0
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/FileChangeNotifier.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers;
+
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
+import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.NOTIFIER_PROPERTY_PREFIX;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * FileChangeNotifier provides a simple FileSystem monitor for detecting changes for a specified file as generated from its corresponding {@link Path}. Upon modifications to the associated file,
+ * associated listeners receive notification of a change allowing configuration logic to be reanalyzed. The backing implementation is associated with a {@link ScheduledExecutorService} that
+ * ensures continuity of monitoring.
+ */
+public class FileChangeNotifier implements Runnable, ConfigurationChangeNotifier {
+
+ private Path configFile;
+ private WatchService watchService;
+ private long pollingSeconds;
+
+ private final static Logger logger = LoggerFactory.getLogger(FileChangeNotifier.class);
+ private ScheduledExecutorService executorService;
+ private final Set<ConfigurationChangeListener> configurationChangeListeners = new HashSet<>();
+
+ protected static final String CONFIG_FILE_PATH_KEY = NOTIFIER_PROPERTY_PREFIX + ".file.config.path";
+ protected static final String POLLING_PERIOD_INTERVAL_KEY = NOTIFIER_PROPERTY_PREFIX + ".file.polling.period.seconds";
+
+ protected static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
+ protected static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = TimeUnit.SECONDS;
+
+ @Override
+ public Set<ConfigurationChangeListener> getChangeListeners() {
+ return Collections.unmodifiableSet(configurationChangeListeners);
+ }
+
+ @Override
+ public Collection<ListenerHandleResult> notifyListeners() {
+ logger.info("Notifying Listeners of a change");
+ final File fileToRead = configFile.toFile();
+
+ Collection<ListenerHandleResult> listenerHandleResults = new ArrayList<>(configurationChangeListeners.size());
+ for (final ConfigurationChangeListener listener : getChangeListeners()) {
+ ListenerHandleResult result;
+ try (final FileInputStream fis = new FileInputStream(fileToRead);) {
+ listener.handleChange(fis);
+ result = new ListenerHandleResult(listener);
+ } catch (IOException | ConfigurationChangeException ex) {
+ result = new ListenerHandleResult(listener, ex);
+ }
+ listenerHandleResults.add(result);
+ logger.info("Listener notification result:" + result.toString());
+ }
+ return listenerHandleResults;
+ }
+
+ @Override
+ public boolean registerListener(ConfigurationChangeListener listener) {
+ return this.configurationChangeListeners.add(listener);
+ }
+
+ protected boolean targetChanged() {
+ boolean targetChanged = false;
+
+ final WatchKey watchKey = this.watchService.poll();
+
+ if (watchKey == null) {
+ return targetChanged;
+ }
+
+ for (WatchEvent<?> watchEvt : watchKey.pollEvents()) {
+ final WatchEvent.Kind<?> evtKind = watchEvt.kind();
+
+ final WatchEvent<Path> pathEvent = (WatchEvent<Path>) watchEvt;
+ final Path changedFile = pathEvent.context();
+
+ // determine target change by verifying if the changed file corresponds to the config file monitored for this path
+ targetChanged = (evtKind == ENTRY_MODIFY && changedFile.equals(configFile.getName(configFile.getNameCount() - 1)));
+ }
+
+ // After completing inspection, reset for detection of subsequent change events
+ boolean valid = watchKey.reset();
+ if (!valid) {
+ throw new IllegalStateException("Unable to reinitialize file system watcher.");
+ }
+
+ return targetChanged;
+ }
+
+ protected static WatchService initializeWatcher(Path filePath) {
+ try {
+ final WatchService fsWatcher = FileSystems.getDefault().newWatchService();
+ final Path watchDirectory = filePath.getParent();
+ watchDirectory.register(fsWatcher, ENTRY_MODIFY);
+
+ return fsWatcher;
+ } catch (IOException ioe) {
+ throw new IllegalStateException("Unable to initialize a file system watcher for the path " + filePath, ioe);
+ }
+ }
+
+ @Override
+ public void run() {
+ logger.debug("Checking for a change");
+ if (targetChanged()) {
+ notifyListeners();
+ }
+ }
+
+ @Override
+ public void initialize(Properties properties) {
+ final String rawPath = properties.getProperty(CONFIG_FILE_PATH_KEY);
+ final String rawPollingDuration = properties.getProperty(POLLING_PERIOD_INTERVAL_KEY, Long.toString(DEFAULT_POLLING_PERIOD_INTERVAL));
+
+ if (rawPath == null || rawPath.isEmpty()) {
+ throw new IllegalArgumentException("Property, " + CONFIG_FILE_PATH_KEY + ", for the path of the config file must be specified.");
+ }
+
+ try {
+ setConfigFile(Paths.get(rawPath));
+ setPollingPeriod(Long.parseLong(rawPollingDuration), DEFAULT_POLLING_PERIOD_UNIT);
+ setWatchService(initializeWatcher(configFile));
+ } catch (Exception e) {
+ throw new IllegalStateException("Could not successfully initialize file change notifier.", e);
+ }
+ }
+
+ protected void setConfigFile(Path configFile) {
+ this.configFile = configFile;
+ }
+
+ protected void setWatchService(WatchService watchService) {
+ this.watchService = watchService;
+ }
+
+ protected void setPollingPeriod(long duration, TimeUnit unit) {
+ if (duration < 0) {
+ throw new IllegalArgumentException("Cannot specify a polling period with duration <=0");
+ }
+ this.pollingSeconds = TimeUnit.SECONDS.convert(duration, unit);
+ }
+
+ @Override
+ public void start() {
+ executorService = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+ @Override
+ public Thread newThread(final Runnable r) {
+ final Thread t = Executors.defaultThreadFactory().newThread(r);
+ t.setName("File Change Notifier Thread");
+ t.setDaemon(true);
+ return t;
+ }
+ });
+ this.executorService.scheduleWithFixedDelay(this, 0, pollingSeconds, DEFAULT_POLLING_PERIOD_UNIT);
+ }
+
+ @Override
+ public void close() {
+ if (this.executorService != null) {
+ this.executorService.shutdownNow();
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/RestChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/RestChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/RestChangeNotifier.java
new file mode 100644
index 0000000..777214f
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/RestChangeNotifier.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers;
+
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.NOTIFIER_PROPERTY_PREFIX;
+
+
+public class RestChangeNotifier implements ConfigurationChangeNotifier {
+
+ private final Set<ConfigurationChangeListener> configurationChangeListeners = new HashSet<>();
+ private final static Logger logger = LoggerFactory.getLogger(RestChangeNotifier.class);
+ private String configFile = null;
+ private final Server jetty;
+ public static final String GET_TEXT = "This is a config change listener for an Apache NiFi - MiNiFi instance.\n" +
+ "Use this rest server to upload a conf.yml to configure the MiNiFi instance.\n" +
+ "Send a POST http request to '/' to upload the file.";
+ public static final String OTHER_TEXT ="This is not a support HTTP operation. Please use GET to get more information or POST to upload a new config.yml file.\n";
+
+
+ public static final String POST = "POST";
+ public static final String GET = "GET";
+
+ public static final String PORT_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.port";
+ public static final String HOST_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.host";
+ public static final String TRUSTSTORE_LOCATION_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.location";
+ public static final String TRUSTSTORE_PASSWORD_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.password";
+ public static final String TRUSTSTORE_TYPE_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.type";
+ public static final String KEYSTORE_LOCATION_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.location";
+ public static final String KEYSTORE_PASSWORD_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.password";
+ public static final String KEYSTORE_TYPE_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.type";
+ public static final String NEED_CLIENT_AUTH_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.need.client.auth";
+
+ public RestChangeNotifier(){
+ QueuedThreadPool queuedThreadPool = new QueuedThreadPool();
+ queuedThreadPool.setDaemon(true);
+ jetty = new Server(queuedThreadPool);
+ }
+
+ @Override
+ public void initialize(Properties properties) {
+ logger.info("Initializing");
+
+ // create the secure connector if keystore location is specified
+ if (properties.getProperty(KEYSTORE_LOCATION_KEY) != null) {
+ createSecureConnector(properties);
+ } else {
+ // create the unsecure connector otherwise
+ createConnector(properties);
+ }
+
+ HandlerCollection handlerCollection = new HandlerCollection(true);
+ handlerCollection.addHandler(new JettyHandler());
+ jetty.setHandler(handlerCollection);
+ }
+
+ @Override
+ public Set<ConfigurationChangeListener> getChangeListeners() {
+ return configurationChangeListeners;
+ }
+
+ @Override
+ public boolean registerListener(ConfigurationChangeListener listener) {
+ return configurationChangeListeners.add(listener);
+ }
+
+ @Override
+ public Collection<ListenerHandleResult> notifyListeners() {
+ if (configFile == null){
+ throw new IllegalStateException("Attempting to notify listeners when there is no new config file.");
+ }
+
+ Collection<ListenerHandleResult> listenerHandleResults = new ArrayList<>(configurationChangeListeners.size());
+ for (final ConfigurationChangeListener listener : getChangeListeners()) {
+ ListenerHandleResult result;
+ try (final ByteArrayInputStream fis = new ByteArrayInputStream(configFile.getBytes())) {
+ listener.handleChange(fis);
+ result = new ListenerHandleResult(listener);
+ } catch (IOException | ConfigurationChangeException ex) {
+ result = new ListenerHandleResult(listener, ex);
+ }
+ listenerHandleResults.add(result);
+ logger.info("Listener notification result:" + result.toString());
+ }
+
+ configFile = null;
+ return listenerHandleResults;
+ }
+
+ @Override
+ public void start(){
+ try {
+ jetty.start();
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+
+ @Override
+ public void close() throws IOException {
+ logger.warn("Shutting down the jetty server");
+ try {
+ jetty.stop();
+ jetty.destroy();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ logger.warn("Done shutting down the jetty server");
+ }
+
+ public URI getURI(){
+ return jetty.getURI();
+ }
+
+ public int getPort(){
+ if (!jetty.isStarted()) {
+ throw new IllegalStateException("Jetty server not started");
+ }
+ return ((ServerConnector) jetty.getConnectors()[0]).getLocalPort();
+ }
+
+ public String getConfigString(){
+ return configFile;
+ }
+
+ private void setConfigFile(String configFile){
+ this.configFile = configFile;
+ }
+
+ private void createConnector(Properties properties) {
+ final ServerConnector http = new ServerConnector(jetty);
+
+ http.setPort(Integer.parseInt(properties.getProperty(PORT_KEY, "0")));
+ http.setHost(properties.getProperty(HOST_KEY, "localhost"));
+
+ // Severely taxed or distant environments may have significant delays when executing.
+ http.setIdleTimeout(30000L);
+ jetty.addConnector(http);
+
+ logger.info("Added an http connector on the host '{}' and port '{}'", new Object[]{http.getHost(), http.getPort()});
+ }
+
+ private void createSecureConnector(Properties properties) {
+ SslContextFactory ssl = new SslContextFactory();
+
+ if (properties.getProperty(KEYSTORE_LOCATION_KEY) != null) {
+ ssl.setKeyStorePath(properties.getProperty(KEYSTORE_LOCATION_KEY));
+ ssl.setKeyStorePassword(properties.getProperty(KEYSTORE_PASSWORD_KEY));
+ ssl.setKeyStoreType(properties.getProperty(KEYSTORE_TYPE_KEY));
+ }
+
+ if (properties.getProperty(TRUSTSTORE_LOCATION_KEY) != null) {
+ ssl.setTrustStorePath(properties.getProperty(TRUSTSTORE_LOCATION_KEY));
+ ssl.setTrustStorePassword(properties.getProperty(TRUSTSTORE_PASSWORD_KEY));
+ ssl.setTrustStoreType(properties.getProperty(TRUSTSTORE_TYPE_KEY));
+ ssl.setNeedClientAuth(Boolean.parseBoolean(properties.getProperty(NEED_CLIENT_AUTH_KEY, "true")));
+ }
+
+ // build the connector
+ final ServerConnector https = new ServerConnector(jetty, ssl);
+
+ // set host and port
+ https.setPort(Integer.parseInt(properties.getProperty(PORT_KEY,"0")));
+ https.setHost(properties.getProperty(HOST_KEY, "localhost"));
+
+ // Severely taxed environments may have significant delays when executing.
+ https.setIdleTimeout(30000L);
+
+ // add the connector
+ jetty.addConnector(https);
+
+ logger.info("Added an https connector on the host '{}' and port '{}'", new Object[]{https.getHost(), https.getPort()});
+ }
+
+
+ public class JettyHandler extends AbstractHandler {
+
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+
+ logRequest(request);
+
+ baseRequest.setHandled(true);
+
+ if(POST.equals(request.getMethod())) {
+ final StringBuilder configBuilder = new StringBuilder();
+ BufferedReader reader = request.getReader();
+ if(reader != null && reader.ready()){
+ String line;
+ while ((line = reader.readLine()) != null) {
+ configBuilder.append(line);
+ configBuilder.append(System.getProperty("line.separator"));
+ }
+ }
+ setConfigFile(configBuilder.substring(0,configBuilder.length()-1));
+ Collection<ListenerHandleResult> listenerHandleResults = notifyListeners();
+
+ int statusCode = 200;
+ for (ListenerHandleResult result: listenerHandleResults){
+ if(!result.succeeded()){
+ statusCode = 500;
+ break;
+ }
+ }
+
+ writeOutput(response, getPostText(listenerHandleResults), statusCode);
+ } else if(GET.equals(request.getMethod())) {
+ writeOutput(response, GET_TEXT, 200);
+ } else {
+ writeOutput(response, OTHER_TEXT, 404);
+ }
+ }
+
+ private String getPostText(Collection<ListenerHandleResult> listenerHandleResults){
+ StringBuilder postResult = new StringBuilder("The result of notifying listeners:\n");
+
+ for (ListenerHandleResult result : listenerHandleResults) {
+ postResult.append(result.toString());
+ postResult.append("\n");
+ }
+
+ return postResult.toString();
+ }
+
+ private void writeOutput(HttpServletResponse response, String responseText, int responseCode) throws IOException {
+ response.setStatus(responseCode);
+ response.setContentType("text/plain");
+ response.setContentLength(responseText.length());
+ try (PrintWriter writer = response.getWriter()) {
+ writer.print(responseText);
+ writer.flush();
+ }
+ }
+
+ private void logRequest(HttpServletRequest request){
+ logger.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
+ logger.info("request method = " + request.getMethod());
+ logger.info("request url = " + request.getRequestURL());
+ logger.info("context path = " + request.getContextPath());
+ logger.info("request content type = " + request.getContentType());
+ logger.info("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
+ }
+
+ }
+}
[2/3] nifi-minifi git commit: MINIFI-17 Adding error handling of
configurations that fail to start and a couple other small changes
Posted by jp...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
index 8bb25c8..633cce2 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
@@ -19,6 +19,8 @@ package org.apache.nifi.minifi.bootstrap.util;
import org.apache.nifi.controller.FlowSerializationException;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.w3c.dom.DOMException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
@@ -37,6 +39,8 @@ import javax.xml.transform.stream.StreamResult;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
@@ -151,11 +155,19 @@ public final class ConfigTransformer {
// Verify the parsed object is a Map structure
if (loadedObject instanceof Map) {
final Map<String, Object> result = (Map<String, Object>) loadedObject;
+
+ // Create nifi.properties and flow.xml.gz in memory
+ ByteArrayOutputStream nifiPropertiesOutputStream = new ByteArrayOutputStream();
+ writeNiFiProperties(result, nifiPropertiesOutputStream);
+
+ DOMSource flowXml = createFlowXml(result);
+
// Write nifi.properties and flow.xml.gz
- writeNiFiProperties(result, destPath);
- writeFlowXml(result, destPath);
+ writeNiFiPropertiesFile(nifiPropertiesOutputStream, destPath);
+
+ writeFlowXmlFile(flowXml, destPath);
} else {
- throw new IllegalArgumentException("Provided YAML configuration is malformed.");
+ throw new IllegalArgumentException("Provided YAML configuration is not a Map.");
}
} finally {
if (sourceStream != null) {
@@ -164,20 +176,49 @@ public final class ConfigTransformer {
}
}
- private static void writeNiFiProperties(Map<String, Object> topLevelYaml, String path) throws FileNotFoundException, UnsupportedEncodingException {
+ private static void writeNiFiPropertiesFile(ByteArrayOutputStream nifiPropertiesOutputStream, String destPath) throws IOException {
+ try {
+ final Path nifiPropertiesPath = Paths.get(destPath, "nifi.properties");
+ FileOutputStream nifiProperties = new FileOutputStream(new File(nifiPropertiesPath.toString()));
+ nifiProperties.write(nifiPropertiesOutputStream.getUnderlyingBuffer());
+ } finally {
+ if (nifiPropertiesOutputStream != null){
+ nifiPropertiesOutputStream.flush();
+ nifiPropertiesOutputStream.close();
+ }
+ }
+ }
+
+ private static void writeFlowXmlFile(DOMSource domSource, String path) throws IOException, TransformerException {
+
+ final OutputStream fileOut = Files.newOutputStream(Paths.get(path, "flow.xml.gz"));
+ final OutputStream outStream = new GZIPOutputStream(fileOut);
+ final StreamResult streamResult = new StreamResult(outStream);
+
+ // configure the transformer and convert the DOM
+ final TransformerFactory transformFactory = TransformerFactory.newInstance();
+ final Transformer transformer = transformFactory.newTransformer();
+ transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
+ transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+
+ // transform the document to byte stream
+ transformer.transform(domSource, streamResult);
+ outStream.flush();
+ outStream.close();
+ }
+
+ private static void writeNiFiProperties(Map<String, Object> topLevelYaml, OutputStream outputStream) throws FileNotFoundException, UnsupportedEncodingException, ConfigurationChangeException {
PrintWriter writer = null;
try {
- final Path nifiPropertiesPath = Paths.get(path, "nifi.properties");
- writer = new PrintWriter(nifiPropertiesPath.toFile(), "UTF-8");
+ writer = new PrintWriter(outputStream, true);
- Map<String,Object> coreProperties = (Map<String, Object>) topLevelYaml.get(CORE_PROPS_KEY);
- Map<String,Object> flowfileRepo = (Map<String, Object>) topLevelYaml.get(FLOWFILE_REPO_KEY);
+ Map<String, Object> coreProperties = (Map<String, Object>) topLevelYaml.get(CORE_PROPS_KEY);
+ Map<String, Object> flowfileRepo = (Map<String, Object>) topLevelYaml.get(FLOWFILE_REPO_KEY);
Map<String, Object> swapProperties = (Map<String, Object>) flowfileRepo.get(SWAP_PROPS_KEY);
- Map<String,Object> contentRepo = (Map<String, Object>) topLevelYaml.get(CONTENT_REPO_KEY);
- Map<String,Object> componentStatusRepo = (Map<String, Object>) topLevelYaml.get(COMPONENT_STATUS_REPO_KEY);
- Map<String,Object> securityProperties = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY);
- Map<String,Object> sensitiveProperties = (Map<String, Object>) securityProperties.get(SENSITIVE_PROPS_KEY);
-
+ Map<String, Object> contentRepo = (Map<String, Object>) topLevelYaml.get(CONTENT_REPO_KEY);
+ Map<String, Object> componentStatusRepo = (Map<String, Object>) topLevelYaml.get(COMPONENT_STATUS_REPO_KEY);
+ Map<String, Object> securityProperties = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY);
+ Map<String, Object> sensitiveProperties = (Map<String, Object>) securityProperties.get(SENSITIVE_PROPS_KEY);
writer.print(PROPERTIES_FILE_APACHE_2_0_LICENSE);
writer.println("# Core Properties #");
@@ -284,6 +325,8 @@ public final class ConfigTransformer {
writer.println();
writer.println("# cluster manager properties (only configure for cluster manager) #");
writer.println("nifi.cluster.is.manager=false");
+ } catch (NullPointerException e) {
+ throw new ConfigurationChangeException("Failed to parse the config YAML while creating the nifi.properties", e);
} finally {
if (writer != null){
writer.flush();
@@ -291,7 +334,7 @@ public final class ConfigTransformer {
}
}
}
- private static void writeFlowXml(Map<String, Object> topLevelYaml, String path) throws Exception {
+ private static DOMSource createFlowXml(Map<String, Object> topLevelYaml) throws IOException, ConfigurationChangeException {
try {
// create a new, empty document
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
@@ -304,39 +347,32 @@ public final class ConfigTransformer {
final Element rootNode = doc.createElement("flowController");
doc.appendChild(rootNode);
Map<String, Object> processorConfig = (Map<String, Object>) topLevelYaml.get(PROCESSOR_CONFIG_KEY);
- addTextElement(rootNode, "maxTimerDrivenThreadCount", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY));
- addTextElement(rootNode, "maxEventDrivenThreadCount", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY));
+ addTextElement(rootNode, "maxTimerDrivenThreadCount", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY, "1"));
+ addTextElement(rootNode, "maxEventDrivenThreadCount", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY, "1"));
addProcessGroup(rootNode, topLevelYaml, "rootGroup");
Map<String, Object> securityProps = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY);
- String sslAlgorithm = (String) securityProps.get(SSL_PROTOCOL_KEY);
- if (sslAlgorithm != null && !(sslAlgorithm.isEmpty())) {
- final Element controllerServicesNode = doc.createElement("controllerServices");
- rootNode.appendChild(controllerServicesNode);
- addSSLControllerService(controllerServicesNode, securityProps);
+ if (securityProps != null) {
+ String sslAlgorithm = (String) securityProps.get(SSL_PROTOCOL_KEY);
+ if (sslAlgorithm != null && !(sslAlgorithm.isEmpty())) {
+ final Element controllerServicesNode = doc.createElement("controllerServices");
+ rootNode.appendChild(controllerServicesNode);
+ addSSLControllerService(controllerServicesNode, securityProps);
+ }
+ }
+
+ Map<String, Object> provenanceProperties = (Map<String, Object>) topLevelYaml.get(PROVENANCE_REPORTING_KEY);
+ if (provenanceProperties.get(SCHEDULING_STRATEGY_KEY) != null) {
+ final Element reportingTasksNode = doc.createElement("reportingTasks");
+ rootNode.appendChild(reportingTasksNode);
+ addProvenanceReportingTask(reportingTasksNode, topLevelYaml);
}
- final Element reportingTasksNode = doc.createElement("reportingTasks");
- rootNode.appendChild(reportingTasksNode);
- addProvenanceReportingTask(reportingTasksNode, topLevelYaml);
-
- final DOMSource domSource = new DOMSource(doc);
- final OutputStream fileOut = Files.newOutputStream(Paths.get(path, "flow.xml.gz"));
- final OutputStream outStream = new GZIPOutputStream(fileOut);
- final StreamResult streamResult = new StreamResult(outStream);
-
- // configure the transformer and convert the DOM
- final TransformerFactory transformFactory = TransformerFactory.newInstance();
- final Transformer transformer = transformFactory.newTransformer();
- transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
- transformer.setOutputProperty(OutputKeys.INDENT, "yes");
-
- // transform the document to byte stream
- transformer.transform(domSource, streamResult);
- outStream.flush();
- outStream.close();
- } catch (final ParserConfigurationException | DOMException | TransformerFactoryConfigurationError | IllegalArgumentException | TransformerException e) {
+ return new DOMSource(doc);
+ } catch (final ParserConfigurationException | DOMException | TransformerFactoryConfigurationError | IllegalArgumentException e) {
throw new FlowSerializationException(e);
+ } catch (Exception e){
+ throw new ConfigurationChangeException("Failed to parse the config YAML while writing the top level of the flow xml", e);
}
}
@@ -345,109 +381,140 @@ public final class ConfigTransformer {
return value == null ? "" : value.toString();
}
- private static void addSSLControllerService(final Element element, Map<String, Object> securityProperties) {
- final Element serviceElement = element.getOwnerDocument().createElement("controllerService");
- addTextElement(serviceElement, "id", "SSL-Context-Service");
- addTextElement(serviceElement, "name", "SSL-Context-Service");
- addTextElement(serviceElement, "comment", "");
- addTextElement(serviceElement, "class", "org.apache.nifi.ssl.StandardSSLContextService");
-
- addTextElement(serviceElement, "enabled", "true");
-
- Map<String, Object> attributes = new HashMap<>();
- attributes.put("Keystore Filename", securityProperties.get(KEYSTORE_KEY));
- attributes.put("Keystore Type", securityProperties.get(KEYSTORE_TYPE_KEY));
- attributes.put("Keystore Password", securityProperties.get(KEYSTORE_PASSWORD_KEY));
- attributes.put("Truststore Filename", securityProperties.get(TRUSTSTORE_KEY));
- attributes.put("Truststore Type", securityProperties.get(TRUSTSTORE_TYPE_KEY));
- attributes.put("Truststore Password", securityProperties.get(TRUSTSTORE_PASSWORD_KEY));
- attributes.put("SSL Protocol", securityProperties.get(SSL_PROTOCOL_KEY));
-
- addConfiguration(serviceElement, attributes);
+ private static <K> String getValueString(Map<K,Object> map, K key, String theDefault){
+ Object value = null;
+ if (map != null){
+ value = map.get(key);
+ }
+ return value == null ? theDefault : value.toString();
+ }
- element.appendChild(serviceElement);
+ private static void addSSLControllerService(final Element element, Map<String, Object> securityProperties) throws ConfigurationChangeException {
+ try {
+ final Element serviceElement = element.getOwnerDocument().createElement("controllerService");
+ addTextElement(serviceElement, "id", "SSL-Context-Service");
+ addTextElement(serviceElement, "name", "SSL-Context-Service");
+ addTextElement(serviceElement, "comment", "");
+ addTextElement(serviceElement, "class", "org.apache.nifi.ssl.StandardSSLContextService");
+
+ addTextElement(serviceElement, "enabled", "true");
+
+ Map<String, Object> attributes = new HashMap<>();
+ attributes.put("Keystore Filename", securityProperties.get(KEYSTORE_KEY));
+ attributes.put("Keystore Type", securityProperties.get(KEYSTORE_TYPE_KEY));
+ attributes.put("Keystore Password", securityProperties.get(KEYSTORE_PASSWORD_KEY));
+ attributes.put("Truststore Filename", securityProperties.get(TRUSTSTORE_KEY));
+ attributes.put("Truststore Type", securityProperties.get(TRUSTSTORE_TYPE_KEY));
+ attributes.put("Truststore Password", securityProperties.get(TRUSTSTORE_PASSWORD_KEY));
+ attributes.put("SSL Protocol", securityProperties.get(SSL_PROTOCOL_KEY));
+
+ addConfiguration(serviceElement, attributes);
+
+ element.appendChild(serviceElement);
+ } catch (Exception e){
+ throw new ConfigurationChangeException("Failed to parse the config YAML while trying to create an SSL Controller Service", e);
+ }
}
- private static void addProcessGroup(final Element parentElement, Map<String, Object> topLevelYaml, final String elementName) {
- Map<String,Object> flowControllerProperties = (Map<String, Object>) topLevelYaml.get(FLOW_CONTROLLER_PROPS_KEY);
+ private static void addProcessGroup(final Element parentElement, Map<String, Object> topLevelYaml, final String elementName) throws ConfigurationChangeException {
+ try {
+ Map<String, Object> flowControllerProperties = (Map<String, Object>) topLevelYaml.get(FLOW_CONTROLLER_PROPS_KEY);
- final Document doc = parentElement.getOwnerDocument();
- final Element element = doc.createElement(elementName);
- parentElement.appendChild(element);
- addTextElement(element, "id", "Root-Group");
- addTextElement(element, "name", getValueString(flowControllerProperties, NAME_KEY) );
- addPosition(element);
- addTextElement(element, "comment", getValueString(flowControllerProperties, COMMENT_KEY));
+ final Document doc = parentElement.getOwnerDocument();
+ final Element element = doc.createElement(elementName);
+ parentElement.appendChild(element);
+ addTextElement(element, "id", "Root-Group");
+ addTextElement(element, "name", getValueString(flowControllerProperties, NAME_KEY));
+ addPosition(element);
+ addTextElement(element, "comment", getValueString(flowControllerProperties, COMMENT_KEY));
- Map<String,Object> processorConfig = (Map<String, Object>) topLevelYaml.get(PROCESSOR_CONFIG_KEY);
- addProcessor(element, processorConfig);
+ Map<String, Object> processorConfig = (Map<String, Object>) topLevelYaml.get(PROCESSOR_CONFIG_KEY);
+ addProcessor(element, processorConfig);
- Map<String,Object> remoteProcessingGroup = (Map<String, Object>) topLevelYaml.get(REMOTE_PROCESSING_GROUP_KEY);
- addRemoteProcessGroup(element, remoteProcessingGroup);
+ Map<String, Object> remoteProcessingGroup = (Map<String, Object>) topLevelYaml.get(REMOTE_PROCESSING_GROUP_KEY);
+ addRemoteProcessGroup(element, remoteProcessingGroup);
- addConnection(element, topLevelYaml);
+ addConnection(element, topLevelYaml);
+ } catch (ConfigurationChangeException e){
+ throw e;
+ } catch (Exception e){
+ throw new ConfigurationChangeException("Failed to parse the config YAML while trying to creating the root Process Group", e);
+ }
}
- private static void addProcessor(final Element parentElement, Map<String, Object> processorConfig) {
+ private static void addProcessor(final Element parentElement, Map<String, Object> processorConfig) throws ConfigurationChangeException {
- final Document doc = parentElement.getOwnerDocument();
- final Element element = doc.createElement("processor");
- parentElement.appendChild(element);
- addTextElement(element, "id", "Processor");
- addTextElement(element, "name", getValueString(processorConfig, NAME_KEY));
-
- addPosition(element);
- addStyle(element);
-
- addTextElement(element, "comment", getValueString(processorConfig, COMMENT_KEY));
- addTextElement(element, "class", getValueString(processorConfig, CLASS_KEY));
- addTextElement(element, "maxConcurrentTasks", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY));
- addTextElement(element, "schedulingPeriod", getValueString(processorConfig, SCHEDULING_PERIOD_KEY));
- addTextElement(element, "penalizationPeriod", getValueString(processorConfig, PENALIZATION_PERIOD_KEY));
- addTextElement(element, "yieldPeriod", getValueString(processorConfig, YIELD_PERIOD_KEY));
- addTextElement(element, "bulletinLevel", "WARN");
- addTextElement(element, "lossTolerant", "false");
- addTextElement(element, "scheduledState", "RUNNING");
- addTextElement(element, "schedulingStrategy", getValueString(processorConfig, SCHEDULING_STRATEGY_KEY));
- addTextElement(element, "runDurationNanos", getValueString(processorConfig, RUN_DURATION_NANOS_KEY));
-
- addConfiguration(element, (Map<String, Object>) processorConfig.get(PROCESSOR_PROPS_KEY));
-
- Collection<String> autoTerminatedRelationships = (Collection<String>) processorConfig.get(AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY);
- if (autoTerminatedRelationships != null) {
- for (String rel : autoTerminatedRelationships) {
- addTextElement(element, "autoTerminatedRelationship", rel);
+ try {
+ if (processorConfig.get(CLASS_KEY) == null) {
+ // Only add a processor if it has a class
+ return;
+ }
+
+ final Document doc = parentElement.getOwnerDocument();
+ final Element element = doc.createElement("processor");
+ parentElement.appendChild(element);
+ addTextElement(element, "id", "Processor");
+ addTextElement(element, "name", getValueString(processorConfig, NAME_KEY));
+
+ addPosition(element);
+ addStyle(element);
+
+ addTextElement(element, "comment", getValueString(processorConfig, COMMENT_KEY));
+ addTextElement(element, "class", getValueString(processorConfig, CLASS_KEY));
+ addTextElement(element, "maxConcurrentTasks", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY));
+ addTextElement(element, "schedulingPeriod", getValueString(processorConfig, SCHEDULING_PERIOD_KEY));
+ addTextElement(element, "penalizationPeriod", getValueString(processorConfig, PENALIZATION_PERIOD_KEY));
+ addTextElement(element, "yieldPeriod", getValueString(processorConfig, YIELD_PERIOD_KEY));
+ addTextElement(element, "bulletinLevel", "WARN");
+ addTextElement(element, "lossTolerant", "false");
+ addTextElement(element, "scheduledState", "RUNNING");
+ addTextElement(element, "schedulingStrategy", getValueString(processorConfig, SCHEDULING_STRATEGY_KEY));
+ addTextElement(element, "runDurationNanos", getValueString(processorConfig, RUN_DURATION_NANOS_KEY));
+
+ addConfiguration(element, (Map<String, Object>) processorConfig.get(PROCESSOR_PROPS_KEY));
+
+ Collection<String> autoTerminatedRelationships = (Collection<String>) processorConfig.get(AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY);
+ if (autoTerminatedRelationships != null) {
+ for (String rel : autoTerminatedRelationships) {
+ addTextElement(element, "autoTerminatedRelationship", rel);
+ }
}
+ } catch (Exception e){
+ throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the Processor", e);
}
}
- private static void addProvenanceReportingTask(final Element element, Map<String, Object> topLevelYaml) {
- Map<String, Object> provenanceProperties = (Map<String, Object>) topLevelYaml.get(PROVENANCE_REPORTING_KEY);
- final Element taskElement = element.getOwnerDocument().createElement("reportingTask");
- addTextElement(taskElement, "id", "Provenance-Reporting");
- addTextElement(taskElement, "name", "Site-To-Site-Provenance-Reporting");
- addTextElement(taskElement, "comment", getValueString(provenanceProperties, COMMENT_KEY));
- addTextElement(taskElement, "class", "org.apache.nifi.minifi.provenance.reporting.ProvenanceReportingTask");
- addTextElement(taskElement, "schedulingPeriod", getValueString(provenanceProperties, SCHEDULING_PERIOD_KEY));
- addTextElement(taskElement, "scheduledState", "RUNNING");
- addTextElement(taskElement, "schedulingStrategy", getValueString(provenanceProperties, SCHEDULING_STRATEGY_KEY));
-
- Map<String, Object> attributes = new HashMap<>();
- attributes.put("Destination URL", provenanceProperties.get(DESTINATION_URL_KEY));
- attributes.put("Input Port Name", provenanceProperties.get(PORT_NAME_KEY));
- attributes.put("MiNiFi URL", provenanceProperties.get(ORIGINATING_URL_KEY));
- attributes.put("Compress Events", provenanceProperties.get(USE_COMPRESSION_KEY));
- attributes.put("Batch Size", provenanceProperties.get(BATCH_SIZE_KEY));
-
- Map<String, Object> securityProps = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY);
- String sslAlgorithm = (String) securityProps.get(SSL_PROTOCOL_KEY);
- if (sslAlgorithm != null && !(sslAlgorithm.isEmpty())) {
- attributes.put("SSL Context Service", "SSL-Context-Service");
- }
+ private static void addProvenanceReportingTask(final Element element, Map<String, Object> topLevelYaml) throws ConfigurationChangeException {
+ try {
+ Map<String, Object> provenanceProperties = (Map<String, Object>) topLevelYaml.get(PROVENANCE_REPORTING_KEY);
+ final Element taskElement = element.getOwnerDocument().createElement("reportingTask");
+ addTextElement(taskElement, "id", "Provenance-Reporting");
+ addTextElement(taskElement, "name", "Site-To-Site-Provenance-Reporting");
+ addTextElement(taskElement, "comment", getValueString(provenanceProperties, COMMENT_KEY));
+ addTextElement(taskElement, "class", "org.apache.nifi.minifi.provenance.reporting.ProvenanceReportingTask");
+ addTextElement(taskElement, "schedulingPeriod", getValueString(provenanceProperties, SCHEDULING_PERIOD_KEY));
+ addTextElement(taskElement, "scheduledState", "RUNNING");
+ addTextElement(taskElement, "schedulingStrategy", getValueString(provenanceProperties, SCHEDULING_STRATEGY_KEY));
+
+ Map<String, Object> attributes = new HashMap<>();
+ attributes.put("Destination URL", provenanceProperties.get(DESTINATION_URL_KEY));
+ attributes.put("Input Port Name", provenanceProperties.get(PORT_NAME_KEY));
+ attributes.put("MiNiFi URL", provenanceProperties.get(ORIGINATING_URL_KEY));
+ attributes.put("Compress Events", provenanceProperties.get(USE_COMPRESSION_KEY));
+ attributes.put("Batch Size", provenanceProperties.get(BATCH_SIZE_KEY));
+
+ Map<String, Object> securityProps = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY);
+ String sslAlgorithm = (String) securityProps.get(SSL_PROTOCOL_KEY);
+ if (sslAlgorithm != null && !(sslAlgorithm.isEmpty())) {
+ attributes.put("SSL Context Service", "SSL-Context-Service");
+ }
- addConfiguration(taskElement, attributes);
+ addConfiguration(taskElement, attributes);
- element.appendChild(taskElement);
+ element.appendChild(taskElement);
+ } catch (Exception e){
+ throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the Provenance Reporting Task", e);
+ }
}
private static void addConfiguration(final Element element, Map<String, Object> elementConfig) {
@@ -472,75 +539,103 @@ public final class ConfigTransformer {
parentElement.appendChild(element);
}
- private static void addRemoteProcessGroup(final Element parentElement, Map<String, Object> remoteProcessingGroup) {
-
- final Document doc = parentElement.getOwnerDocument();
- final Element element = doc.createElement("remoteProcessGroup");
- parentElement.appendChild(element);
- addTextElement(element, "id", "Remote-Process-Group");
- addTextElement(element, "name", getValueString(remoteProcessingGroup, NAME_KEY));
- addPosition(element);
- addTextElement(element, "comment", getValueString(remoteProcessingGroup, COMMENT_KEY));
- addTextElement(element, "url", getValueString(remoteProcessingGroup, URL_KEY));
- addTextElement(element, "timeout", getValueString(remoteProcessingGroup, TIMEOUT_KEY));
- addTextElement(element, "yieldPeriod", getValueString(remoteProcessingGroup, YIELD_PERIOD_KEY));
- addTextElement(element, "transmitting", "true");
-
- Map<String,Object> inputPort = (Map<String, Object>) remoteProcessingGroup.get(INPUT_PORT_KEY);
- addRemoteGroupPort(element, inputPort, "inputPort");
+ private static void addRemoteProcessGroup(final Element parentElement, Map<String, Object> remoteProcessingGroup) throws ConfigurationChangeException {
+ try {
+ if (remoteProcessingGroup.get(URL_KEY) == null) {
+ // Only add an an RPG if it has a URL
+ return;
+ }
- parentElement.appendChild(element);
+ final Document doc = parentElement.getOwnerDocument();
+ final Element element = doc.createElement("remoteProcessGroup");
+ parentElement.appendChild(element);
+ addTextElement(element, "id", "Remote-Process-Group");
+ addTextElement(element, "name", getValueString(remoteProcessingGroup, NAME_KEY));
+ addPosition(element);
+ addTextElement(element, "comment", getValueString(remoteProcessingGroup, COMMENT_KEY));
+ addTextElement(element, "url", getValueString(remoteProcessingGroup, URL_KEY));
+ addTextElement(element, "timeout", getValueString(remoteProcessingGroup, TIMEOUT_KEY));
+ addTextElement(element, "yieldPeriod", getValueString(remoteProcessingGroup, YIELD_PERIOD_KEY));
+ addTextElement(element, "transmitting", "true");
+
+ Map<String, Object> inputPort = (Map<String, Object>) remoteProcessingGroup.get(INPUT_PORT_KEY);
+ addRemoteGroupPort(element, inputPort, "inputPort");
+
+ parentElement.appendChild(element);
+ } catch (Exception e){
+ throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the Remote Process Group", e);
+ }
}
- private static void addRemoteGroupPort(final Element parentElement, Map<String, Object> inputPort, final String elementName) {
- final Document doc = parentElement.getOwnerDocument();
- final Element element = doc.createElement(elementName);
- parentElement.appendChild(element);
- addTextElement(element, "id", getValueString(inputPort, ID_KEY));
- addTextElement(element, "name", getValueString(inputPort, NAME_KEY));
- addPosition(element);
- addTextElement(element, "comments", getValueString(inputPort, COMMENT_KEY));
- addTextElement(element, "scheduledState", "RUNNING");
- addTextElement(element, "maxConcurrentTasks", getValueString(inputPort, MAX_CONCURRENT_TASKS_KEY));
- addTextElement(element, "useCompression", getValueString(inputPort, USE_COMPRESSION_KEY));
+ private static void addRemoteGroupPort(final Element parentElement, Map<String, Object> inputPort, final String elementName) throws ConfigurationChangeException {
- parentElement.appendChild(element);
+ try {
+ if (inputPort.get(ID_KEY) == null) {
+ // Only add an input port if it has an ID
+ return;
+ }
+
+ final Document doc = parentElement.getOwnerDocument();
+ final Element element = doc.createElement(elementName);
+ parentElement.appendChild(element);
+ addTextElement(element, "id", getValueString(inputPort, ID_KEY));
+ addTextElement(element, "name", getValueString(inputPort, NAME_KEY));
+ addPosition(element);
+ addTextElement(element, "comments", getValueString(inputPort, COMMENT_KEY));
+ addTextElement(element, "scheduledState", "RUNNING");
+ addTextElement(element, "maxConcurrentTasks", getValueString(inputPort, MAX_CONCURRENT_TASKS_KEY));
+ addTextElement(element, "useCompression", getValueString(inputPort, USE_COMPRESSION_KEY));
+
+ parentElement.appendChild(element);
+ } catch (Exception e){
+ throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the input port of the Remote Process Group", e);
+ }
}
- private static void addConnection(final Element parentElement, Map<String, Object> topLevelYaml) {
- Map<String,Object> connectionProperties = (Map<String, Object>) topLevelYaml.get(CONNECTION_PROPS_KEY);
- Map<String,Object> remoteProcessingGroup = (Map<String, Object>) topLevelYaml.get(REMOTE_PROCESSING_GROUP_KEY);
- Map<String,Object> inputPort = (Map<String, Object>) remoteProcessingGroup.get(INPUT_PORT_KEY);
- final Document doc = parentElement.getOwnerDocument();
- final Element element = doc.createElement("connection");
- parentElement.appendChild(element);
- addTextElement(element, "id", "Connection");
- addTextElement(element, "name", getValueString(connectionProperties, NAME_KEY));
+ private static void addConnection(final Element parentElement, Map<String, Object> topLevelYaml) throws ConfigurationChangeException {
+ try {
+ Map<String, Object> connectionProperties = (Map<String, Object>) topLevelYaml.get(CONNECTION_PROPS_KEY);
+ Map<String, Object> remoteProcessingGroup = (Map<String, Object>) topLevelYaml.get(REMOTE_PROCESSING_GROUP_KEY);
+ Map<String, Object> inputPort = (Map<String, Object>) remoteProcessingGroup.get(INPUT_PORT_KEY);
+ Map<String, Object> processorConfig = (Map<String, Object>) topLevelYaml.get(PROCESSOR_CONFIG_KEY);
- final Element bendPointsElement = doc.createElement("bendPoints");
- element.appendChild(bendPointsElement);
+ if (inputPort.get(ID_KEY) == null || processorConfig.get(CLASS_KEY) == null) {
+ // Only add the connection if the input port and processor config are created
+ return;
+ }
- addTextElement(element, "labelIndex", "1");
- addTextElement(element, "zIndex", "0");
+ final Document doc = parentElement.getOwnerDocument();
+ final Element element = doc.createElement("connection");
+ parentElement.appendChild(element);
+ addTextElement(element, "id", "Connection");
+ addTextElement(element, "name", getValueString(connectionProperties, NAME_KEY));
- addTextElement(element, "sourceId", "Processor");
- addTextElement(element, "sourceGroupId", "Root-Group");
- addTextElement(element, "sourceType", "PROCESSOR");
+ final Element bendPointsElement = doc.createElement("bendPoints");
+ element.appendChild(bendPointsElement);
- addTextElement(element, "destinationId", getValueString(inputPort,ID_KEY));
- addTextElement(element, "destinationGroupId", "Remote-Process-Group");
- addTextElement(element, "destinationType", "REMOTE_INPUT_PORT");
+ addTextElement(element, "labelIndex", "1");
+ addTextElement(element, "zIndex", "0");
- addTextElement(element, "relationship", "success");
+ addTextElement(element, "sourceId", "Processor");
+ addTextElement(element, "sourceGroupId", "Root-Group");
+ addTextElement(element, "sourceType", "PROCESSOR");
- addTextElement(element, "maxWorkQueueSize", getValueString(connectionProperties, MAX_WORK_QUEUE_SIZE_KEY));
- addTextElement(element, "maxWorkQueueDataSize", getValueString(connectionProperties, MAX_WORK_QUEUE_DATA_SIZE_KEY));
+ addTextElement(element, "destinationId", getValueString(inputPort, ID_KEY));
+ addTextElement(element, "destinationGroupId", "Remote-Process-Group");
+ addTextElement(element, "destinationType", "REMOTE_INPUT_PORT");
- addTextElement(element, "flowFileExpiration", getValueString(connectionProperties, FLOWFILE_EXPIRATION__KEY));
- addTextElement(element, "queuePrioritizerClass", getValueString(connectionProperties, QUEUE_PRIORITIZER_CLASS_KEY));
+ addTextElement(element, "relationship", "success");
+ addTextElement(element, "maxWorkQueueSize", getValueString(connectionProperties, MAX_WORK_QUEUE_SIZE_KEY));
+ addTextElement(element, "maxWorkQueueDataSize", getValueString(connectionProperties, MAX_WORK_QUEUE_DATA_SIZE_KEY));
- parentElement.appendChild(element);
+ addTextElement(element, "flowFileExpiration", getValueString(connectionProperties, FLOWFILE_EXPIRATION__KEY));
+ addTextElement(element, "queuePrioritizerClass", getValueString(connectionProperties, QUEUE_PRIORITIZER_CLASS_KEY));
+
+ parentElement.appendChild(element);
+ } catch (Exception e){
+ throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the connection from the Processor to the input port of the Remote Process Group", e);
+ }
}
private static void addPosition(final Element parentElement) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestFileChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestFileChangeNotifier.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestFileChangeNotifier.java
deleted file mode 100644
index 9432a2f..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestFileChangeNotifier.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.minifi.bootstrap.configuration;
-
-import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.InputStream;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.WatchEvent;
-import java.nio.file.WatchKey;
-import java.nio.file.WatchService;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-public class TestFileChangeNotifier {
-
- private static final String CONFIG_FILENAME = "config.yml";
- private static final String TEST_CONFIG_PATH = "src/test/resources/config.yml";
-
- private FileChangeNotifier notifierSpy;
- private WatchService mockWatchService;
- private Properties testProperties;
-
- @Before
- public void setUp() throws Exception {
- mockWatchService = Mockito.mock(WatchService.class);
- notifierSpy = Mockito.spy(new FileChangeNotifier());
- notifierSpy.setConfigFile(Paths.get(TEST_CONFIG_PATH));
- notifierSpy.setWatchService(mockWatchService);
-
- testProperties = new Properties();
- testProperties.put(FileChangeNotifier.CONFIG_FILE_PATH_KEY, TEST_CONFIG_PATH);
- testProperties.put(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY, FileChangeNotifier.DEFAULT_POLLING_PERIOD_INTERVAL);
- }
-
- @After
- public void tearDown() throws Exception {
- notifierSpy.close();
- }
-
- @Test(expected = IllegalStateException.class)
- public void testInitialize_invalidFile() throws Exception {
- testProperties.put(FileChangeNotifier.CONFIG_FILE_PATH_KEY, "/land/of/make/believe");
- notifierSpy.initialize(testProperties);
- }
-
- @Test
- public void testInitialize_validFile() throws Exception {
- notifierSpy.initialize(testProperties);
- }
-
- @Test(expected = IllegalStateException.class)
- public void testInitialize_invalidPollingPeriod() throws Exception {
- testProperties.put(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY, "abc");
- notifierSpy.initialize(testProperties);
- }
-
- @Test
- public void testInitialize_useDefaultPolling() throws Exception {
- testProperties.remove(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY);
- notifierSpy.initialize(testProperties);
- }
-
-
- @Test
- public void testNotifyListeners() throws Exception {
- final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
- boolean wasRegistered = notifierSpy.registerListener(testListener);
-
- Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
- Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
-
- notifierSpy.notifyListeners();
-
- verify(testListener, Mockito.atMost(1)).handleChange(Mockito.any(InputStream.class));
- }
-
- @Test
- public void testRegisterListener() throws Exception {
- final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class);
- boolean wasRegistered = notifierSpy.registerListener(firstListener);
-
- Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
- Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
-
- final ConfigurationChangeListener secondListener = Mockito.mock(ConfigurationChangeListener.class);
- wasRegistered = notifierSpy.registerListener(secondListener);
- Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 2);
-
- }
-
- @Test
- public void testRegisterDuplicateListener() throws Exception {
- final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class);
- boolean wasRegistered = notifierSpy.registerListener(firstListener);
-
- Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
- Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
-
- wasRegistered = notifierSpy.registerListener(firstListener);
-
- Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
- Assert.assertFalse("Registration did not correspond to newly added listener", wasRegistered);
- }
-
- /* Verify handleChange events */
- @Test
- public void testTargetChangedNoModification() throws Exception {
- final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
-
- // In this case the WatchKey is null because there were no events found
- establishMockEnvironmentForChangeTests(testListener, null);
-
- verify(testListener, Mockito.never()).handleChange(Mockito.any(InputStream.class));
- }
-
- @Test
- public void testTargetChangedWithModificationEvent_nonConfigFile() throws Exception {
- final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
-
- // In this case, we receive a trigger event for the directory monitored, but it was another file not being monitored
- final WatchKey mockWatchKey = createMockWatchKeyForPath("footage_not_found.yml");
-
- establishMockEnvironmentForChangeTests(testListener, mockWatchKey);
-
- notifierSpy.targetChanged();
-
- verify(testListener, Mockito.never()).handleChange(Mockito.any(InputStream.class));
- }
-
- @Test
- public void testTargetChangedWithModificationEvent() throws Exception {
- final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
-
- final WatchKey mockWatchKey = createMockWatchKeyForPath(CONFIG_FILENAME);
- // Provided as a spy to allow injection of mock objects for some tests when dealing with the finalized FileSystems class
- establishMockEnvironmentForChangeTests(testListener, mockWatchKey);
-
- // Invoke the method of interest
- notifierSpy.run();
-
- verify(mockWatchService, Mockito.atLeastOnce()).poll();
- verify(testListener, Mockito.atLeastOnce()).handleChange(Mockito.any(InputStream.class));
- }
-
- /* Helper methods to establish mock environment */
- private WatchKey createMockWatchKeyForPath(String configFilePath) {
- final WatchKey mockWatchKey = Mockito.mock(WatchKey.class);
- final List<WatchEvent<?>> mockWatchEvents = (List<WatchEvent<?>>) Mockito.mock(List.class);
- when(mockWatchKey.pollEvents()).thenReturn(mockWatchEvents);
- when(mockWatchKey.reset()).thenReturn(true);
-
- final Iterator mockIterator = Mockito.mock(Iterator.class);
- when(mockWatchEvents.iterator()).thenReturn(mockIterator);
-
- final WatchEvent mockWatchEvent = Mockito.mock(WatchEvent.class);
- when(mockIterator.hasNext()).thenReturn(true, false);
- when(mockIterator.next()).thenReturn(mockWatchEvent);
-
- // In this case, we receive a trigger event for the directory monitored, and it was the file monitored
- when(mockWatchEvent.context()).thenReturn(Paths.get(configFilePath));
- when(mockWatchEvent.kind()).thenReturn(ENTRY_MODIFY);
-
- return mockWatchKey;
- }
-
- private void establishMockEnvironmentForChangeTests(ConfigurationChangeListener listener, final WatchKey watchKey) throws Exception {
- final boolean wasRegistered = notifierSpy.registerListener(listener);
-
- // Establish the file mock and its parent directory
- final Path mockConfigFilePath = Mockito.mock(Path.class);
- final Path mockConfigFileParentPath = Mockito.mock(Path.class);
-
- // When getting the parent of the file, get the directory
- when(mockConfigFilePath.getParent()).thenReturn(mockConfigFileParentPath);
-
- Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
- Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
-
- when(mockWatchService.poll()).thenReturn(watchKey);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifier.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifier.java
deleted file mode 100644
index 75b44e3..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifier.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration;
-
-
-import com.squareup.okhttp.OkHttpClient;
-import org.apache.nifi.minifi.bootstrap.configuration.util.TestRestChangeNotifierCommon;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import java.net.MalformedURLException;
-import java.util.Properties;
-
-
-public class TestRestChangeNotifier extends TestRestChangeNotifierCommon {
-
- @BeforeClass
- public static void setUp() throws InterruptedException, MalformedURLException {
- Properties properties = new Properties();
- restChangeNotifier = new RestChangeNotifier();
- restChangeNotifier.initialize(properties);
- restChangeNotifier.registerListener(mockChangeListener);
- restChangeNotifier.start();
-
- client = new OkHttpClient();
-
- url = restChangeNotifier.getURI().toURL().toString();
- Thread.sleep(1000);
- }
-
- @AfterClass
- public static void stop() throws Exception {
- restChangeNotifier.close();
- client = null;
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifierSSL.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifierSSL.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifierSSL.java
deleted file mode 100644
index 908e693..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifierSSL.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration;
-
-
-import com.squareup.okhttp.OkHttpClient;
-import org.apache.nifi.minifi.bootstrap.configuration.util.TestRestChangeNotifierCommon;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManagerFactory;
-import java.io.IOException;
-import java.security.KeyManagementException;
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-import java.util.Properties;
-
-
-public class TestRestChangeNotifierSSL extends TestRestChangeNotifierCommon {
-
-
- @BeforeClass
- public static void setUpHttps() throws CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, UnrecoverableKeyException, KeyManagementException, InterruptedException {
- Properties properties = new Properties();
- properties.setProperty(RestChangeNotifier.TRUSTSTORE_LOCATION_KEY, "./src/test/resources/localhost-ts.jks");
- properties.setProperty(RestChangeNotifier.TRUSTSTORE_PASSWORD_KEY, "localtest");
- properties.setProperty(RestChangeNotifier.TRUSTSTORE_TYPE_KEY, "JKS");
- properties.setProperty(RestChangeNotifier.KEYSTORE_LOCATION_KEY, "./src/test/resources/localhost-ks.jks");
- properties.setProperty(RestChangeNotifier.KEYSTORE_PASSWORD_KEY, "localtest");
- properties.setProperty(RestChangeNotifier.KEYSTORE_TYPE_KEY, "JKS");
- properties.setProperty(RestChangeNotifier.NEED_CLIENT_AUTH_KEY, "true");
- restChangeNotifier = new RestChangeNotifier();
- restChangeNotifier.initialize(properties);
- restChangeNotifier.registerListener(mockChangeListener);
- restChangeNotifier.start();
-
- client = new OkHttpClient();
-
- SSLContext sslContext = SSLContext.getInstance("TLS");
- TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
- trustManagerFactory.init(readKeyStore("./src/test/resources/localhost-ts.jks"));
-
- KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
- keyManagerFactory.init(readKeyStore("./src/test/resources/localhost-ks.jks"), "localtest".toCharArray());
-
- sslContext.init(keyManagerFactory.getKeyManagers(),trustManagerFactory.getTrustManagers(), new SecureRandom());
- client.setSslSocketFactory(sslContext.getSocketFactory());
-
- url = restChangeNotifier.getURI().toURL().toString();
- Thread.sleep(1000);
- }
-
- @AfterClass
- public static void stop() throws Exception {
- restChangeNotifier.close();
- client = null;
- }
-
- private static KeyStore readKeyStore(String path) throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException {
- KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
-
- char[] password = "localtest".toCharArray();
-
- java.io.FileInputStream fis = null;
- try {
- fis = new java.io.FileInputStream(path);
- ks.load(fis, password);
- } finally {
- if (fis != null) {
- fis.close();
- }
- }
- return ks;
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestFileChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestFileChangeNotifier.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestFileChangeNotifier.java
new file mode 100644
index 0000000..145c2fe
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestFileChangeNotifier.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers;
+
+import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.configuration.notifiers.FileChangeNotifier;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestFileChangeNotifier {
+
+ private static final String CONFIG_FILENAME = "config.yml";
+ private static final String TEST_CONFIG_PATH = "src/test/resources/config.yml";
+
+ private FileChangeNotifier notifierSpy;
+ private WatchService mockWatchService;
+ private Properties testProperties;
+
+ @Before
+ public void setUp() throws Exception {
+ mockWatchService = Mockito.mock(WatchService.class);
+ notifierSpy = Mockito.spy(new FileChangeNotifier());
+ notifierSpy.setConfigFile(Paths.get(TEST_CONFIG_PATH));
+ notifierSpy.setWatchService(mockWatchService);
+
+ testProperties = new Properties();
+ testProperties.put(FileChangeNotifier.CONFIG_FILE_PATH_KEY, TEST_CONFIG_PATH);
+ testProperties.put(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY, FileChangeNotifier.DEFAULT_POLLING_PERIOD_INTERVAL);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ notifierSpy.close();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testInitialize_invalidFile() throws Exception {
+ testProperties.put(FileChangeNotifier.CONFIG_FILE_PATH_KEY, "/land/of/make/believe");
+ notifierSpy.initialize(testProperties);
+ }
+
+ @Test
+ public void testInitialize_validFile() throws Exception {
+ notifierSpy.initialize(testProperties);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testInitialize_invalidPollingPeriod() throws Exception {
+ testProperties.put(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY, "abc");
+ notifierSpy.initialize(testProperties);
+ }
+
+ @Test
+ public void testInitialize_useDefaultPolling() throws Exception {
+ testProperties.remove(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY);
+ notifierSpy.initialize(testProperties);
+ }
+
+
+ @Test
+ public void testNotifyListeners() throws Exception {
+ final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+ boolean wasRegistered = notifierSpy.registerListener(testListener);
+
+ Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
+ Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
+
+ notifierSpy.notifyListeners();
+
+ verify(testListener, Mockito.atMost(1)).handleChange(Mockito.any(InputStream.class));
+ }
+
+ @Test
+ public void testRegisterListener() throws Exception {
+ final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class);
+ boolean wasRegistered = notifierSpy.registerListener(firstListener);
+
+ Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
+ Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
+
+ final ConfigurationChangeListener secondListener = Mockito.mock(ConfigurationChangeListener.class);
+ wasRegistered = notifierSpy.registerListener(secondListener);
+ Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 2);
+
+ }
+
+ @Test
+ public void testRegisterDuplicateListener() throws Exception {
+ final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class);
+ boolean wasRegistered = notifierSpy.registerListener(firstListener);
+
+ Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
+ Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
+
+ wasRegistered = notifierSpy.registerListener(firstListener);
+
+ Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
+ Assert.assertFalse("Registration did not correspond to newly added listener", wasRegistered);
+ }
+
+ /* Verify handleChange events */
+ @Test
+ public void testTargetChangedNoModification() throws Exception {
+ final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+
+ // In this case the WatchKey is null because there were no events found
+ establishMockEnvironmentForChangeTests(testListener, null);
+
+ verify(testListener, Mockito.never()).handleChange(Mockito.any(InputStream.class));
+ }
+
+ @Test
+ public void testTargetChangedWithModificationEvent_nonConfigFile() throws Exception {
+ final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+
+ // In this case, we receive a trigger event for the directory monitored, but it was another file not being monitored
+ final WatchKey mockWatchKey = createMockWatchKeyForPath("footage_not_found.yml");
+
+ establishMockEnvironmentForChangeTests(testListener, mockWatchKey);
+
+ notifierSpy.targetChanged();
+
+ verify(testListener, Mockito.never()).handleChange(Mockito.any(InputStream.class));
+ }
+
+ @Test
+ public void testTargetChangedWithModificationEvent() throws Exception {
+ final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+
+ final WatchKey mockWatchKey = createMockWatchKeyForPath(CONFIG_FILENAME);
+ // Provided as a spy to allow injection of mock objects for some tests when dealing with the finalized FileSystems class
+ establishMockEnvironmentForChangeTests(testListener, mockWatchKey);
+
+ // Invoke the method of interest
+ notifierSpy.run();
+
+ verify(mockWatchService, Mockito.atLeastOnce()).poll();
+ verify(testListener, Mockito.atLeastOnce()).handleChange(Mockito.any(InputStream.class));
+ }
+
+ /* Helper methods to establish mock environment */
+ private WatchKey createMockWatchKeyForPath(String configFilePath) {
+ final WatchKey mockWatchKey = Mockito.mock(WatchKey.class);
+ final List<WatchEvent<?>> mockWatchEvents = (List<WatchEvent<?>>) Mockito.mock(List.class);
+ when(mockWatchKey.pollEvents()).thenReturn(mockWatchEvents);
+ when(mockWatchKey.reset()).thenReturn(true);
+
+ final Iterator mockIterator = Mockito.mock(Iterator.class);
+ when(mockWatchEvents.iterator()).thenReturn(mockIterator);
+
+ final WatchEvent mockWatchEvent = Mockito.mock(WatchEvent.class);
+ when(mockIterator.hasNext()).thenReturn(true, false);
+ when(mockIterator.next()).thenReturn(mockWatchEvent);
+
+ // In this case, we receive a trigger event for the directory monitored, and it was the file monitored
+ when(mockWatchEvent.context()).thenReturn(Paths.get(configFilePath));
+ when(mockWatchEvent.kind()).thenReturn(ENTRY_MODIFY);
+
+ return mockWatchKey;
+ }
+
+ private void establishMockEnvironmentForChangeTests(ConfigurationChangeListener listener, final WatchKey watchKey) throws Exception {
+ final boolean wasRegistered = notifierSpy.registerListener(listener);
+
+ // Establish the file mock and its parent directory
+ final Path mockConfigFilePath = Mockito.mock(Path.class);
+ final Path mockConfigFileParentPath = Mockito.mock(Path.class);
+
+ // When getting the parent of the file, get the directory
+ when(mockConfigFilePath.getParent()).thenReturn(mockConfigFileParentPath);
+
+ Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
+ Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
+
+ when(mockWatchService.poll()).thenReturn(watchKey);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifier.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifier.java
new file mode 100644
index 0000000..1cd37fd
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifier.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers;
+
+
+import com.squareup.okhttp.OkHttpClient;
+import org.apache.nifi.minifi.bootstrap.configuration.notifiers.util.TestRestChangeNotifierCommon;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.net.MalformedURLException;
+import java.util.Properties;
+
+
+public class TestRestChangeNotifier extends TestRestChangeNotifierCommon {
+
+ @BeforeClass
+ public static void setUp() throws InterruptedException, MalformedURLException {
+ Properties properties = new Properties();
+ restChangeNotifier = new RestChangeNotifier();
+ restChangeNotifier.initialize(properties);
+ restChangeNotifier.registerListener(mockChangeListener);
+ restChangeNotifier.start();
+
+ client = new OkHttpClient();
+
+ url = restChangeNotifier.getURI().toURL().toString();
+ Thread.sleep(1000);
+ }
+
+ @AfterClass
+ public static void stop() throws Exception {
+ restChangeNotifier.close();
+ client = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifierSSL.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifierSSL.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifierSSL.java
new file mode 100644
index 0000000..6073a6f
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifierSSL.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers;
+
+
+import com.squareup.okhttp.OkHttpClient;
+import org.apache.nifi.minifi.bootstrap.configuration.notifiers.util.TestRestChangeNotifierCommon;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.Properties;
+
+
+public class TestRestChangeNotifierSSL extends TestRestChangeNotifierCommon {
+
+
+ @BeforeClass
+ public static void setUpHttps() throws CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, UnrecoverableKeyException, KeyManagementException, InterruptedException {
+ Properties properties = new Properties();
+ properties.setProperty(RestChangeNotifier.TRUSTSTORE_LOCATION_KEY, "./src/test/resources/localhost-ts.jks");
+ properties.setProperty(RestChangeNotifier.TRUSTSTORE_PASSWORD_KEY, "localtest");
+ properties.setProperty(RestChangeNotifier.TRUSTSTORE_TYPE_KEY, "JKS");
+ properties.setProperty(RestChangeNotifier.KEYSTORE_LOCATION_KEY, "./src/test/resources/localhost-ks.jks");
+ properties.setProperty(RestChangeNotifier.KEYSTORE_PASSWORD_KEY, "localtest");
+ properties.setProperty(RestChangeNotifier.KEYSTORE_TYPE_KEY, "JKS");
+ properties.setProperty(RestChangeNotifier.NEED_CLIENT_AUTH_KEY, "true");
+ restChangeNotifier = new RestChangeNotifier();
+ restChangeNotifier.initialize(properties);
+ restChangeNotifier.registerListener(mockChangeListener);
+ restChangeNotifier.start();
+
+ client = new OkHttpClient();
+
+ SSLContext sslContext = SSLContext.getInstance("TLS");
+ TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ trustManagerFactory.init(readKeyStore("./src/test/resources/localhost-ts.jks"));
+
+ KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ keyManagerFactory.init(readKeyStore("./src/test/resources/localhost-ks.jks"), "localtest".toCharArray());
+
+ sslContext.init(keyManagerFactory.getKeyManagers(),trustManagerFactory.getTrustManagers(), new SecureRandom());
+ client.setSslSocketFactory(sslContext.getSocketFactory());
+
+ url = restChangeNotifier.getURI().toURL().toString();
+ Thread.sleep(1000);
+ }
+
+ @AfterClass
+ public static void stop() throws Exception {
+ restChangeNotifier.close();
+ client = null;
+ }
+
+ private static KeyStore readKeyStore(String path) throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException {
+ KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
+
+ char[] password = "localtest".toCharArray();
+
+ java.io.FileInputStream fis = null;
+ try {
+ fis = new java.io.FileInputStream(path);
+ ks.load(fis, password);
+ } finally {
+ if (fis != null) {
+ fis.close();
+ }
+ }
+ return ks;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/MockChangeListener.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/MockChangeListener.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/MockChangeListener.java
new file mode 100644
index 0000000..eae5872
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/MockChangeListener.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers.util;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class MockChangeListener implements ConfigurationChangeListener {
+ String confFile;
+
+ @Override
+ public void handleChange(InputStream inputStream) {
+ try {
+ confFile = IOUtils.toString(inputStream, "UTF-8");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public String getDescriptor() {
+ return "MockChangeListener";
+ }
+
+ public String getConfFile() {
+ return confFile;
+ }
+
+ public void setConfFile(String confFile) {
+ this.confFile = confFile;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/TestRestChangeNotifierCommon.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/TestRestChangeNotifierCommon.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/TestRestChangeNotifierCommon.java
new file mode 100644
index 0000000..78f6cd5
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/TestRestChangeNotifierCommon.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers.util;
+
+import com.squareup.okhttp.Headers;
+import com.squareup.okhttp.MediaType;
+import com.squareup.okhttp.OkHttpClient;
+import com.squareup.okhttp.Request;
+import com.squareup.okhttp.RequestBody;
+import com.squareup.okhttp.Response;
+import org.apache.nifi.minifi.bootstrap.configuration.notifiers.RestChangeNotifier;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public abstract class TestRestChangeNotifierCommon {
+
+ public static OkHttpClient client;
+ public static RestChangeNotifier restChangeNotifier;
+ public static final MediaType MEDIA_TYPE_MARKDOWN = MediaType.parse("text/x-markdown; charset=utf-8");
+ public static String url;
+ public static MockChangeListener mockChangeListener = new MockChangeListener();
+
+ @Test
+ public void testGet() throws Exception {
+ assertEquals(1, restChangeNotifier.getChangeListeners().size());
+
+ Request request = new Request.Builder()
+ .url(url)
+ .build();
+
+ Response response = client.newCall(request).execute();
+ if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
+
+ Headers responseHeaders = response.headers();
+ for (int i = 0; i < responseHeaders.size(); i++) {
+ System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
+ }
+
+ assertEquals(RestChangeNotifier.GET_TEXT, response.body().string());
+ }
+
+ @Test
+ public void testFileUpload() throws Exception {
+ assertEquals(1, restChangeNotifier.getChangeListeners().size());
+
+ File file = new File("src/test/resources/testUploadFile.txt");
+ assertTrue(file.exists());
+ assertTrue(file.canRead());
+
+ Request request = new Request.Builder()
+ .url(url)
+ .post(RequestBody.create(MEDIA_TYPE_MARKDOWN, file))
+ .addHeader("charset","UTF-8")
+ .build();
+
+ Response response = client.newCall(request).execute();
+ if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
+
+ Headers responseHeaders = response.headers();
+ for (int i = 0; i < responseHeaders.size(); i++) {
+ System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
+ }
+
+ assertEquals("The result of notifying listeners:\nMockChangeListener successfully handled the configuration change\n", response.body().string());
+
+ assertEquals(new String(Files.readAllBytes(file.toPath())), mockChangeListener.getConfFile());
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/MockChangeListener.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/MockChangeListener.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/MockChangeListener.java
deleted file mode 100644
index 6843889..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/MockChangeListener.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration.util;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-public class MockChangeListener implements ConfigurationChangeListener {
- String confFile;
-
- @Override
- public void handleChange(InputStream inputStream) {
- try {
- confFile = IOUtils.toString(inputStream, "UTF-8");
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public String getConfFile() {
- return confFile;
- }
-
- public void setConfFile(String confFile) {
- this.confFile = confFile;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/TestRestChangeNotifierCommon.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/TestRestChangeNotifierCommon.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/TestRestChangeNotifierCommon.java
deleted file mode 100644
index b3c4f54..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/TestRestChangeNotifierCommon.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration.util;
-
-import com.squareup.okhttp.Headers;
-import com.squareup.okhttp.MediaType;
-import com.squareup.okhttp.OkHttpClient;
-import com.squareup.okhttp.Request;
-import com.squareup.okhttp.RequestBody;
-import com.squareup.okhttp.Response;
-import org.apache.nifi.minifi.bootstrap.configuration.RestChangeNotifier;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public abstract class TestRestChangeNotifierCommon {
-
- public static OkHttpClient client;
- public static RestChangeNotifier restChangeNotifier;
- public static final MediaType MEDIA_TYPE_MARKDOWN = MediaType.parse("text/x-markdown; charset=utf-8");
- public static String url;
- public static MockChangeListener mockChangeListener = new MockChangeListener();
-
- @Test
- public void testGet() throws Exception {
- assertEquals(1, restChangeNotifier.getChangeListeners().size());
-
- Request request = new Request.Builder()
- .url(url)
- .build();
-
- Response response = client.newCall(request).execute();
- if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
-
- Headers responseHeaders = response.headers();
- for (int i = 0; i < responseHeaders.size(); i++) {
- System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
- }
-
- assertEquals(RestChangeNotifier.GET_TEXT, response.body().string());
- }
-
- @Test
- public void testFileUpload() throws Exception {
- assertEquals(1, restChangeNotifier.getChangeListeners().size());
-
- File file = new File("src/test/resources/testUploadFile.txt");
- assertTrue(file.exists());
- assertTrue(file.canRead());
-
- Request request = new Request.Builder()
- .url(url)
- .post(RequestBody.create(MEDIA_TYPE_MARKDOWN, file))
- .addHeader("charset","UTF-8")
- .build();
-
- Response response = client.newCall(request).execute();
- if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
-
- Headers responseHeaders = response.headers();
- for (int i = 0; i < responseHeaders.size(); i++) {
- System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
- }
-
- assertEquals(RestChangeNotifier.POST_TEXT, response.body().string());
-
- assertEquals(new String(Files.readAllBytes(file.toPath())), mockChangeListener.getConfFile());
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
index 1a7f261..d0a7d71 100644
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileInputStream;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
import org.junit.Assert;
import org.junit.Test;
@@ -63,6 +64,24 @@ public class TestConfigTransformer {
flowXml.deleteOnExit();
}
+ @Test
+ public void doesTransformOnDefaultFile() throws Exception {
+
+ ConfigTransformer.transformConfigFile("./src/test/resources/default.yml", "./target/");
+ File nifiPropertiesFile = new File("./target/nifi.properties");
+
+ assertTrue(nifiPropertiesFile.exists());
+ assertTrue(nifiPropertiesFile.canRead());
+
+ nifiPropertiesFile.deleteOnExit();
+
+ File flowXml = new File("./target/flow.xml.gz");
+ assertTrue(flowXml.exists());
+ assertTrue(flowXml.canRead());
+
+ flowXml.deleteOnExit();
+ }
+
@Test(expected = IllegalArgumentException.class)
public void handleTransformInvalidFile() throws Exception {
@@ -70,4 +89,12 @@ public class TestConfigTransformer {
Assert.fail("Invalid configuration file was not detected.");
}
+
+ @Test(expected = ConfigurationChangeException.class)
+ public void handleTransformEmptyFile() throws Exception {
+
+ ConfigTransformer.transformConfigFile("./src/test/resources/config-empty.yml", "./target/");
+
+ Assert.fail("Invalid configuration file was not detected.");
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/resources/config-empty.yml
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/resources/config-empty.yml b/minifi-bootstrap/src/test/resources/config-empty.yml
new file mode 100644
index 0000000..fbbbeb9
--- /dev/null
+++ b/minifi-bootstrap/src/test/resources/config-empty.yml
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Flow Controller:
+ name: MiNiFi Flow
+ comment:
\ No newline at end of file