You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/04/25 17:54:47 UTC

[1/3] nifi-minifi git commit: MINIFI-17 Adding error handling of configurations that fail to start and a couple other small changes

Repository: nifi-minifi
Updated Branches:
  refs/heads/master 0c04fbb61 -> 66dbda90c


http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/resources/default.yml
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/resources/default.yml b/minifi-bootstrap/src/test/resources/default.yml
new file mode 100644
index 0000000..064a746
--- /dev/null
+++ b/minifi-bootstrap/src/test/resources/default.yml
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Flow Controller:
+    name: MiNiFi Flow
+    comment:
+
+Core Properties:
+    flow controller graceful shutdown period: 10 sec
+    flow service write delay interval: 500 ms
+    administrative yield duration: 30 sec
+    bored yield duration: 10 millis
+
+FlowFile Repository:
+    partitions: 256
+    checkpoint interval: 2 mins
+    always sync: false
+    Swap:
+        threshold: 20000
+        in period: 5 sec
+        in threads: 1
+        out period: 5 sec
+        out threads: 4
+
+Content Repository: 
+    content claim max appendable size: 10 MB
+    content claim max flow files: 100
+    always sync: false
+
+Component Status Repository:
+    buffer size: 1440
+    snapshot frequency: 1 min
+
+Security Properties:
+    keystore:
+    keystore type:
+    keystore password:
+    key password:
+    truststore:
+    truststore type:
+    truststore password:
+    ssl protocol:
+    Sensitive Props:
+        key:
+        algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
+        provider: BC
+
+Processor Configuration:
+    name:
+    class:
+    max concurrent tasks:
+    scheduling strategy:
+    scheduling period:
+    penalization period:
+    yield period:
+    run duration nanos:
+    auto-terminated relationships list:
+    Properties:
+
+Connection Properties:
+    name:
+    max work queue size: 0
+    max work queue data size: 0 MB
+    flowfile expiration: 0 sec
+    queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer
+
+Remote Processing Group:
+    name:
+    comment: 
+    url:
+    timeout:
+    yield period:
+    Input Port:
+        id:
+        name:
+        comments:
+        max concurrent tasks:
+        use compression:
+
+Provenance Reporting:
+    comment:
+    scheduling strategy:
+    scheduling period:
+    destination url:
+    port name:
+    originating url:
+    use compression:
+    timeout:
+    batch size:
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
index 107d9cc..a365c90 100644
--- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
@@ -32,8 +32,8 @@ graceful.shutdown.seconds=20
 nifi.minifi.config=./conf/config.yml
 
 # Notifiers to use for the associated agent, comma separated list of class names
-#nifi.minifi.notifier.components=org.apache.nifi.minifi.bootstrap.configuration.FileChangeNotifier
-#nifi.minifi.notifier.components=org.apache.nifi.minifi.bootstrap.configuration.RestChangeNotifier
+#nifi.minifi.notifier.components=org.apache.nifi.minifi.bootstrap.configuration.notifiers.FileChangeNotifier
+#nifi.minifi.notifier.components=org.apache.nifi.minifi.bootstrap.configuration.notifiers.RestChangeNotifier
 
 # File change notifier configuration
 


[3/3] nifi-minifi git commit: MINIFI-17 Adding error handling of configurations that fail to start and a couple other small changes

Posted by jp...@apache.org.
MINIFI-17 Adding error handling of configurations that fail to start and a couple other small changes

This closes #15


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/66dbda90
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/66dbda90
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/66dbda90

Branch: refs/heads/master
Commit: 66dbda90c904a679337633a3631ea09554887ec7
Parents: 0c04fbb
Author: Joseph Percivall <jo...@yahoo.com>
Authored: Thu Apr 21 16:50:19 2016 -0400
Committer: Joseph Percivall <jo...@yahoo.com>
Committed: Mon Apr 25 11:53:32 2016 -0400

----------------------------------------------------------------------
 minifi-assembly/pom.xml                         |   2 +-
 .../apache/nifi/minifi/bootstrap/RunMiNiFi.java | 270 +++++++----
 .../nifi/minifi/bootstrap/ShutdownHook.java     |   2 +
 .../ConfigurationChangeException.java           |  42 ++
 .../ConfigurationChangeListener.java            |   6 +-
 .../ConfigurationChangeNotifier.java            |   4 +-
 .../configuration/FileChangeNotifier.java       | 183 --------
 .../configuration/ListenerHandleResult.java     |  55 +++
 .../configuration/RestChangeNotifier.java       | 259 -----------
 .../notifiers/FileChangeNotifier.java           | 202 ++++++++
 .../notifiers/RestChangeNotifier.java           | 289 ++++++++++++
 .../bootstrap/util/ConfigTransformer.java       | 459 +++++++++++--------
 .../configuration/TestFileChangeNotifier.java   | 206 ---------
 .../configuration/TestRestChangeNotifier.java   |  51 ---
 .../TestRestChangeNotifierSSL.java              |  96 ----
 .../notifiers/TestFileChangeNotifier.java       | 208 +++++++++
 .../notifiers/TestRestChangeNotifier.java       |  51 +++
 .../notifiers/TestRestChangeNotifierSSL.java    |  96 ++++
 .../notifiers/util/MockChangeListener.java      |  51 +++
 .../util/TestRestChangeNotifierCommon.java      |  89 ++++
 .../configuration/util/MockChangeListener.java  |  46 --
 .../util/TestRestChangeNotifierCommon.java      |  89 ----
 .../bootstrap/util/TestConfigTransformer.java   |  27 ++
 .../src/test/resources/config-empty.yml         |  18 +
 minifi-bootstrap/src/test/resources/default.yml | 101 ++++
 .../src/main/resources/conf/bootstrap.conf      |   4 +-
 26 files changed, 1699 insertions(+), 1207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-assembly/pom.xml b/minifi-assembly/pom.xml
index 7a0e6b5..7c5266d 100644
--- a/minifi-assembly/pom.xml
+++ b/minifi-assembly/pom.xml
@@ -261,7 +261,7 @@ limitations under the License.
         <!-- nifi.properties: web properties -->
         <nifi.web.war.directory>./lib</nifi.web.war.directory>
         <nifi.web.http.host />
-        <nifi.web.http.port>8080</nifi.web.http.port>
+        <nifi.web.http.port>8081</nifi.web.http.port>
         <nifi.web.https.host />
         <nifi.web.https.port />
         <nifi.jetty.work.dir>./work/jetty</nifi.jetty.work.dir>

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
index 98d06f3..82b583f 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
@@ -34,6 +34,7 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.nio.file.attribute.PosixFilePermission;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -51,10 +52,12 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
 import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
 import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
 import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
@@ -64,6 +67,8 @@ import org.apache.nifi.util.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+
 /**
  * <p>
  * The class which bootstraps Apache MiNiFi. This class looks for the
@@ -116,7 +121,6 @@ public class RunMiNiFi {
     private final Lock startedLock = new ReentrantLock();
     private final Lock lock = new ReentrantLock();
     private final Condition startupCondition = lock.newCondition();
-
     private final File bootstrapConfigFile;
 
     // used for logging initial info; these will be logged to console by default when the app is started
@@ -130,7 +134,10 @@ public class RunMiNiFi {
     private volatile int gracefulShutdownSeconds;
 
     private Set<ConfigurationChangeNotifier> changeNotifiers;
-    private ConfigurationChangeListener changeListener;
+    private MiNiFiConfigurationChangeListener changeListener;
+
+    // Is set to true after the MiNiFi instance shuts down in preparation to be reloaded. Will be set to false after MiNiFi is successfully started again.
+    private AtomicBoolean reloading = new AtomicBoolean(false);
 
     private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
 
@@ -273,12 +280,21 @@ public class RunMiNiFi {
         final File confDir = bootstrapConfigFile.getParentFile();
         final File nifiHome = confDir.getParentFile();
         final File bin = new File(nifiHome, "bin");
-        final File lockFile = new File(bin, "minifi.reload.lock");
+        final File reloadFile = new File(bin, "minifi.reload.lock");
 
-        logger.debug("Reload File: {}", lockFile);
-        return lockFile;
+        logger.debug("Reload File: {}", reloadFile);
+        return reloadFile;
+    }
+
+    public File getSwapFile(final Logger logger) {
+        final File confDir = bootstrapConfigFile.getParentFile();
+        final File swapFile = new File(confDir, "swap.yml");
+
+        logger.debug("Swap File: {}", swapFile);
+        return swapFile;
     }
 
+
     private Properties loadProperties(final Logger logger) throws IOException {
         final Properties props = new Properties();
         final File statusFile = getStatusFile(logger);
@@ -663,7 +679,8 @@ public class RunMiNiFi {
                         }
                     }
 
-                    logger.info("MiNiFi has finished shutting down.");
+                    reloading.set(true);
+                    logger.info("MiNiFi has finished shutting down and will be reloaded.");
                 }
             } else {
                 logger.error("When sending RELOAD command to MiNiFi, got unexpected response {}", response);
@@ -1035,6 +1052,15 @@ public class RunMiNiFi {
     @SuppressWarnings({"rawtypes", "unchecked"})
     public void start() throws IOException, InterruptedException {
 
+        final String confDir = getBootstrapProperties().getProperty(CONF_DIR_KEY);
+        final File configFile = new File(getBootstrapProperties().getProperty(MINIFI_CONFIG_FILE_KEY));
+        try {
+            performTransformation(new FileInputStream(configFile), confDir);
+        } catch (ConfigurationChangeException e) {
+            defaultLogger.error("The config file is malformed, unable to start.", e);
+            return;
+        }
+
         Tuple<ProcessBuilder, Process> tuple = startMiNiFi();
         if (tuple == null) {
             cmdLogger.info("Start method returned null, ending start command.");
@@ -1048,76 +1074,115 @@ public class RunMiNiFi {
         ProcessBuilder builder = tuple.getKey();
         Process process = tuple.getValue();
 
-        while (true) {
-            final boolean alive = isAlive(process);
+        try {
+            while (true) {
+                final boolean alive = isAlive(process);
 
-            if (alive) {
-                try {
-                    Thread.sleep(1000L);
-                } catch (final InterruptedException ie) {
-                }
-            } else {
-                final Runtime runtime = Runtime.getRuntime();
-                try {
-                    runtime.removeShutdownHook(shutdownHook);
-                } catch (final IllegalStateException ise) {
-                    // happens when already shutting down
-                }
+                if (alive) {
+                    try {
+                        Thread.sleep(1000L);
 
-                String now = sdf.format(System.currentTimeMillis());
-                if (autoRestartNiFi) {
-                    final File statusFile = getStatusFile(defaultLogger);
-                    if (!statusFile.exists()) {
-                        defaultLogger.info("Status File no longer exists. Will not restart MiNiFi");
-                        return;
-                    }
+                        if (reloading.get() && getNifiStarted()) {
+                            final File swapConfigFile = getSwapFile(defaultLogger);
+                            if (swapConfigFile.exists()) {
+                                defaultLogger.info("MiNiFi has finished reloading successfully and swap file exists. Deleting old configuration.");
 
-                    final File lockFile = getLockFile(defaultLogger);
-                    if (lockFile.exists()) {
-                        defaultLogger.info("A shutdown was initiated. Will not restart MiNiFi");
-                        return;
-                    }
+                                if (swapConfigFile.delete()) {
+                                    defaultLogger.info("Swap file was successfully deleted.");
+                                } else {
+                                    defaultLogger.info("Swap file was not deleted.");
+                                }
+                            }
 
-                    final File reloadFile = getReloadFile(defaultLogger);
-                    if (reloadFile.exists()) {
-                        defaultLogger.info("Currently reloading configuration.  Will not restart MiNiFi.");
-                        Thread.sleep(5000L);
-                        continue;
-                    }
+                            reloading.set(false);
+                        }
 
-                    final boolean previouslyStarted = getNifiStarted();
-                    if (!previouslyStarted) {
-                        defaultLogger.info("MiNiFi never started. Will not restart MiNiFi");
-                        return;
-                    } else {
-                        setNiFiStarted(false);
+                    } catch (final InterruptedException ie) {
+                    }
+                } else {
+                    final Runtime runtime = Runtime.getRuntime();
+                    try {
+                        runtime.removeShutdownHook(shutdownHook);
+                    } catch (final IllegalStateException ise) {
+                        // happens when already shutting down
                     }
 
-                    process = builder.start();
-                    handleLogging(process);
+                    if (autoRestartNiFi) {
+                        final File statusFile = getStatusFile(defaultLogger);
+                        if (!statusFile.exists()) {
+                            defaultLogger.info("Status File no longer exists. Will not restart MiNiFi");
+                            return;
+                        }
 
-                    Long pid = getPid(process, defaultLogger);
-                    if (pid != null) {
-                        nifiPid = pid;
-                        final Properties nifiProps = new Properties();
-                        nifiProps.setProperty("pid", String.valueOf(nifiPid));
-                        saveProperties(nifiProps, defaultLogger);
-                    }
+                        final File lockFile = getLockFile(defaultLogger);
+                        if (lockFile.exists()) {
+                            defaultLogger.info("A shutdown was initiated. Will not restart MiNiFi");
+                            return;
+                        }
 
-                    shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds, loggingExecutor);
-                    runtime.addShutdownHook(shutdownHook);
+                        final File reloadFile = getReloadFile(defaultLogger);
+                        if (reloadFile.exists()) {
+                            defaultLogger.info("Currently reloading configuration. Will wait to restart MiNiFi.");
+                            Thread.sleep(5000L);
+                            continue;
+                        }
+
+                        final boolean previouslyStarted = getNifiStarted();
+                        if (!previouslyStarted) {
+                            final File swapConfigFile = getSwapFile(defaultLogger);
+                            if (swapConfigFile.exists()) {
+                                defaultLogger.info("Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration.");
+
+                                try {
+                                    performTransformation(new FileInputStream(swapConfigFile), confDir);
+                                } catch (ConfigurationChangeException e) {
+                                    defaultLogger.error("The swap file is malformed, unable to restart from prior state. Will not attempt to restart MiNiFi. Swap File should be cleaned up manually.");
+                                    return;
+                                }
 
-                    final boolean started = waitForStart();
+                                Files.copy(swapConfigFile.toPath(), Paths.get(getBootstrapProperties().getProperty(MINIFI_CONFIG_FILE_KEY)), REPLACE_EXISTING);
+
+                                defaultLogger.info("Replacing config file with swap file and deleting swap file");
+                                if (!swapConfigFile.delete()) {
+                                    defaultLogger.warn("The swap file failed to delete after replacing using it to revert to the old configuration. It should be cleaned up manually.");
+                                }
+                                reloading.set(false);
+                            } else {
+                                defaultLogger.info("MiNiFi either never started or failed to restart. Will not attempt to restart MiNiFi");
+                                return;
+                            }
+                        } else {
+                            setNiFiStarted(false);
+                        }
 
-                    if (started) {
-                        defaultLogger.info("Successfully started Apache MiNiFi{}", (pid == null ? "" : " with PID " + pid));
+                        process = builder.start();
+                        handleLogging(process);
+
+                        Long pid = getPid(process, defaultLogger);
+                        if (pid != null) {
+                            nifiPid = pid;
+                            final Properties nifiProps = new Properties();
+                            nifiProps.setProperty("pid", String.valueOf(nifiPid));
+                            saveProperties(nifiProps, defaultLogger);
+                        }
+
+                        shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds, loggingExecutor);
+                        runtime.addShutdownHook(shutdownHook);
+
+                        final boolean started = waitForStart();
+
+                        if (started) {
+                            defaultLogger.info("Successfully spawned the thread to start Apache MiNiFi{}", (pid == null ? "" : " with PID " + pid));
+                        } else {
+                            defaultLogger.error("Apache MiNiFi does not appear to have started");
+                        }
                     } else {
-                        defaultLogger.error("Apache MiNiFi does not appear to have started");
+                        return;
                     }
-                } else {
-                    return;
                 }
             }
+        } finally {
+            shutdownChangeNotifiers();
         }
     }
 
@@ -1255,7 +1320,7 @@ public class RunMiNiFi {
             defaultLogger.warn("Apache MiNiFi has started but failed to persist MiNiFi Port information to {} due to {}", new Object[]{statusFile.getAbsolutePath(), ioe});
         }
 
-        defaultLogger.info("Apache MiNiFi now running and listening for Bootstrap requests on port {}", port);
+        defaultLogger.info("The thread to run Apache MiNiFi is now running and listening for Bootstrap requests on port {}", port);
     }
 
     int getNiFiCommandControlPort() {
@@ -1328,7 +1393,7 @@ public class RunMiNiFi {
         }
 
         @Override
-        public void handleChange(InputStream configInputStream) {
+        public void handleChange(InputStream configInputStream) throws ConfigurationChangeException {
             logger.info("Received notification of a change");
             try {
                 final Properties bootstrapProperties = runner.getBootstrapProperties();
@@ -1346,26 +1411,51 @@ public class RunMiNiFi {
                 final ByteArrayInputStream newConfigBais = new ByteArrayInputStream(bufferedConfigOs.toByteArray());
                 newConfigBais.mark(-1);
 
-                logger.info("Persisting changes to {}", configFile.getAbsolutePath());
-                saveFile(newConfigBais, configFile);
-
-                // Reset the input stream to provide to the transformer
-                newConfigBais.reset();
-
-                final String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
-                logger.info("Performing transformation for input and saving outputs to {}", configFile);
-                performTransformation(newConfigBais, confDir);
-
-                logger.info("Reloading instance with new configuration");
-                restartInstance();
+                final File swapConfigFile = runner.getSwapFile(logger);
+                logger.info("Persisting old configuration to {}", swapConfigFile.getAbsolutePath());
+                Files.copy(new FileInputStream(configFile), swapConfigFile.toPath(), REPLACE_EXISTING);
 
+                try {
+                    logger.info("Persisting changes to {}", configFile.getAbsolutePath());
+                    saveFile(newConfigBais, configFile);
+
+                     try {
+                         // Reset the input stream to provide to the transformer
+                         newConfigBais.reset();
+
+                         final String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
+                         logger.info("Performing transformation for input and saving outputs to {}", confDir);
+                         performTransformation(newConfigBais, confDir);
+
+                         logger.info("Reloading instance with new configuration");
+                         restartInstance();
+                     } catch (Exception e){
+                         logger.debug("Transformation of new config file failed after replacing original with the swap file, reverting.");
+                         Files.copy(new FileInputStream(swapConfigFile), configFile.toPath(), REPLACE_EXISTING);
+                         throw e;
+                     }
+                } catch (Exception e){
+                    logger.debug("Transformation of new config file failed after swap file was created, deleting it.");
+                    if(!swapConfigFile.delete()){
+                        logger.warn("The swap file failed to delete after a failed handling of a change. It should be cleaned up manually.");
+                    }
+                    throw e;
+                }
+            } catch (ConfigurationChangeException e){
+                logger.error("Unable to carry out reloading of configuration on receipt of notification event", e);
+                throw e;
             } catch (IOException ioe) {
                 logger.error("Unable to carry out reloading of configuration on receipt of notification event", ioe);
-                throw new IllegalStateException("Unable to perform reload of received configuration change", ioe);
+                throw new ConfigurationChangeException("Unable to perform reload of received configuration change", ioe);
             }
         }
 
-        private void saveFile(final InputStream configInputStream, File configFile) {
+        @Override
+        public String getDescriptor() {
+            return "MiNiFiConfigurationChangeListener";
+        }
+
+        private void saveFile(final InputStream configInputStream, File configFile) throws IOException {
             try {
                 try (final FileOutputStream configFileOutputStream = new FileOutputStream(configFile)) {
                     byte[] copyArray = new byte[1024];
@@ -1375,29 +1465,31 @@ public class RunMiNiFi {
                     }
                 }
             } catch (IOException ioe) {
-                throw new IllegalStateException("Unable to save updated configuration to the configured config file location", ioe);
+                throw new IOException("Unable to save updated configuration to the configured config file location", ioe);
             }
         }
 
-        private void performTransformation(InputStream configIs, String configDestinationPath) {
-            try {
-                ConfigTransformer.transformConfigFile(configIs, configDestinationPath);
-            } catch (Exception e) {
-                logger.error("Unable to successfully transform the provided configuration", e);
-                throw new IllegalStateException("Unable to successfully transform the provided configuration", e);
-            }
-        }
 
-        private void restartInstance() {
-            logger.info("Restarting MiNiFi with new configuration");
+
+        private void restartInstance() throws IOException {
             try {
                 runner.reload();
             } catch (IOException e) {
-                throw new IllegalStateException("Unable to successfully restart MiNiFi instance after configuration change.", e);
+                throw new IOException("Unable to successfully restart MiNiFi instance after configuration change.", e);
             }
         }
     }
 
+    private static void performTransformation(InputStream configIs, String configDestinationPath) throws ConfigurationChangeException, IOException {
+        try {
+            ConfigTransformer.transformConfigFile(configIs, configDestinationPath);
+        } catch (ConfigurationChangeException e){
+            throw e;
+        } catch (Exception e) {
+            throw new IOException("Unable to successfully transform the provided configuration", e);
+        }
+    }
+
     private static class Status {
 
         private final Integer port;

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
index 13f0d16..ad3a2df 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
@@ -101,5 +101,7 @@ public class ShutdownHook extends Thread {
         if (!statusFile.delete()) {
             System.err.println("Failed to delete status file " + statusFile.getAbsolutePath() + "; this file should be cleaned up manually");
         }
+
+        System.out.println("MiNiFi is done shutting down");
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeException.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeException.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeException.java
new file mode 100644
index 0000000..04bbb02
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration;
+
+/**
+ * Exception to indicate there was a problem handling a change to the configuration
+ */
+
+public class ConfigurationChangeException extends Exception {
+
+    public ConfigurationChangeException() {
+        super();
+    }
+
+    public ConfigurationChangeException(String message) {
+        super(message);
+    }
+
+    public ConfigurationChangeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public ConfigurationChangeException(Throwable cause) {
+        super(cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
index 7d9183a..756b051 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
@@ -29,6 +29,10 @@ public interface ConfigurationChangeListener {
      *
      * @param is stream of the detected content received from the change notifier
      */
-    void handleChange(InputStream is);
+    void handleChange(InputStream is) throws ConfigurationChangeException;
 
+    /**
+     * Returns a succinct string identifying this particular listener
+     */
+    String getDescriptor();
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
index 7ad32f1..745ce6c 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.minifi.bootstrap.configuration;
 
 import java.io.Closeable;
+import java.util.Collection;
 import java.util.Properties;
 import java.util.Set;
 
@@ -52,6 +53,5 @@ public interface ConfigurationChangeNotifier extends Closeable {
     /**
      * Provide the mechanism by which listeners are notified
      */
-    void notifyListeners();
-
+    Collection<ListenerHandleResult> notifyListeners();
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/FileChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/FileChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/FileChangeNotifier.java
deleted file mode 100644
index d3f51f7..0000000
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/FileChangeNotifier.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.minifi.bootstrap.configuration;
-
-import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
-import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.NOTIFIER_PROPERTY_PREFIX;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.file.FileSystems;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.WatchEvent;
-import java.nio.file.WatchKey;
-import java.nio.file.WatchService;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-/**
- * FileChangeNotifier provides a simple FileSystem monitor for detecting changes for a specified file as generated from its corresponding {@link Path}.  Upon modifications to the associated file,
- * associated listeners receive notification of a change allowing configuration logic to be reanalyzed.  The backing implementation is associated with a {@link ScheduledExecutorService} that
- * ensures continuity of monitoring.
- */
-public class FileChangeNotifier implements Runnable, ConfigurationChangeNotifier {
-
-    private Path configFile;
-    private WatchService watchService;
-    private long pollingSeconds;
-
-    private ScheduledExecutorService executorService;
-    private final Set<ConfigurationChangeListener> configurationChangeListeners = new HashSet<>();
-
-    protected static final String CONFIG_FILE_PATH_KEY = NOTIFIER_PROPERTY_PREFIX + ".file.config.path";
-    protected static final String POLLING_PERIOD_INTERVAL_KEY = NOTIFIER_PROPERTY_PREFIX + ".file.polling.period.seconds";
-
-    protected static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
-    protected static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = TimeUnit.SECONDS;
-
-    @Override
-    public Set<ConfigurationChangeListener> getChangeListeners() {
-        return Collections.unmodifiableSet(configurationChangeListeners);
-    }
-
-    @Override
-    public void notifyListeners() {
-        final File fileToRead = configFile.toFile();
-        for (final ConfigurationChangeListener listener : getChangeListeners()) {
-            try (final FileInputStream fis = new FileInputStream(fileToRead);) {
-                listener.handleChange(fis);
-            } catch (IOException ex) {
-                throw new IllegalStateException("Unable to read the changed file " + configFile, ex);
-            }
-        }
-    }
-
-    @Override
-    public boolean registerListener(ConfigurationChangeListener listener) {
-        return this.configurationChangeListeners.add(listener);
-    }
-
-    protected boolean targetChanged() {
-        boolean targetChanged = false;
-
-        final WatchKey watchKey = this.watchService.poll();
-
-        if (watchKey == null) {
-            return targetChanged;
-        }
-
-        for (WatchEvent<?> watchEvt : watchKey.pollEvents()) {
-            final WatchEvent.Kind<?> evtKind = watchEvt.kind();
-
-            final WatchEvent<Path> pathEvent = (WatchEvent<Path>) watchEvt;
-            final Path changedFile = pathEvent.context();
-
-            // determine target change by verifying if the changed file corresponds to the config file monitored for this path
-            targetChanged = (evtKind == ENTRY_MODIFY && changedFile.equals(configFile.getName(configFile.getNameCount() - 1)));
-        }
-
-        // After completing inspection, reset for detection of subsequent change events
-        boolean valid = watchKey.reset();
-        if (!valid) {
-            throw new IllegalStateException("Unable to reinitialize file system watcher.");
-        }
-
-        return targetChanged;
-    }
-
-    protected static WatchService initializeWatcher(Path filePath) {
-        try {
-            final WatchService fsWatcher = FileSystems.getDefault().newWatchService();
-            final Path watchDirectory = filePath.getParent();
-            watchDirectory.register(fsWatcher, ENTRY_MODIFY);
-
-            return fsWatcher;
-        } catch (IOException ioe) {
-            throw new IllegalStateException("Unable to initialize a file system watcher for the path " + filePath, ioe);
-        }
-    }
-
-    @Override
-    public void run() {
-        if (targetChanged()) {
-            notifyListeners();
-        }
-    }
-
-    @Override
-    public void initialize(Properties properties) {
-        final String rawPath = properties.getProperty(CONFIG_FILE_PATH_KEY);
-        final String rawPollingDuration = properties.getProperty(POLLING_PERIOD_INTERVAL_KEY, Long.toString(DEFAULT_POLLING_PERIOD_INTERVAL));
-
-        if (rawPath == null || rawPath.isEmpty()) {
-            throw new IllegalArgumentException("Property, " + CONFIG_FILE_PATH_KEY + ", for the path of the config file must be specified.");
-        }
-
-        try {
-            setConfigFile(Paths.get(rawPath));
-            setPollingPeriod(Long.parseLong(rawPollingDuration), DEFAULT_POLLING_PERIOD_UNIT);
-            setWatchService(initializeWatcher(configFile));
-        } catch (Exception e) {
-            throw new IllegalStateException("Could not successfully initialize file change notifier.", e);
-        }
-    }
-
-    protected void setConfigFile(Path configFile) {
-        this.configFile = configFile;
-    }
-
-    protected void setWatchService(WatchService watchService) {
-        this.watchService = watchService;
-    }
-
-    protected void setPollingPeriod(long duration, TimeUnit unit) {
-        if (duration < 0) {
-            throw new IllegalArgumentException("Cannot specify a polling period with duration <=0");
-        }
-        this.pollingSeconds = TimeUnit.SECONDS.convert(duration, unit);
-    }
-
-    @Override
-    public void start() {
-        executorService = Executors.newScheduledThreadPool(1, new ThreadFactory() {
-            @Override
-            public Thread newThread(final Runnable r) {
-                final Thread t = Executors.defaultThreadFactory().newThread(r);
-                t.setName("File Change Notifier Thread");
-                t.setDaemon(true);
-                return t;
-            }
-        });
-        this.executorService.scheduleWithFixedDelay(this, 0, pollingSeconds, DEFAULT_POLLING_PERIOD_UNIT);
-    }
-
-    @Override
-    public void close() {
-        if (this.executorService != null) {
-            this.executorService.shutdownNow();
-        }
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java
new file mode 100644
index 0000000..8ac4cea
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration;
+
+/**
+ * Outcome of delivering a configuration change to a single {@link ConfigurationChangeListener}: either success, or the exception that caused the failure.
+ */
+public class ListenerHandleResult {
+
+    private final ConfigurationChangeListener configurationChangeListener;
+    private final Exception failureCause;
+
+    /**
+     * Creates a result indicating that the listener handled the change successfully.
+     *
+     * @param configurationChangeListener the listener that was notified
+     */
+    public ListenerHandleResult(ConfigurationChangeListener configurationChangeListener){
+        this(configurationChangeListener, null);
+    }
+
+    /**
+     * Creates a result for the given listener.
+     *
+     * @param configurationChangeListener the listener that was notified
+     * @param failureCause the exception raised while handling the change, or {@code null} if handling succeeded
+     */
+    public ListenerHandleResult(ConfigurationChangeListener configurationChangeListener, Exception failureCause){
+        this.configurationChangeListener = configurationChangeListener;
+        this.failureCause = failureCause;
+    }
+
+    /**
+     * @return {@code true} if no failure was recorded for the listener
+     */
+    public boolean succeeded(){
+        return failureCause == null;
+    }
+
+    /**
+     * @return the descriptor reported by the listener this result belongs to
+     */
+    public String getDescriptor(){
+        return configurationChangeListener.getDescriptor();
+    }
+
+    /**
+     * @return the exception that caused the failure, or {@code null} if the listener succeeded
+     */
+    public Exception getFailureCause(){
+        return failureCause;
+    }
+
+    /**
+     * @return a one-line, human-readable summary of the outcome
+     */
+    @Override
+    public String toString() {
+        if (failureCause == null) {
+            return getDescriptor() + " successfully handled the configuration change";
+        }
+        // Many exceptions (e.g. NullPointerException) carry no message; fall back to the exception's own
+        // description so the summary never ends in an uninformative 'null'.
+        final String reason = failureCause.getMessage() != null ? failureCause.getMessage() : failureCause.toString();
+        return getDescriptor() + " FAILED to handle the configuration change due to: '" + reason + "'";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/RestChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/RestChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/RestChangeNotifier.java
deleted file mode 100644
index 5807f89..0000000
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/RestChangeNotifier.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration;
-
-import org.eclipse.jetty.server.Request;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ServerConnector;
-import org.eclipse.jetty.server.handler.AbstractHandler;
-import org.eclipse.jetty.server.handler.HandlerCollection;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.net.URI;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
-
-import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.NOTIFIER_PROPERTY_PREFIX;
-
-
-public class RestChangeNotifier implements ConfigurationChangeNotifier {
-
-    private final Set<ConfigurationChangeListener> configurationChangeListeners = new HashSet<>();
-    private final static Logger logger = LoggerFactory.getLogger(RestChangeNotifier.class);
-    private String configFile = null;
-    private final Server jetty;
-    public static final String GET_TEXT = "This is a config change listener for an Apache NiFi - MiNiFi instance.\n" +
-            "Use this rest server to upload a conf.yml to configure the MiNiFi instance.\n" +
-            "Send a POST http request to '/' to upload the file.";
-    public static final String POST_TEXT ="Configuration received, notifying listeners.\n";
-    public static final String OTHER_TEXT ="This is not a support HTTP operation. Please use GET to get more information or POST to upload a new config.yml file.\n";
-
-
-    public static final String POST = "POST";
-    public static final String GET = "GET";
-
-    public static final String PORT_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.port";
-    public static final String HOST_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.host";
-    public static final String TRUSTSTORE_LOCATION_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.location";
-    public static final String TRUSTSTORE_PASSWORD_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.password";
-    public static final String TRUSTSTORE_TYPE_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.type";
-    public static final String KEYSTORE_LOCATION_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.location";
-    public static final String KEYSTORE_PASSWORD_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.password";
-    public static final String KEYSTORE_TYPE_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.type";
-    public static final String NEED_CLIENT_AUTH_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.need.client.auth";
-
-    public RestChangeNotifier(){
-        QueuedThreadPool queuedThreadPool = new QueuedThreadPool();
-        queuedThreadPool.setDaemon(true);
-        jetty = new Server(queuedThreadPool);
-    }
-
-    @Override
-    public void initialize(Properties properties) {
-        logger.info("Initializing");
-
-        // create the secure connector if keystore location is specified
-        if (properties.getProperty(KEYSTORE_LOCATION_KEY) != null) {
-            createSecureConnector(properties);
-        } else {
-            // create the unsecure connector otherwise
-            createConnector(properties);
-        }
-
-        HandlerCollection handlerCollection = new HandlerCollection(true);
-        handlerCollection.addHandler(new JettyHandler());
-        jetty.setHandler(handlerCollection);
-    }
-
-
-    @Override
-    public Set<ConfigurationChangeListener> getChangeListeners() {
-        return configurationChangeListeners;
-    }
-
-    @Override
-    public boolean registerListener(ConfigurationChangeListener listener) {
-        return configurationChangeListeners.add(listener);
-    }
-
-    @Override
-    public void notifyListeners() {
-        if (configFile == null){
-            throw new IllegalStateException("Attempting to notify listeners when there is no new config file.");
-        }
-
-        for (final ConfigurationChangeListener listener : getChangeListeners()) {
-            try (final ByteArrayInputStream fis = new ByteArrayInputStream(configFile.getBytes());) {
-                listener.handleChange(fis);
-            } catch (IOException ex) {
-                throw new IllegalStateException("Unable to read the changed file " + configFile, ex);
-            }
-        }
-
-        configFile = null;
-    }
-
-    @Override
-    public void start(){
-        try {
-            jetty.start();
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-
-    @Override
-    public void close() throws IOException {
-        logger.warn("Shutting down the jetty server");
-        try {
-            jetty.stop();
-            jetty.destroy();
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
-        logger.warn("Done shutting down the jetty server");
-    }
-
-    public URI getURI(){
-        return jetty.getURI();
-    }
-
-    public int getPort(){
-        if (!jetty.isStarted()) {
-            throw new IllegalStateException("Jetty server not started");
-        }
-        return ((ServerConnector) jetty.getConnectors()[0]).getLocalPort();
-    }
-
-    public String getConfigString(){
-        return configFile;
-    }
-
-    private void setConfigFile(String configFile){
-        this.configFile = configFile;
-    }
-
-    private void createConnector(Properties properties) {
-        final ServerConnector http = new ServerConnector(jetty);
-
-        http.setPort(Integer.parseInt(properties.getProperty(PORT_KEY, "0")));
-        http.setHost(properties.getProperty(HOST_KEY, "localhost"));
-
-        // Severely taxed or distant environments may have significant delays when executing.
-        http.setIdleTimeout(30000L);
-        jetty.addConnector(http);
-
-        logger.info("Added an http connector on the host '{}' and port '{}'", new Object[]{http.getHost(), http.getPort()});
-    }
-
-    private void createSecureConnector(Properties properties) {
-        SslContextFactory ssl = new SslContextFactory();
-
-        if (properties.getProperty(KEYSTORE_LOCATION_KEY) != null) {
-            ssl.setKeyStorePath(properties.getProperty(KEYSTORE_LOCATION_KEY));
-            ssl.setKeyStorePassword(properties.getProperty(KEYSTORE_PASSWORD_KEY));
-            ssl.setKeyStoreType(properties.getProperty(KEYSTORE_TYPE_KEY));
-        }
-
-        if (properties.getProperty(TRUSTSTORE_LOCATION_KEY) != null) {
-            ssl.setTrustStorePath(properties.getProperty(TRUSTSTORE_LOCATION_KEY));
-            ssl.setTrustStorePassword(properties.getProperty(TRUSTSTORE_PASSWORD_KEY));
-            ssl.setTrustStoreType(properties.getProperty(TRUSTSTORE_TYPE_KEY));
-            ssl.setNeedClientAuth(Boolean.parseBoolean(properties.getProperty(NEED_CLIENT_AUTH_KEY, "true")));
-        }
-
-        // build the connector
-        final ServerConnector https = new ServerConnector(jetty, ssl);
-
-        // set host and port
-        https.setPort(Integer.parseInt(properties.getProperty(PORT_KEY,"0")));
-        https.setHost(properties.getProperty(HOST_KEY, "localhost"));
-
-        // Severely taxed environments may have significant delays when executing.
-        https.setIdleTimeout(30000L);
-
-        // add the connector
-        jetty.addConnector(https);
-
-        logger.info("Added an https connector on the host '{}' and port '{}'", new Object[]{https.getHost(), https.getPort()});
-    }
-
-
-    public class JettyHandler extends AbstractHandler {
-
-        @Override
-        public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
-                throws IOException, ServletException {
-
-            logRequest(request);
-
-            baseRequest.setHandled(true);
-
-            if(POST.equals(request.getMethod())) {
-                final StringBuilder configBuilder = new StringBuilder();
-                BufferedReader reader = request.getReader();
-                if(reader != null && reader.ready()){
-                    String line;
-                    while ((line = reader.readLine()) != null) {
-                        configBuilder.append(line);
-                        configBuilder.append(System.getProperty("line.separator"));
-                    }
-                }
-                setConfigFile(configBuilder.substring(0,configBuilder.length()-1));
-                notifyListeners();
-                writeOutput(response, POST_TEXT, 200);
-            } else if(GET.equals(request.getMethod())) {
-                writeOutput(response, GET_TEXT, 200);
-            } else {
-                writeOutput(response, OTHER_TEXT, 404);
-            }
-        }
-
-        private void writeOutput(HttpServletResponse response, String responseText, int responseCode) throws IOException {
-            response.setStatus(responseCode);
-            response.setContentType("text/plain");
-            response.setContentLength(responseText.length());
-            try (PrintWriter writer = response.getWriter()) {
-                writer.print(responseText);
-                writer.flush();
-            }
-        }
-
-        private void logRequest(HttpServletRequest request){
-            logger.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
-            logger.info("request method = " + request.getMethod());
-            logger.info("request url = " + request.getRequestURL());
-            logger.info("context path = " + request.getContextPath());
-            logger.info("request content type = " + request.getContentType());
-            logger.info("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/FileChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/FileChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/FileChangeNotifier.java
new file mode 100644
index 0000000..faba2f0
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/FileChangeNotifier.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers;
+
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
+import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.NOTIFIER_PROPERTY_PREFIX;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * FileChangeNotifier provides a simple FileSystem monitor for detecting changes for a specified file as generated from its corresponding {@link Path}.  Upon modifications to the associated file,
+ * associated listeners receive notification of a change allowing configuration logic to be reanalyzed.  The backing implementation is associated with a {@link ScheduledExecutorService} that
+ * ensures continuity of monitoring.
+ */
+public class FileChangeNotifier implements Runnable, ConfigurationChangeNotifier {
+
+    private Path configFile;
+    private WatchService watchService;
+    private long pollingSeconds;
+
+    private final static Logger logger = LoggerFactory.getLogger(FileChangeNotifier.class);
+    private ScheduledExecutorService executorService;
+    private final Set<ConfigurationChangeListener> configurationChangeListeners = new HashSet<>();
+
+    protected static final String CONFIG_FILE_PATH_KEY = NOTIFIER_PROPERTY_PREFIX + ".file.config.path";
+    protected static final String POLLING_PERIOD_INTERVAL_KEY = NOTIFIER_PROPERTY_PREFIX + ".file.polling.period.seconds";
+
+    protected static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
+    protected static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = TimeUnit.SECONDS;
+
+    /**
+     * @return an unmodifiable view of the listeners registered with this notifier
+     */
+    @Override
+    public Set<ConfigurationChangeListener> getChangeListeners() {
+        return Collections.unmodifiableSet(configurationChangeListeners);
+    }
+
+    /**
+     * Hands the current contents of the monitored file to every registered listener, opening a fresh stream for each one.
+     * A listener that fails with an {@link IOException} or {@link ConfigurationChangeException} does not prevent the
+     * remaining listeners from being notified; the failure is recorded in that listener's result.
+     *
+     * @return one result per registered listener
+     */
+    @Override
+    public Collection<ListenerHandleResult> notifyListeners() {
+        logger.info("Notifying Listeners of a change");
+        final File fileToRead = configFile.toFile();
+
+        final Collection<ListenerHandleResult> listenerHandleResults = new ArrayList<>(configurationChangeListeners.size());
+        for (final ConfigurationChangeListener listener : getChangeListeners()) {
+            ListenerHandleResult result;
+            try (final FileInputStream fis = new FileInputStream(fileToRead)) {
+                listener.handleChange(fis);
+                result = new ListenerHandleResult(listener);
+            } catch (IOException | ConfigurationChangeException ex) {
+                result = new ListenerHandleResult(listener, ex);
+            }
+            listenerHandleResults.add(result);
+            logger.info("Listener notification result:{}", result);
+        }
+        return listenerHandleResults;
+    }
+
+    /**
+     * Registers a listener to be notified of changes.
+     *
+     * @param listener the listener to add
+     * @return {@code true} if the listener was not already registered
+     */
+    @Override
+    public boolean registerListener(ConfigurationChangeListener listener) {
+        return this.configurationChangeListeners.add(listener);
+    }
+
+    /**
+     * Drains any pending watch events and reports whether at least one of them is a modification of the monitored file.
+     *
+     * @return {@code true} if a modification of the monitored file was among the pending events
+     * @throws IllegalStateException if the watch key can no longer be reset
+     */
+    protected boolean targetChanged() {
+        final WatchKey watchKey = this.watchService.poll();
+
+        if (watchKey == null) {
+            return false;
+        }
+
+        boolean targetChanged = false;
+        for (WatchEvent<?> watchEvt : watchKey.pollEvents()) {
+            final WatchEvent.Kind<?> evtKind = watchEvt.kind();
+
+            // Events produced by a directory watch carry a Path context (null for OVERFLOW events)
+            @SuppressWarnings("unchecked")
+            final WatchEvent<Path> pathEvent = (WatchEvent<Path>) watchEvt;
+            final Path changedFile = pathEvent.context();
+
+            // A batch may contain events for several files in the watched directory; a match on any of them counts,
+            // so remember it rather than letting a later, unrelated event overwrite an earlier match.
+            if (evtKind == ENTRY_MODIFY && changedFile.equals(configFile.getName(configFile.getNameCount() - 1))) {
+                targetChanged = true;
+            }
+        }
+
+        // After completing inspection, reset for detection of subsequent change events
+        boolean valid = watchKey.reset();
+        if (!valid) {
+            throw new IllegalStateException("Unable to reinitialize file system watcher.");
+        }
+
+        return targetChanged;
+    }
+
+    /**
+     * Creates a watch service that observes modifications in the directory containing the given file.
+     *
+     * @param filePath path of the file to monitor
+     * @return a watch service registered for {@code ENTRY_MODIFY} events on the file's parent directory
+     * @throws IllegalStateException if the parent directory cannot be determined or the watcher cannot be registered
+     */
+    protected static WatchService initializeWatcher(Path filePath) {
+        try {
+            // A bare relative name such as "config.yml" has no parent; resolve it against the working directory
+            Path watchDirectory = filePath.getParent();
+            if (watchDirectory == null) {
+                watchDirectory = filePath.toAbsolutePath().getParent();
+            }
+            if (watchDirectory == null) {
+                throw new IllegalStateException("Unable to determine the directory to watch for the path " + filePath);
+            }
+
+            final WatchService fsWatcher = FileSystems.getDefault().newWatchService();
+            watchDirectory.register(fsWatcher, ENTRY_MODIFY);
+
+            return fsWatcher;
+        } catch (IOException ioe) {
+            throw new IllegalStateException("Unable to initialize a file system watcher for the path " + filePath, ioe);
+        }
+    }
+
+    /**
+     * Performs a single polling cycle: checks for a change to the monitored file and, if one occurred, notifies listeners.
+     */
+    @Override
+    public void run() {
+        logger.debug("Checking for a change");
+        try {
+            if (targetChanged()) {
+                notifyListeners();
+            }
+        } catch (RuntimeException e) {
+            // A scheduled executor suppresses later runs of a task that throws without reporting why, so log the cause first
+            logger.error("Failure while checking for a configuration file change", e);
+            throw e;
+        }
+    }
+
+    /**
+     * Configures this notifier from the given properties: the path of the file to monitor (required) and the polling
+     * period in seconds (optional, defaults to {@link #DEFAULT_POLLING_PERIOD_INTERVAL}).
+     *
+     * @param properties source of the {@link #CONFIG_FILE_PATH_KEY} and {@link #POLLING_PERIOD_INTERVAL_KEY} values
+     * @throws IllegalArgumentException if no config file path is specified
+     * @throws IllegalStateException if the path, polling period or file system watcher cannot be set up
+     */
+    @Override
+    public void initialize(Properties properties) {
+        final String rawPath = properties.getProperty(CONFIG_FILE_PATH_KEY);
+        final String rawPollingDuration = properties.getProperty(POLLING_PERIOD_INTERVAL_KEY, Long.toString(DEFAULT_POLLING_PERIOD_INTERVAL));
+
+        if (rawPath == null || rawPath.isEmpty()) {
+            throw new IllegalArgumentException("Property, " + CONFIG_FILE_PATH_KEY + ", for the path of the config file must be specified.");
+        }
+
+        try {
+            setConfigFile(Paths.get(rawPath));
+            setPollingPeriod(Long.parseLong(rawPollingDuration), DEFAULT_POLLING_PERIOD_UNIT);
+            setWatchService(initializeWatcher(configFile));
+        } catch (Exception e) {
+            throw new IllegalStateException("Could not successfully initialize file change notifier.", e);
+        }
+    }
+
+    protected void setConfigFile(Path configFile) {
+        this.configFile = configFile;
+    }
+
+    protected void setWatchService(WatchService watchService) {
+        this.watchService = watchService;
+    }
+
+    /**
+     * Sets how often the monitored file is checked for changes.
+     *
+     * @param duration length of the polling period; must be positive because the scheduler rejects a zero delay
+     * @param unit unit of {@code duration}
+     * @throws IllegalArgumentException if {@code duration} is not positive
+     */
+    protected void setPollingPeriod(long duration, TimeUnit unit) {
+        if (duration <= 0) {
+            throw new IllegalArgumentException("Cannot specify a polling period with duration <=0");
+        }
+        this.pollingSeconds = TimeUnit.SECONDS.convert(duration, unit);
+    }
+
+    /**
+     * Starts polling for changes on a single daemon thread, checking immediately and then again after each polling period.
+     */
+    @Override
+    public void start() {
+        executorService = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+            @Override
+            public Thread newThread(final Runnable r) {
+                final Thread t = Executors.defaultThreadFactory().newThread(r);
+                t.setName("File Change Notifier Thread");
+                t.setDaemon(true);
+                return t;
+            }
+        });
+        this.executorService.scheduleWithFixedDelay(this, 0, pollingSeconds, DEFAULT_POLLING_PERIOD_UNIT);
+    }
+
+    /**
+     * Stops polling and releases the file system watcher.
+     */
+    @Override
+    public void close() {
+        if (this.executorService != null) {
+            this.executorService.shutdownNow();
+        }
+        if (this.watchService != null) {
+            try {
+                // The watcher holds file system resources that are not released by stopping the polling thread
+                this.watchService.close();
+            } catch (IOException e) {
+                logger.warn("Unable to close the file system watcher", e);
+            }
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/RestChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/RestChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/RestChangeNotifier.java
new file mode 100644
index 0000000..777214f
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/RestChangeNotifier.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers;
+
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.NOTIFIER_PROPERTY_PREFIX;
+
+
+/**
+ * A {@link ConfigurationChangeNotifier} backed by an embedded Jetty server. The body of an HTTP POST is
+ * taken as a new configuration and handed to every registered {@link ConfigurationChangeListener}; a GET
+ * returns a short usage text.
+ */
+public class RestChangeNotifier implements ConfigurationChangeNotifier {
+
+    private final Set<ConfigurationChangeListener> configurationChangeListeners = new HashSet<>();
+    private static final Logger logger = LoggerFactory.getLogger(RestChangeNotifier.class);
+    // Text of the most recently uploaded configuration; reset to null once listeners have been notified.
+    private String configFile = null;
+    private final Server jetty;
+    public static final String GET_TEXT = "This is a config change listener for an Apache NiFi - MiNiFi instance.\n" +
+            "Use this rest server to upload a conf.yml to configure the MiNiFi instance.\n" +
+            "Send a POST http request to '/' to upload the file.";
+    public static final String OTHER_TEXT ="This is not a supported HTTP operation. Please use GET to get more information or POST to upload a new config.yml file.\n";
+    public static final String EMPTY_POST_TEXT = "The POST request did not contain a configuration. Send the content of the config.yml file as the request body.\n";
+
+
+    public static final String POST = "POST";
+    public static final String GET = "GET";
+
+    public static final String PORT_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.port";
+    public static final String HOST_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.host";
+    public static final String TRUSTSTORE_LOCATION_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.location";
+    public static final String TRUSTSTORE_PASSWORD_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.password";
+    public static final String TRUSTSTORE_TYPE_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.type";
+    public static final String KEYSTORE_LOCATION_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.location";
+    public static final String KEYSTORE_PASSWORD_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.password";
+    public static final String KEYSTORE_TYPE_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.type";
+    public static final String NEED_CLIENT_AUTH_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.need.client.auth";
+
+    /**
+     * Creates the Jetty server on a daemon thread pool. No connector is added until
+     * {@link #initialize(Properties)} is called.
+     */
+    public RestChangeNotifier(){
+        QueuedThreadPool queuedThreadPool = new QueuedThreadPool();
+        queuedThreadPool.setDaemon(true);
+        jetty = new Server(queuedThreadPool);
+    }
+
+    /**
+     * Adds an HTTPS connector when {@link #KEYSTORE_LOCATION_KEY} is set, otherwise a plain HTTP connector,
+     * and installs the request handler.
+     *
+     * @param properties notifier properties (host, port and optional key/trust store settings)
+     */
+    @Override
+    public void initialize(Properties properties) {
+        logger.info("Initializing");
+
+        // create the secure connector if keystore location is specified
+        if (properties.getProperty(KEYSTORE_LOCATION_KEY) != null) {
+            createSecureConnector(properties);
+        } else {
+            // create the unsecure connector otherwise
+            createConnector(properties);
+        }
+
+        HandlerCollection handlerCollection = new HandlerCollection(true);
+        handlerCollection.addHandler(new JettyHandler());
+        jetty.setHandler(handlerCollection);
+    }
+
+    /**
+     * @return the set of registered listeners (the internal set itself, not a copy)
+     */
+    @Override
+    public Set<ConfigurationChangeListener> getChangeListeners() {
+        return configurationChangeListeners;
+    }
+
+    /**
+     * @param listener the listener to register
+     * @return {@code true} if the listener was not already registered
+     */
+    @Override
+    public boolean registerListener(ConfigurationChangeListener listener) {
+        return configurationChangeListeners.add(listener);
+    }
+
+    /**
+     * Hands the pending configuration to every registered listener and then clears it.
+     *
+     * @return one result per listener, recording success or the exception the listener raised
+     * @throws IllegalStateException if no configuration is pending
+     */
+    @Override
+    public Collection<ListenerHandleResult> notifyListeners() {
+        if (configFile == null){
+            throw new IllegalStateException("Attempting to notify listeners when there is no new config file.");
+        }
+
+        // Encode once and explicitly: the no-arg getBytes() depends on the platform default charset, so the
+        // bytes handed to listeners would differ from host to host for non-ASCII content.
+        final byte[] configBytes = configFile.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+
+        Collection<ListenerHandleResult> listenerHandleResults = new ArrayList<>(configurationChangeListeners.size());
+        for (final ConfigurationChangeListener listener : getChangeListeners()) {
+            ListenerHandleResult result;
+            // each listener gets its own stream so one listener consuming it cannot affect the next
+            try (final ByteArrayInputStream fis = new ByteArrayInputStream(configBytes)) {
+                listener.handleChange(fis);
+                result = new ListenerHandleResult(listener);
+            } catch (IOException | ConfigurationChangeException ex) {
+                result = new ListenerHandleResult(listener, ex);
+            }
+            listenerHandleResults.add(result);
+            logger.info("Listener notification result:{}", result);
+        }
+
+        configFile = null;
+        return listenerHandleResults;
+    }
+
+    /**
+     * Starts the Jetty server.
+     *
+     * @throws IllegalStateException wrapping any exception Jetty raises while starting
+     */
+    @Override
+    public void start(){
+        try {
+            jetty.start();
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+
+    /**
+     * Stops and destroys the Jetty server.
+     *
+     * @throws IOException wrapping any exception Jetty raises while shutting down
+     */
+    @Override
+    public void close() throws IOException {
+        logger.warn("Shutting down the jetty server");
+        try {
+            jetty.stop();
+            jetty.destroy();
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+        logger.warn("Done shutting down the jetty server");
+    }
+
+    /**
+     * @return the URI reported by the Jetty server
+     */
+    public URI getURI(){
+        return jetty.getURI();
+    }
+
+    /**
+     * @return the local port of the server's first connector
+     * @throws IllegalStateException if the server has not been started
+     */
+    public int getPort(){
+        if (!jetty.isStarted()) {
+            throw new IllegalStateException("Jetty server not started");
+        }
+        return ((ServerConnector) jetty.getConnectors()[0]).getLocalPort();
+    }
+
+    /**
+     * @return the pending configuration text, or {@code null} when none is pending
+     */
+    public String getConfigString(){
+        return configFile;
+    }
+
+    private void setConfigFile(String configFile){
+        this.configFile = configFile;
+    }
+
+    /**
+     * Adds a plain HTTP connector bound to {@link #HOST_KEY} (default "localhost") and {@link #PORT_KEY}
+     * (default 0).
+     */
+    private void createConnector(Properties properties) {
+        final ServerConnector http = new ServerConnector(jetty);
+
+        http.setPort(Integer.parseInt(properties.getProperty(PORT_KEY, "0")));
+        http.setHost(properties.getProperty(HOST_KEY, "localhost"));
+
+        // Severely taxed or distant environments may have significant delays when executing.
+        http.setIdleTimeout(30000L);
+        jetty.addConnector(http);
+
+        logger.info("Added an http connector on the host '{}' and port '{}'", http.getHost(), http.getPort());
+    }
+
+    /**
+     * Adds an HTTPS connector using the key store and, when configured, the trust store named in the
+     * properties. Client authentication is only configured together with a trust store and defaults to
+     * required.
+     */
+    private void createSecureConnector(Properties properties) {
+        SslContextFactory ssl = new SslContextFactory();
+
+        if (properties.getProperty(KEYSTORE_LOCATION_KEY) != null) {
+            ssl.setKeyStorePath(properties.getProperty(KEYSTORE_LOCATION_KEY));
+            ssl.setKeyStorePassword(properties.getProperty(KEYSTORE_PASSWORD_KEY));
+            ssl.setKeyStoreType(properties.getProperty(KEYSTORE_TYPE_KEY));
+        }
+
+        if (properties.getProperty(TRUSTSTORE_LOCATION_KEY) != null) {
+            ssl.setTrustStorePath(properties.getProperty(TRUSTSTORE_LOCATION_KEY));
+            ssl.setTrustStorePassword(properties.getProperty(TRUSTSTORE_PASSWORD_KEY));
+            ssl.setTrustStoreType(properties.getProperty(TRUSTSTORE_TYPE_KEY));
+            ssl.setNeedClientAuth(Boolean.parseBoolean(properties.getProperty(NEED_CLIENT_AUTH_KEY, "true")));
+        }
+
+        // build the connector
+        final ServerConnector https = new ServerConnector(jetty, ssl);
+
+        // set host and port
+        https.setPort(Integer.parseInt(properties.getProperty(PORT_KEY,"0")));
+        https.setHost(properties.getProperty(HOST_KEY, "localhost"));
+
+        // Severely taxed environments may have significant delays when executing.
+        https.setIdleTimeout(30000L);
+
+        // add the connector
+        jetty.addConnector(https);
+
+        logger.info("Added an https connector on the host '{}' and port '{}'", https.getHost(), https.getPort());
+    }
+
+
+    /**
+     * Jetty handler that turns a POST body into a configuration change, answers GET with
+     * {@link #GET_TEXT} and every other method with {@link #OTHER_TEXT}.
+     */
+    public class JettyHandler extends AbstractHandler {
+
+        @Override
+        public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
+                throws IOException, ServletException {
+
+            logRequest(request);
+
+            baseRequest.setHandled(true);
+
+            if(POST.equals(request.getMethod())) {
+                final String lineSeparator = System.getProperty("line.separator");
+                final StringBuilder configBuilder = new StringBuilder();
+                BufferedReader reader = request.getReader();
+                // Read to end of stream. Reader.ready() is not consulted: "not ready" only means a read might
+                // block, not that the request has no body.
+                if(reader != null){
+                    String line;
+                    while ((line = reader.readLine()) != null) {
+                        configBuilder.append(line);
+                        configBuilder.append(lineSeparator);
+                    }
+                }
+
+                if (configBuilder.length() == 0) {
+                    // Previously an empty body caused substring(0, -1) to throw; report it to the client instead.
+                    writeOutput(response, EMPTY_POST_TEXT, 400);
+                    return;
+                }
+
+                // Drop the separator appended after the last line - all of it, since it may be two characters.
+                configBuilder.setLength(configBuilder.length() - lineSeparator.length());
+                setConfigFile(configBuilder.toString());
+                Collection<ListenerHandleResult> listenerHandleResults = notifyListeners();
+
+                // a single failed listener turns the whole response into a 500
+                int statusCode = 200;
+                for (ListenerHandleResult result: listenerHandleResults){
+                    if(!result.succeeded()){
+                        statusCode = 500;
+                        break;
+                    }
+                }
+
+                writeOutput(response, getPostText(listenerHandleResults), statusCode);
+            } else if(GET.equals(request.getMethod())) {
+                writeOutput(response, GET_TEXT, 200);
+            } else {
+                writeOutput(response, OTHER_TEXT, 404);
+            }
+        }
+
+        /** Builds the POST response body: a header line followed by one line per listener result. */
+        private String getPostText(Collection<ListenerHandleResult> listenerHandleResults){
+            StringBuilder postResult = new StringBuilder("The result of notifying listeners:\n");
+
+            for (ListenerHandleResult result : listenerHandleResults) {
+                postResult.append(result.toString());
+                postResult.append("\n");
+            }
+
+            return postResult.toString();
+        }
+
+        /** Writes a UTF-8 text/plain response with the given status code. */
+        private void writeOutput(HttpServletResponse response, String responseText, int responseCode) throws IOException {
+            response.setStatus(responseCode);
+            response.setContentType("text/plain");
+            // Fix the encoding before getWriter() so the declared length matches what is actually sent;
+            // Content-Length counts bytes, which String.length() does not for non-ASCII text.
+            response.setCharacterEncoding("UTF-8");
+            response.setContentLength(responseText.getBytes(java.nio.charset.StandardCharsets.UTF_8).length);
+            try (PrintWriter writer = response.getWriter()) {
+                writer.print(responseText);
+                writer.flush();
+            }
+        }
+
+        private void logRequest(HttpServletRequest request){
+            logger.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
+            logger.info("request method = {}", request.getMethod());
+            logger.info("request url = {}", request.getRequestURL());
+            logger.info("context path = {}", request.getContextPath());
+            logger.info("request content type = {}", request.getContentType());
+            logger.info("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
+        }
+
+    }
+}


[2/3] nifi-minifi git commit: MINIFI-17 Adding error handling of configurations that fail to start and a couple other small changes

Posted by jp...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
index 8bb25c8..633cce2 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
@@ -19,6 +19,8 @@ package org.apache.nifi.minifi.bootstrap.util;
 
 
 import org.apache.nifi.controller.FlowSerializationException;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.w3c.dom.DOMException;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
@@ -37,6 +39,8 @@ import javax.xml.transform.stream.StreamResult;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PrintWriter;
@@ -151,11 +155,19 @@ public final class ConfigTransformer {
             // Verify the parsed object is a Map structure
             if (loadedObject instanceof Map) {
                 final Map<String, Object> result = (Map<String, Object>) loadedObject;
+
+                // Create nifi.properties and flow.xml.gz in memory
+                ByteArrayOutputStream nifiPropertiesOutputStream = new ByteArrayOutputStream();
+                writeNiFiProperties(result, nifiPropertiesOutputStream);
+
+                DOMSource flowXml = createFlowXml(result);
+
                 // Write nifi.properties and flow.xml.gz
-                writeNiFiProperties(result, destPath);
-                writeFlowXml(result, destPath);
+                writeNiFiPropertiesFile(nifiPropertiesOutputStream, destPath);
+
+                writeFlowXmlFile(flowXml, destPath);
             } else {
-                throw new IllegalArgumentException("Provided YAML configuration is malformed.");
+                throw new IllegalArgumentException("Provided YAML configuration is not a Map.");
             }
         } finally {
             if (sourceStream != null) {
@@ -164,20 +176,49 @@ public final class ConfigTransformer {
         }
     }
 
-    private static void writeNiFiProperties(Map<String, Object> topLevelYaml, String path) throws FileNotFoundException, UnsupportedEncodingException {
+    /**
+     * Writes the in-memory nifi.properties content to {@code nifi.properties} under {@code destPath} and
+     * closes the buffer.
+     *
+     * @param nifiPropertiesOutputStream buffer holding the generated properties; closed before returning
+     * @param destPath                   directory in which the file is created
+     * @throws IOException if the file cannot be opened or written
+     */
+    private static void writeNiFiPropertiesFile(ByteArrayOutputStream nifiPropertiesOutputStream, String destPath) throws IOException {
+        // try-with-resources: the original never closed the FileOutputStream, leaking the handle
+        try (FileOutputStream nifiProperties = new FileOutputStream(Paths.get(destPath, "nifi.properties").toFile())) {
+            // writeTo emits only the bytes that were written to the buffer. getUnderlyingBuffer() presumably
+            // exposes the whole backing array, whose unused tail would end up in the file — verify against
+            // the NiFi ByteArrayOutputStream Javadoc.
+            nifiPropertiesOutputStream.writeTo(nifiProperties);
+        } finally {
+            nifiPropertiesOutputStream.close();
+        }
+    }
+
+    /**
+     * Serializes the flow DOM as indented XML into a gzip-compressed {@code flow.xml.gz} under {@code path}.
+     *
+     * @param domSource the flow document to serialize
+     * @param path      directory in which flow.xml.gz is created
+     * @throws IOException          if the file cannot be opened or written
+     * @throws TransformerException if the transformer cannot be created or the transformation fails
+     */
+    private static void writeFlowXmlFile(DOMSource domSource, String path) throws IOException, TransformerException {
+
+        // configure the transformer first so that a configuration failure does not leave an empty file behind
+        final TransformerFactory transformFactory = TransformerFactory.newInstance();
+        final Transformer transformer = transformFactory.newTransformer();
+        transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
+        transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+
+        // try-with-resources closes both streams even when the transformation throws; closing the gzip
+        // stream also writes its trailer
+        try (OutputStream fileOut = Files.newOutputStream(Paths.get(path, "flow.xml.gz"));
+             OutputStream outStream = new GZIPOutputStream(fileOut)) {
+            // transform the document to byte stream
+            transformer.transform(domSource, new StreamResult(outStream));
+            outStream.flush();
+        }
+    }
+
+    private static void writeNiFiProperties(Map<String, Object> topLevelYaml, OutputStream outputStream) throws FileNotFoundException, UnsupportedEncodingException, ConfigurationChangeException {
         PrintWriter writer = null;
         try {
-            final Path nifiPropertiesPath = Paths.get(path, "nifi.properties");
-            writer = new PrintWriter(nifiPropertiesPath.toFile(), "UTF-8");
+            writer = new PrintWriter(outputStream, true);
 
-            Map<String,Object> coreProperties = (Map<String, Object>) topLevelYaml.get(CORE_PROPS_KEY);
-            Map<String,Object> flowfileRepo = (Map<String, Object>) topLevelYaml.get(FLOWFILE_REPO_KEY);
+            Map<String, Object> coreProperties = (Map<String, Object>) topLevelYaml.get(CORE_PROPS_KEY);
+            Map<String, Object> flowfileRepo = (Map<String, Object>) topLevelYaml.get(FLOWFILE_REPO_KEY);
             Map<String, Object> swapProperties = (Map<String, Object>) flowfileRepo.get(SWAP_PROPS_KEY);
-            Map<String,Object> contentRepo = (Map<String, Object>) topLevelYaml.get(CONTENT_REPO_KEY);
-            Map<String,Object> componentStatusRepo = (Map<String, Object>) topLevelYaml.get(COMPONENT_STATUS_REPO_KEY);
-            Map<String,Object> securityProperties = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY);
-            Map<String,Object> sensitiveProperties = (Map<String, Object>) securityProperties.get(SENSITIVE_PROPS_KEY);
-
+            Map<String, Object> contentRepo = (Map<String, Object>) topLevelYaml.get(CONTENT_REPO_KEY);
+            Map<String, Object> componentStatusRepo = (Map<String, Object>) topLevelYaml.get(COMPONENT_STATUS_REPO_KEY);
+            Map<String, Object> securityProperties = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY);
+            Map<String, Object> sensitiveProperties = (Map<String, Object>) securityProperties.get(SENSITIVE_PROPS_KEY);
 
             writer.print(PROPERTIES_FILE_APACHE_2_0_LICENSE);
             writer.println("# Core Properties #");
@@ -284,6 +325,8 @@ public final class ConfigTransformer {
             writer.println();
             writer.println("# cluster manager properties (only configure for cluster manager) #");
             writer.println("nifi.cluster.is.manager=false");
+        } catch (NullPointerException e) {
+            throw new ConfigurationChangeException("Failed to parse the config YAML while creating the nifi.properties", e);
         } finally {
             if (writer != null){
                 writer.flush();
@@ -291,7 +334,7 @@ public final class ConfigTransformer {
             }
         }
     }
-    private static void writeFlowXml(Map<String, Object> topLevelYaml, String path) throws Exception {
+    private static DOMSource createFlowXml(Map<String, Object> topLevelYaml) throws IOException, ConfigurationChangeException {
         try {
             // create a new, empty document
             final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
@@ -304,39 +347,32 @@ public final class ConfigTransformer {
             final Element rootNode = doc.createElement("flowController");
             doc.appendChild(rootNode);
             Map<String, Object> processorConfig = (Map<String, Object>) topLevelYaml.get(PROCESSOR_CONFIG_KEY);
-            addTextElement(rootNode, "maxTimerDrivenThreadCount", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY));
-            addTextElement(rootNode, "maxEventDrivenThreadCount", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY));
+            addTextElement(rootNode, "maxTimerDrivenThreadCount", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY, "1"));
+            addTextElement(rootNode, "maxEventDrivenThreadCount", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY, "1"));
             addProcessGroup(rootNode, topLevelYaml, "rootGroup");
 
             Map<String, Object> securityProps = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY);
-            String sslAlgorithm = (String) securityProps.get(SSL_PROTOCOL_KEY);
-            if (sslAlgorithm != null && !(sslAlgorithm.isEmpty())) {
-                final Element controllerServicesNode = doc.createElement("controllerServices");
-                rootNode.appendChild(controllerServicesNode);
-                addSSLControllerService(controllerServicesNode, securityProps);
+            if (securityProps != null) {
+                String sslAlgorithm = (String) securityProps.get(SSL_PROTOCOL_KEY);
+                if (sslAlgorithm != null && !(sslAlgorithm.isEmpty())) {
+                    final Element controllerServicesNode = doc.createElement("controllerServices");
+                    rootNode.appendChild(controllerServicesNode);
+                    addSSLControllerService(controllerServicesNode, securityProps);
+                }
+            }
+
+            Map<String, Object> provenanceProperties = (Map<String, Object>) topLevelYaml.get(PROVENANCE_REPORTING_KEY);
+            if (provenanceProperties.get(SCHEDULING_STRATEGY_KEY) != null) {
+                final Element reportingTasksNode = doc.createElement("reportingTasks");
+                rootNode.appendChild(reportingTasksNode);
+                addProvenanceReportingTask(reportingTasksNode, topLevelYaml);
             }
 
-            final Element reportingTasksNode = doc.createElement("reportingTasks");
-            rootNode.appendChild(reportingTasksNode);
-            addProvenanceReportingTask(reportingTasksNode, topLevelYaml);
-
-            final DOMSource domSource = new DOMSource(doc);
-            final OutputStream fileOut = Files.newOutputStream(Paths.get(path, "flow.xml.gz"));
-            final OutputStream outStream = new GZIPOutputStream(fileOut);
-            final StreamResult streamResult = new StreamResult(outStream);
-
-            // configure the transformer and convert the DOM
-            final TransformerFactory transformFactory = TransformerFactory.newInstance();
-            final Transformer transformer = transformFactory.newTransformer();
-            transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
-            transformer.setOutputProperty(OutputKeys.INDENT, "yes");
-
-            // transform the document to byte stream
-            transformer.transform(domSource, streamResult);
-            outStream.flush();
-            outStream.close();
-        } catch (final ParserConfigurationException | DOMException | TransformerFactoryConfigurationError | IllegalArgumentException | TransformerException e) {
+            return new DOMSource(doc);
+        } catch (final ParserConfigurationException | DOMException | TransformerFactoryConfigurationError | IllegalArgumentException e) {
             throw new FlowSerializationException(e);
+        } catch (Exception e){
+            throw new ConfigurationChangeException("Failed to parse the config YAML while writing the top level of the flow xml", e);
         }
     }
 
@@ -345,109 +381,140 @@ public final class ConfigTransformer {
         return value == null ? "" : value.toString();
     }
 
-    private static void addSSLControllerService(final Element element, Map<String, Object> securityProperties) {
-        final Element serviceElement = element.getOwnerDocument().createElement("controllerService");
-        addTextElement(serviceElement, "id", "SSL-Context-Service");
-        addTextElement(serviceElement, "name", "SSL-Context-Service");
-        addTextElement(serviceElement, "comment", "");
-        addTextElement(serviceElement, "class", "org.apache.nifi.ssl.StandardSSLContextService");
-
-        addTextElement(serviceElement, "enabled", "true");
-
-        Map<String, Object> attributes = new HashMap<>();
-        attributes.put("Keystore Filename", securityProperties.get(KEYSTORE_KEY));
-        attributes.put("Keystore Type", securityProperties.get(KEYSTORE_TYPE_KEY));
-        attributes.put("Keystore Password", securityProperties.get(KEYSTORE_PASSWORD_KEY));
-        attributes.put("Truststore Filename", securityProperties.get(TRUSTSTORE_KEY));
-        attributes.put("Truststore Type", securityProperties.get(TRUSTSTORE_TYPE_KEY));
-        attributes.put("Truststore Password", securityProperties.get(TRUSTSTORE_PASSWORD_KEY));
-        attributes.put("SSL Protocol", securityProperties.get(SSL_PROTOCOL_KEY));
-
-        addConfiguration(serviceElement, attributes);
+    /**
+     * Looks up {@code key} and returns the value's string form, falling back to {@code theDefault} when the
+     * map is {@code null}, the key is absent, or the key maps to {@code null}.
+     */
+    private static <K> String getValueString(Map<K,Object> map, K key, String theDefault){
+        if (map == null) {
+            return theDefault;
+        }
+        final Object found = map.get(key);
+        if (found == null) {
+            return theDefault;
+        }
+        return found.toString();
+    }
 
-        element.appendChild(serviceElement);
+    private static void addSSLControllerService(final Element element, Map<String, Object> securityProperties) throws ConfigurationChangeException {
+        try {
+            final Element serviceElement = element.getOwnerDocument().createElement("controllerService");
+            addTextElement(serviceElement, "id", "SSL-Context-Service");
+            addTextElement(serviceElement, "name", "SSL-Context-Service");
+            addTextElement(serviceElement, "comment", "");
+            addTextElement(serviceElement, "class", "org.apache.nifi.ssl.StandardSSLContextService");
+
+            addTextElement(serviceElement, "enabled", "true");
+
+            Map<String, Object> attributes = new HashMap<>();
+            attributes.put("Keystore Filename", securityProperties.get(KEYSTORE_KEY));
+            attributes.put("Keystore Type", securityProperties.get(KEYSTORE_TYPE_KEY));
+            attributes.put("Keystore Password", securityProperties.get(KEYSTORE_PASSWORD_KEY));
+            attributes.put("Truststore Filename", securityProperties.get(TRUSTSTORE_KEY));
+            attributes.put("Truststore Type", securityProperties.get(TRUSTSTORE_TYPE_KEY));
+            attributes.put("Truststore Password", securityProperties.get(TRUSTSTORE_PASSWORD_KEY));
+            attributes.put("SSL Protocol", securityProperties.get(SSL_PROTOCOL_KEY));
+
+            addConfiguration(serviceElement, attributes);
+
+            element.appendChild(serviceElement);
+        } catch (Exception e){
+            throw new ConfigurationChangeException("Failed to parse the config YAML while trying to create an SSL Controller Service", e);
+        }
     }
 
-    private static void addProcessGroup(final Element parentElement, Map<String, Object> topLevelYaml, final String elementName) {
-        Map<String,Object> flowControllerProperties = (Map<String, Object>) topLevelYaml.get(FLOW_CONTROLLER_PROPS_KEY);
+    private static void addProcessGroup(final Element parentElement, Map<String, Object> topLevelYaml, final String elementName) throws ConfigurationChangeException {
+        try {
+            Map<String, Object> flowControllerProperties = (Map<String, Object>) topLevelYaml.get(FLOW_CONTROLLER_PROPS_KEY);
 
-        final Document doc = parentElement.getOwnerDocument();
-        final Element element = doc.createElement(elementName);
-        parentElement.appendChild(element);
-        addTextElement(element, "id", "Root-Group");
-        addTextElement(element, "name", getValueString(flowControllerProperties, NAME_KEY) );
-        addPosition(element);
-        addTextElement(element, "comment", getValueString(flowControllerProperties, COMMENT_KEY));
+            final Document doc = parentElement.getOwnerDocument();
+            final Element element = doc.createElement(elementName);
+            parentElement.appendChild(element);
+            addTextElement(element, "id", "Root-Group");
+            addTextElement(element, "name", getValueString(flowControllerProperties, NAME_KEY));
+            addPosition(element);
+            addTextElement(element, "comment", getValueString(flowControllerProperties, COMMENT_KEY));
 
-        Map<String,Object> processorConfig = (Map<String, Object>) topLevelYaml.get(PROCESSOR_CONFIG_KEY);
-        addProcessor(element, processorConfig);
+            Map<String, Object> processorConfig = (Map<String, Object>) topLevelYaml.get(PROCESSOR_CONFIG_KEY);
+            addProcessor(element, processorConfig);
 
-        Map<String,Object> remoteProcessingGroup = (Map<String, Object>) topLevelYaml.get(REMOTE_PROCESSING_GROUP_KEY);
-        addRemoteProcessGroup(element, remoteProcessingGroup);
+            Map<String, Object> remoteProcessingGroup = (Map<String, Object>) topLevelYaml.get(REMOTE_PROCESSING_GROUP_KEY);
+            addRemoteProcessGroup(element, remoteProcessingGroup);
 
-        addConnection(element, topLevelYaml);
+            addConnection(element, topLevelYaml);
+        } catch (ConfigurationChangeException e){
+            throw e;
+        } catch (Exception e){
+            throw new ConfigurationChangeException("Failed to parse the config YAML while trying to creating the root Process Group", e);
+        }
     }
 
-    private static void addProcessor(final Element parentElement, Map<String, Object> processorConfig) {
+    private static void addProcessor(final Element parentElement, Map<String, Object> processorConfig) throws ConfigurationChangeException {
 
-        final Document doc = parentElement.getOwnerDocument();
-        final Element element = doc.createElement("processor");
-        parentElement.appendChild(element);
-        addTextElement(element, "id", "Processor");
-        addTextElement(element, "name", getValueString(processorConfig, NAME_KEY));
-
-        addPosition(element);
-        addStyle(element);
-
-        addTextElement(element, "comment", getValueString(processorConfig, COMMENT_KEY));
-        addTextElement(element, "class", getValueString(processorConfig, CLASS_KEY));
-        addTextElement(element, "maxConcurrentTasks", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY));
-        addTextElement(element, "schedulingPeriod", getValueString(processorConfig, SCHEDULING_PERIOD_KEY));
-        addTextElement(element, "penalizationPeriod", getValueString(processorConfig, PENALIZATION_PERIOD_KEY));
-        addTextElement(element, "yieldPeriod", getValueString(processorConfig, YIELD_PERIOD_KEY));
-        addTextElement(element, "bulletinLevel", "WARN");
-        addTextElement(element, "lossTolerant", "false");
-        addTextElement(element, "scheduledState", "RUNNING");
-        addTextElement(element, "schedulingStrategy", getValueString(processorConfig, SCHEDULING_STRATEGY_KEY));
-        addTextElement(element, "runDurationNanos", getValueString(processorConfig, RUN_DURATION_NANOS_KEY));
-
-        addConfiguration(element, (Map<String, Object>) processorConfig.get(PROCESSOR_PROPS_KEY));
-
-        Collection<String> autoTerminatedRelationships = (Collection<String>) processorConfig.get(AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY);
-        if (autoTerminatedRelationships != null) {
-            for (String rel : autoTerminatedRelationships) {
-                addTextElement(element, "autoTerminatedRelationship", rel);
+        try {
+            if (processorConfig.get(CLASS_KEY) == null) {
+                // Only add a processor if it has a class
+                return;
+            }
+
+            final Document doc = parentElement.getOwnerDocument();
+            final Element element = doc.createElement("processor");
+            parentElement.appendChild(element);
+            addTextElement(element, "id", "Processor");
+            addTextElement(element, "name", getValueString(processorConfig, NAME_KEY));
+
+            addPosition(element);
+            addStyle(element);
+
+            addTextElement(element, "comment", getValueString(processorConfig, COMMENT_KEY));
+            addTextElement(element, "class", getValueString(processorConfig, CLASS_KEY));
+            addTextElement(element, "maxConcurrentTasks", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY));
+            addTextElement(element, "schedulingPeriod", getValueString(processorConfig, SCHEDULING_PERIOD_KEY));
+            addTextElement(element, "penalizationPeriod", getValueString(processorConfig, PENALIZATION_PERIOD_KEY));
+            addTextElement(element, "yieldPeriod", getValueString(processorConfig, YIELD_PERIOD_KEY));
+            addTextElement(element, "bulletinLevel", "WARN");
+            addTextElement(element, "lossTolerant", "false");
+            addTextElement(element, "scheduledState", "RUNNING");
+            addTextElement(element, "schedulingStrategy", getValueString(processorConfig, SCHEDULING_STRATEGY_KEY));
+            addTextElement(element, "runDurationNanos", getValueString(processorConfig, RUN_DURATION_NANOS_KEY));
+
+            addConfiguration(element, (Map<String, Object>) processorConfig.get(PROCESSOR_PROPS_KEY));
+
+            Collection<String> autoTerminatedRelationships = (Collection<String>) processorConfig.get(AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY);
+            if (autoTerminatedRelationships != null) {
+                for (String rel : autoTerminatedRelationships) {
+                    addTextElement(element, "autoTerminatedRelationship", rel);
+                }
             }
+        } catch (Exception e){
+            throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the Processor", e);
         }
     }
 
-    private static void addProvenanceReportingTask(final Element element, Map<String, Object> topLevelYaml) {
-        Map<String, Object> provenanceProperties = (Map<String, Object>) topLevelYaml.get(PROVENANCE_REPORTING_KEY);
-        final Element taskElement = element.getOwnerDocument().createElement("reportingTask");
-        addTextElement(taskElement, "id", "Provenance-Reporting");
-        addTextElement(taskElement, "name", "Site-To-Site-Provenance-Reporting");
-        addTextElement(taskElement, "comment", getValueString(provenanceProperties, COMMENT_KEY));
-        addTextElement(taskElement, "class", "org.apache.nifi.minifi.provenance.reporting.ProvenanceReportingTask");
-        addTextElement(taskElement, "schedulingPeriod", getValueString(provenanceProperties, SCHEDULING_PERIOD_KEY));
-        addTextElement(taskElement, "scheduledState", "RUNNING");
-        addTextElement(taskElement, "schedulingStrategy", getValueString(provenanceProperties, SCHEDULING_STRATEGY_KEY));
-
-        Map<String, Object> attributes = new HashMap<>();
-        attributes.put("Destination URL", provenanceProperties.get(DESTINATION_URL_KEY));
-        attributes.put("Input Port Name", provenanceProperties.get(PORT_NAME_KEY));
-        attributes.put("MiNiFi URL", provenanceProperties.get(ORIGINATING_URL_KEY));
-        attributes.put("Compress Events", provenanceProperties.get(USE_COMPRESSION_KEY));
-        attributes.put("Batch Size", provenanceProperties.get(BATCH_SIZE_KEY));
-
-        Map<String, Object> securityProps = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY);
-        String sslAlgorithm = (String) securityProps.get(SSL_PROTOCOL_KEY);
-        if (sslAlgorithm != null && !(sslAlgorithm.isEmpty())) {
-            attributes.put("SSL Context Service", "SSL-Context-Service");
-        }
+    private static void addProvenanceReportingTask(final Element element, Map<String, Object> topLevelYaml) throws ConfigurationChangeException {
+        try {
+            Map<String, Object> provenanceProperties = (Map<String, Object>) topLevelYaml.get(PROVENANCE_REPORTING_KEY);
+            final Element taskElement = element.getOwnerDocument().createElement("reportingTask");
+            addTextElement(taskElement, "id", "Provenance-Reporting");
+            addTextElement(taskElement, "name", "Site-To-Site-Provenance-Reporting");
+            addTextElement(taskElement, "comment", getValueString(provenanceProperties, COMMENT_KEY));
+            addTextElement(taskElement, "class", "org.apache.nifi.minifi.provenance.reporting.ProvenanceReportingTask");
+            addTextElement(taskElement, "schedulingPeriod", getValueString(provenanceProperties, SCHEDULING_PERIOD_KEY));
+            addTextElement(taskElement, "scheduledState", "RUNNING");
+            addTextElement(taskElement, "schedulingStrategy", getValueString(provenanceProperties, SCHEDULING_STRATEGY_KEY));
+
+            Map<String, Object> attributes = new HashMap<>();
+            attributes.put("Destination URL", provenanceProperties.get(DESTINATION_URL_KEY));
+            attributes.put("Input Port Name", provenanceProperties.get(PORT_NAME_KEY));
+            attributes.put("MiNiFi URL", provenanceProperties.get(ORIGINATING_URL_KEY));
+            attributes.put("Compress Events", provenanceProperties.get(USE_COMPRESSION_KEY));
+            attributes.put("Batch Size", provenanceProperties.get(BATCH_SIZE_KEY));
+
+            Map<String, Object> securityProps = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY);
+            String sslAlgorithm = (String) securityProps.get(SSL_PROTOCOL_KEY);
+            if (sslAlgorithm != null && !(sslAlgorithm.isEmpty())) {
+                attributes.put("SSL Context Service", "SSL-Context-Service");
+            }
 
-        addConfiguration(taskElement, attributes);
+            addConfiguration(taskElement, attributes);
 
-        element.appendChild(taskElement);
+            element.appendChild(taskElement);
+        } catch (Exception e){
+            throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the Provenance Reporting Task", e);
+        }
     }
 
     private static void addConfiguration(final Element element, Map<String, Object> elementConfig) {
@@ -472,75 +539,103 @@ public final class ConfigTransformer {
         parentElement.appendChild(element);
     }
 
-    private static void addRemoteProcessGroup(final Element parentElement, Map<String, Object> remoteProcessingGroup) {
-
-        final Document doc = parentElement.getOwnerDocument();
-        final Element element = doc.createElement("remoteProcessGroup");
-        parentElement.appendChild(element);
-        addTextElement(element, "id", "Remote-Process-Group");
-        addTextElement(element, "name", getValueString(remoteProcessingGroup, NAME_KEY));
-        addPosition(element);
-        addTextElement(element, "comment", getValueString(remoteProcessingGroup, COMMENT_KEY));
-        addTextElement(element, "url", getValueString(remoteProcessingGroup, URL_KEY));
-        addTextElement(element, "timeout", getValueString(remoteProcessingGroup, TIMEOUT_KEY));
-        addTextElement(element, "yieldPeriod", getValueString(remoteProcessingGroup, YIELD_PERIOD_KEY));
-        addTextElement(element, "transmitting", "true");
-
-        Map<String,Object> inputPort = (Map<String, Object>) remoteProcessingGroup.get(INPUT_PORT_KEY);
-        addRemoteGroupPort(element, inputPort, "inputPort");
+    private static void addRemoteProcessGroup(final Element parentElement, Map<String, Object> remoteProcessingGroup) throws ConfigurationChangeException {
+        try {
+            if (remoteProcessingGroup.get(URL_KEY) == null) {
+                // Only add an RPG if it has a URL
+                return;
+            }
 
-        parentElement.appendChild(element);
+            final Document doc = parentElement.getOwnerDocument();
+            final Element element = doc.createElement("remoteProcessGroup");
+            parentElement.appendChild(element);
+            addTextElement(element, "id", "Remote-Process-Group");
+            addTextElement(element, "name", getValueString(remoteProcessingGroup, NAME_KEY));
+            addPosition(element);
+            addTextElement(element, "comment", getValueString(remoteProcessingGroup, COMMENT_KEY));
+            addTextElement(element, "url", getValueString(remoteProcessingGroup, URL_KEY));
+            addTextElement(element, "timeout", getValueString(remoteProcessingGroup, TIMEOUT_KEY));
+            addTextElement(element, "yieldPeriod", getValueString(remoteProcessingGroup, YIELD_PERIOD_KEY));
+            addTextElement(element, "transmitting", "true");
+
+            Map<String, Object> inputPort = (Map<String, Object>) remoteProcessingGroup.get(INPUT_PORT_KEY);
+            addRemoteGroupPort(element, inputPort, "inputPort");
+
+            parentElement.appendChild(element);
+        } catch (Exception e){
+            throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the Remote Process Group", e);
+        }
     }
 
-    private static void addRemoteGroupPort(final Element parentElement, Map<String, Object> inputPort, final String elementName) {
-        final Document doc = parentElement.getOwnerDocument();
-        final Element element = doc.createElement(elementName);
-        parentElement.appendChild(element);
-        addTextElement(element, "id", getValueString(inputPort, ID_KEY));
-        addTextElement(element, "name", getValueString(inputPort, NAME_KEY));
-        addPosition(element);
-        addTextElement(element, "comments", getValueString(inputPort, COMMENT_KEY));
-        addTextElement(element, "scheduledState", "RUNNING");
-        addTextElement(element, "maxConcurrentTasks", getValueString(inputPort, MAX_CONCURRENT_TASKS_KEY));
-        addTextElement(element, "useCompression", getValueString(inputPort, USE_COMPRESSION_KEY));
+    private static void addRemoteGroupPort(final Element parentElement, Map<String, Object> inputPort, final String elementName) throws ConfigurationChangeException {
 
-        parentElement.appendChild(element);
+        try {
+            if (inputPort.get(ID_KEY) == null) {
+                // Only add an input port if it has an ID
+                return;
+            }
+
+            final Document doc = parentElement.getOwnerDocument();
+            final Element element = doc.createElement(elementName);
+            parentElement.appendChild(element);
+            addTextElement(element, "id", getValueString(inputPort, ID_KEY));
+            addTextElement(element, "name", getValueString(inputPort, NAME_KEY));
+            addPosition(element);
+            addTextElement(element, "comments", getValueString(inputPort, COMMENT_KEY));
+            addTextElement(element, "scheduledState", "RUNNING");
+            addTextElement(element, "maxConcurrentTasks", getValueString(inputPort, MAX_CONCURRENT_TASKS_KEY));
+            addTextElement(element, "useCompression", getValueString(inputPort, USE_COMPRESSION_KEY));
+
+            parentElement.appendChild(element);
+        } catch (Exception e){
+            throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the input port of the Remote Process Group", e);
+        }
     }
 
-    private static void addConnection(final Element parentElement, Map<String, Object> topLevelYaml) {
-        Map<String,Object> connectionProperties = (Map<String, Object>) topLevelYaml.get(CONNECTION_PROPS_KEY);
-        Map<String,Object> remoteProcessingGroup = (Map<String, Object>) topLevelYaml.get(REMOTE_PROCESSING_GROUP_KEY);
-        Map<String,Object> inputPort = (Map<String, Object>) remoteProcessingGroup.get(INPUT_PORT_KEY);
-        final Document doc = parentElement.getOwnerDocument();
-        final Element element = doc.createElement("connection");
-        parentElement.appendChild(element);
-        addTextElement(element, "id", "Connection");
-        addTextElement(element, "name", getValueString(connectionProperties, NAME_KEY));
+    private static void addConnection(final Element parentElement, Map<String, Object> topLevelYaml) throws ConfigurationChangeException {
+        try {
+            Map<String, Object> connectionProperties = (Map<String, Object>) topLevelYaml.get(CONNECTION_PROPS_KEY);
+            Map<String, Object> remoteProcessingGroup = (Map<String, Object>) topLevelYaml.get(REMOTE_PROCESSING_GROUP_KEY);
+            Map<String, Object> inputPort = (Map<String, Object>) remoteProcessingGroup.get(INPUT_PORT_KEY);
+            Map<String, Object> processorConfig = (Map<String, Object>) topLevelYaml.get(PROCESSOR_CONFIG_KEY);
 
-        final Element bendPointsElement = doc.createElement("bendPoints");
-        element.appendChild(bendPointsElement);
+            if (inputPort.get(ID_KEY) == null || processorConfig.get(CLASS_KEY) == null) {
+                // Only add the connection if the input port and processor config are created
+                return;
+            }
 
-        addTextElement(element, "labelIndex", "1");
-        addTextElement(element, "zIndex", "0");
+            final Document doc = parentElement.getOwnerDocument();
+            final Element element = doc.createElement("connection");
+            parentElement.appendChild(element);
+            addTextElement(element, "id", "Connection");
+            addTextElement(element, "name", getValueString(connectionProperties, NAME_KEY));
 
-        addTextElement(element, "sourceId", "Processor");
-        addTextElement(element, "sourceGroupId", "Root-Group");
-        addTextElement(element, "sourceType", "PROCESSOR");
+            final Element bendPointsElement = doc.createElement("bendPoints");
+            element.appendChild(bendPointsElement);
 
-        addTextElement(element, "destinationId", getValueString(inputPort,ID_KEY));
-        addTextElement(element, "destinationGroupId", "Remote-Process-Group");
-        addTextElement(element, "destinationType", "REMOTE_INPUT_PORT");
+            addTextElement(element, "labelIndex", "1");
+            addTextElement(element, "zIndex", "0");
 
-        addTextElement(element, "relationship", "success");
+            addTextElement(element, "sourceId", "Processor");
+            addTextElement(element, "sourceGroupId", "Root-Group");
+            addTextElement(element, "sourceType", "PROCESSOR");
 
-        addTextElement(element, "maxWorkQueueSize", getValueString(connectionProperties, MAX_WORK_QUEUE_SIZE_KEY));
-        addTextElement(element, "maxWorkQueueDataSize", getValueString(connectionProperties, MAX_WORK_QUEUE_DATA_SIZE_KEY));
+            addTextElement(element, "destinationId", getValueString(inputPort, ID_KEY));
+            addTextElement(element, "destinationGroupId", "Remote-Process-Group");
+            addTextElement(element, "destinationType", "REMOTE_INPUT_PORT");
 
-        addTextElement(element, "flowFileExpiration", getValueString(connectionProperties, FLOWFILE_EXPIRATION__KEY));
-        addTextElement(element, "queuePrioritizerClass", getValueString(connectionProperties, QUEUE_PRIORITIZER_CLASS_KEY));
+            addTextElement(element, "relationship", "success");
 
+            addTextElement(element, "maxWorkQueueSize", getValueString(connectionProperties, MAX_WORK_QUEUE_SIZE_KEY));
+            addTextElement(element, "maxWorkQueueDataSize", getValueString(connectionProperties, MAX_WORK_QUEUE_DATA_SIZE_KEY));
 
-        parentElement.appendChild(element);
+            addTextElement(element, "flowFileExpiration", getValueString(connectionProperties, FLOWFILE_EXPIRATION__KEY));
+            addTextElement(element, "queuePrioritizerClass", getValueString(connectionProperties, QUEUE_PRIORITIZER_CLASS_KEY));
+
+            parentElement.appendChild(element);
+        } catch (Exception e){
+            throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the connection from the Processor to the input port of the Remote Process Group", e);
+        }
     }
 
     private static void addPosition(final Element parentElement) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestFileChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestFileChangeNotifier.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestFileChangeNotifier.java
deleted file mode 100644
index 9432a2f..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestFileChangeNotifier.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.minifi.bootstrap.configuration;
-
-import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.InputStream;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.WatchEvent;
-import java.nio.file.WatchKey;
-import java.nio.file.WatchService;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-public class TestFileChangeNotifier {
-
-    private static final String CONFIG_FILENAME = "config.yml";
-    private static final String TEST_CONFIG_PATH = "src/test/resources/config.yml";
-
-    private FileChangeNotifier notifierSpy;
-    private WatchService mockWatchService;
-    private Properties testProperties;
-
-    @Before
-    public void setUp() throws Exception {
-        mockWatchService = Mockito.mock(WatchService.class);
-        notifierSpy = Mockito.spy(new FileChangeNotifier());
-        notifierSpy.setConfigFile(Paths.get(TEST_CONFIG_PATH));
-        notifierSpy.setWatchService(mockWatchService);
-
-        testProperties = new Properties();
-        testProperties.put(FileChangeNotifier.CONFIG_FILE_PATH_KEY, TEST_CONFIG_PATH);
-        testProperties.put(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY, FileChangeNotifier.DEFAULT_POLLING_PERIOD_INTERVAL);
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        notifierSpy.close();
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testInitialize_invalidFile() throws Exception {
-        testProperties.put(FileChangeNotifier.CONFIG_FILE_PATH_KEY, "/land/of/make/believe");
-        notifierSpy.initialize(testProperties);
-    }
-
-    @Test
-    public void testInitialize_validFile() throws Exception {
-        notifierSpy.initialize(testProperties);
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testInitialize_invalidPollingPeriod() throws Exception {
-        testProperties.put(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY, "abc");
-        notifierSpy.initialize(testProperties);
-    }
-
-    @Test
-    public void testInitialize_useDefaultPolling() throws Exception {
-        testProperties.remove(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY);
-        notifierSpy.initialize(testProperties);
-    }
-
-
-    @Test
-    public void testNotifyListeners() throws Exception {
-        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
-        boolean wasRegistered = notifierSpy.registerListener(testListener);
-
-        Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
-        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
-
-        notifierSpy.notifyListeners();
-
-        verify(testListener, Mockito.atMost(1)).handleChange(Mockito.any(InputStream.class));
-    }
-
-    @Test
-    public void testRegisterListener() throws Exception {
-        final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class);
-        boolean wasRegistered = notifierSpy.registerListener(firstListener);
-
-        Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
-        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
-
-        final ConfigurationChangeListener secondListener = Mockito.mock(ConfigurationChangeListener.class);
-        wasRegistered = notifierSpy.registerListener(secondListener);
-        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 2);
-
-    }
-
-    @Test
-    public void testRegisterDuplicateListener() throws Exception {
-        final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class);
-        boolean wasRegistered = notifierSpy.registerListener(firstListener);
-
-        Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
-        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
-
-        wasRegistered = notifierSpy.registerListener(firstListener);
-
-        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
-        Assert.assertFalse("Registration did not correspond to newly added listener", wasRegistered);
-    }
-
-    /* Verify handleChange events */
-    @Test
-    public void testTargetChangedNoModification() throws Exception {
-        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
-
-        // In this case the WatchKey is null because there were no events found
-        establishMockEnvironmentForChangeTests(testListener, null);
-
-        verify(testListener, Mockito.never()).handleChange(Mockito.any(InputStream.class));
-    }
-
-    @Test
-    public void testTargetChangedWithModificationEvent_nonConfigFile() throws Exception {
-        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
-
-        // In this case, we receive a trigger event for the directory monitored, but it was another file not being monitored
-        final WatchKey mockWatchKey = createMockWatchKeyForPath("footage_not_found.yml");
-
-        establishMockEnvironmentForChangeTests(testListener, mockWatchKey);
-
-        notifierSpy.targetChanged();
-
-        verify(testListener, Mockito.never()).handleChange(Mockito.any(InputStream.class));
-    }
-
-    @Test
-    public void testTargetChangedWithModificationEvent() throws Exception {
-        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
-
-        final WatchKey mockWatchKey = createMockWatchKeyForPath(CONFIG_FILENAME);
-        // Provided as a spy to allow injection of mock objects for some tests when dealing with the finalized FileSystems class
-        establishMockEnvironmentForChangeTests(testListener, mockWatchKey);
-
-        // Invoke the method of interest
-        notifierSpy.run();
-
-        verify(mockWatchService, Mockito.atLeastOnce()).poll();
-        verify(testListener, Mockito.atLeastOnce()).handleChange(Mockito.any(InputStream.class));
-    }
-
-    /* Helper methods to establish mock environment */
-    private WatchKey createMockWatchKeyForPath(String configFilePath) {
-        final WatchKey mockWatchKey = Mockito.mock(WatchKey.class);
-        final List<WatchEvent<?>> mockWatchEvents = (List<WatchEvent<?>>) Mockito.mock(List.class);
-        when(mockWatchKey.pollEvents()).thenReturn(mockWatchEvents);
-        when(mockWatchKey.reset()).thenReturn(true);
-
-        final Iterator mockIterator = Mockito.mock(Iterator.class);
-        when(mockWatchEvents.iterator()).thenReturn(mockIterator);
-
-        final WatchEvent mockWatchEvent = Mockito.mock(WatchEvent.class);
-        when(mockIterator.hasNext()).thenReturn(true, false);
-        when(mockIterator.next()).thenReturn(mockWatchEvent);
-
-        // In this case, we receive a trigger event for the directory monitored, and it was the file monitored
-        when(mockWatchEvent.context()).thenReturn(Paths.get(configFilePath));
-        when(mockWatchEvent.kind()).thenReturn(ENTRY_MODIFY);
-
-        return mockWatchKey;
-    }
-
-    private void establishMockEnvironmentForChangeTests(ConfigurationChangeListener listener, final WatchKey watchKey) throws Exception {
-        final boolean wasRegistered = notifierSpy.registerListener(listener);
-
-        // Establish the file mock and its parent directory
-        final Path mockConfigFilePath = Mockito.mock(Path.class);
-        final Path mockConfigFileParentPath = Mockito.mock(Path.class);
-
-        // When getting the parent of the file, get the directory
-        when(mockConfigFilePath.getParent()).thenReturn(mockConfigFileParentPath);
-
-        Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
-        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
-
-        when(mockWatchService.poll()).thenReturn(watchKey);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifier.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifier.java
deleted file mode 100644
index 75b44e3..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifier.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration;
-
-
-import com.squareup.okhttp.OkHttpClient;
-import org.apache.nifi.minifi.bootstrap.configuration.util.TestRestChangeNotifierCommon;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import java.net.MalformedURLException;
-import java.util.Properties;
-
-
-public class TestRestChangeNotifier extends TestRestChangeNotifierCommon {
-
-    @BeforeClass
-    public static void setUp() throws InterruptedException, MalformedURLException {
-        Properties properties = new Properties();
-        restChangeNotifier = new RestChangeNotifier();
-        restChangeNotifier.initialize(properties);
-        restChangeNotifier.registerListener(mockChangeListener);
-        restChangeNotifier.start();
-
-        client = new OkHttpClient();
-
-        url = restChangeNotifier.getURI().toURL().toString();
-        Thread.sleep(1000);
-    }
-
-    @AfterClass
-    public static void stop() throws Exception {
-        restChangeNotifier.close();
-        client = null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifierSSL.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifierSSL.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifierSSL.java
deleted file mode 100644
index 908e693..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifierSSL.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration;
-
-
-import com.squareup.okhttp.OkHttpClient;
-import org.apache.nifi.minifi.bootstrap.configuration.util.TestRestChangeNotifierCommon;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManagerFactory;
-import java.io.IOException;
-import java.security.KeyManagementException;
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-import java.util.Properties;
-
-
-public class TestRestChangeNotifierSSL extends TestRestChangeNotifierCommon {
-
-
-    @BeforeClass
-    public static void setUpHttps() throws CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, UnrecoverableKeyException, KeyManagementException, InterruptedException {
-        Properties properties = new Properties();
-        properties.setProperty(RestChangeNotifier.TRUSTSTORE_LOCATION_KEY, "./src/test/resources/localhost-ts.jks");
-        properties.setProperty(RestChangeNotifier.TRUSTSTORE_PASSWORD_KEY, "localtest");
-        properties.setProperty(RestChangeNotifier.TRUSTSTORE_TYPE_KEY, "JKS");
-        properties.setProperty(RestChangeNotifier.KEYSTORE_LOCATION_KEY, "./src/test/resources/localhost-ks.jks");
-        properties.setProperty(RestChangeNotifier.KEYSTORE_PASSWORD_KEY, "localtest");
-        properties.setProperty(RestChangeNotifier.KEYSTORE_TYPE_KEY, "JKS");
-        properties.setProperty(RestChangeNotifier.NEED_CLIENT_AUTH_KEY, "true");
-        restChangeNotifier = new RestChangeNotifier();
-        restChangeNotifier.initialize(properties);
-        restChangeNotifier.registerListener(mockChangeListener);
-        restChangeNotifier.start();
-
-        client = new OkHttpClient();
-
-        SSLContext sslContext = SSLContext.getInstance("TLS");
-        TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
-        trustManagerFactory.init(readKeyStore("./src/test/resources/localhost-ts.jks"));
-
-        KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
-        keyManagerFactory.init(readKeyStore("./src/test/resources/localhost-ks.jks"), "localtest".toCharArray());
-
-        sslContext.init(keyManagerFactory.getKeyManagers(),trustManagerFactory.getTrustManagers(), new SecureRandom());
-        client.setSslSocketFactory(sslContext.getSocketFactory());
-
-        url = restChangeNotifier.getURI().toURL().toString();
-        Thread.sleep(1000);
-    }
-
-    @AfterClass
-    public static void stop() throws Exception {
-        restChangeNotifier.close();
-        client = null;
-    }
-
-    private static KeyStore readKeyStore(String path) throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException {
-        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
-
-        char[] password = "localtest".toCharArray();
-
-        java.io.FileInputStream fis = null;
-        try {
-            fis = new java.io.FileInputStream(path);
-            ks.load(fis, password);
-        } finally {
-            if (fis != null) {
-                fis.close();
-            }
-        }
-        return ks;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestFileChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestFileChangeNotifier.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestFileChangeNotifier.java
new file mode 100644
index 0000000..145c2fe
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestFileChangeNotifier.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers;
+
+import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.configuration.notifiers.FileChangeNotifier;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestFileChangeNotifier {
+
+    private static final String CONFIG_FILENAME = "config.yml";
+    private static final String TEST_CONFIG_PATH = "src/test/resources/config.yml";
+
+    private FileChangeNotifier notifierSpy;
+    private WatchService mockWatchService;
+    private Properties testProperties;
+    // Each test gets a fresh spy wired to a mocked WatchService, plus properties pointing at the test config.
+    @Before
+    public void setUp() throws Exception {
+        mockWatchService = Mockito.mock(WatchService.class);
+        notifierSpy = Mockito.spy(new FileChangeNotifier());
+        notifierSpy.setConfigFile(Paths.get(TEST_CONFIG_PATH));
+        notifierSpy.setWatchService(mockWatchService);
+
+        testProperties = new Properties();
+        testProperties.put(FileChangeNotifier.CONFIG_FILE_PATH_KEY, TEST_CONFIG_PATH);
+        testProperties.put(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY, FileChangeNotifier.DEFAULT_POLLING_PERIOD_INTERVAL);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        notifierSpy.close();
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testInitialize_invalidFile() throws Exception {
+        testProperties.put(FileChangeNotifier.CONFIG_FILE_PATH_KEY, "/land/of/make/believe");
+        notifierSpy.initialize(testProperties);
+    }
+
+    @Test
+    public void testInitialize_validFile() throws Exception {
+        notifierSpy.initialize(testProperties);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testInitialize_invalidPollingPeriod() throws Exception {
+        testProperties.put(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY, "abc");
+        notifierSpy.initialize(testProperties);
+    }
+
+    @Test
+    public void testInitialize_useDefaultPolling() throws Exception {
+        testProperties.remove(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY);
+        notifierSpy.initialize(testProperties);
+    }
+
+    /* Verify listener registration and notification */
+    @Test
+    public void testNotifyListeners() throws Exception {
+        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+        boolean wasRegistered = notifierSpy.registerListener(testListener);
+
+        Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
+        Assert.assertEquals("Did not receive the correct number of registered listeners", 1, notifierSpy.getChangeListeners().size());
+
+        notifierSpy.notifyListeners();
+        // NOTE(review): atMost(1) also passes when handleChange is never invoked; confirm whether times(1) was intended
+        verify(testListener, Mockito.atMost(1)).handleChange(Mockito.any(InputStream.class));
+    }
+
+    @Test
+    public void testRegisterListener() throws Exception {
+        final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class);
+        boolean wasRegistered = notifierSpy.registerListener(firstListener);
+
+        Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
+        Assert.assertEquals("Did not receive the correct number of registered listeners", 1, notifierSpy.getChangeListeners().size());
+
+        final ConfigurationChangeListener secondListener = Mockito.mock(ConfigurationChangeListener.class);
+        wasRegistered = notifierSpy.registerListener(secondListener);
+        Assert.assertEquals("Did not receive the correct number of registered listeners", 2, notifierSpy.getChangeListeners().size());
+        Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
+    }
+
+    @Test
+    public void testRegisterDuplicateListener() throws Exception {
+        final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class);
+        boolean wasRegistered = notifierSpy.registerListener(firstListener);
+
+        Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
+        Assert.assertEquals("Did not receive the correct number of registered listeners", 1, notifierSpy.getChangeListeners().size());
+
+        wasRegistered = notifierSpy.registerListener(firstListener);
+
+        Assert.assertEquals("Did not receive the correct number of registered listeners", 1, notifierSpy.getChangeListeners().size());
+        Assert.assertFalse("Duplicate registration was reported as a newly added listener", wasRegistered);
+    }
+
+    /* Verify handleChange events */
+    @Test
+    public void testTargetChangedNoModification() throws Exception {
+        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+
+        // In this case the WatchKey is null because there were no events found
+        establishMockEnvironmentForChangeTests(testListener, null);
+        // NOTE(review): nothing in this test calls targetChanged()/run(), so the check below cannot fail
+        verify(testListener, Mockito.never()).handleChange(Mockito.any(InputStream.class));
+    }
+
+    @Test
+    public void testTargetChangedWithModificationEvent_nonConfigFile() throws Exception {
+        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+
+        // In this case, we receive a trigger event for the directory monitored, but it was another file not being monitored
+        final WatchKey mockWatchKey = createMockWatchKeyForPath("footage_not_found.yml");
+
+        establishMockEnvironmentForChangeTests(testListener, mockWatchKey);
+
+        notifierSpy.targetChanged();
+
+        verify(testListener, Mockito.never()).handleChange(Mockito.any(InputStream.class));
+    }
+
+    @Test
+    public void testTargetChangedWithModificationEvent() throws Exception {
+        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+
+        final WatchKey mockWatchKey = createMockWatchKeyForPath(CONFIG_FILENAME);
+        // Provided as a spy to allow injection of mock objects for some tests when dealing with the finalized FileSystems class
+        establishMockEnvironmentForChangeTests(testListener, mockWatchKey);
+
+        // Invoke the method of interest
+        notifierSpy.run();
+
+        verify(mockWatchService, Mockito.atLeastOnce()).poll();
+        verify(testListener, Mockito.atLeastOnce()).handleChange(Mockito.any(InputStream.class));
+    }
+
+    /* Helper methods to establish mock environment */
+    private WatchKey createMockWatchKeyForPath(String configFilePath) {
+        final WatchKey mockWatchKey = Mockito.mock(WatchKey.class);
+        final List<WatchEvent<?>> mockWatchEvents = (List<WatchEvent<?>>) Mockito.mock(List.class);
+        when(mockWatchKey.pollEvents()).thenReturn(mockWatchEvents);
+        when(mockWatchKey.reset()).thenReturn(true);
+
+        final Iterator mockIterator = Mockito.mock(Iterator.class);
+        when(mockWatchEvents.iterator()).thenReturn(mockIterator);
+
+        final WatchEvent mockWatchEvent = Mockito.mock(WatchEvent.class);
+        when(mockIterator.hasNext()).thenReturn(true, false);
+        when(mockIterator.next()).thenReturn(mockWatchEvent);
+
+        // The single event is an ENTRY_MODIFY whose context is the supplied file name (monitored file or not)
+        when(mockWatchEvent.context()).thenReturn(Paths.get(configFilePath));
+        when(mockWatchEvent.kind()).thenReturn(ENTRY_MODIFY);
+
+        return mockWatchKey;
+    }
+
+    private void establishMockEnvironmentForChangeTests(ConfigurationChangeListener listener, final WatchKey watchKey) throws Exception {
+        final boolean wasRegistered = notifierSpy.registerListener(listener);
+
+        // Establish the file mock and its parent directory
+        final Path mockConfigFilePath = Mockito.mock(Path.class);
+        final Path mockConfigFileParentPath = Mockito.mock(Path.class);
+
+        // When getting the parent of the file, get the directory
+        when(mockConfigFilePath.getParent()).thenReturn(mockConfigFileParentPath);
+        // NOTE(review): these Path mocks are never handed to notifierSpy in this method, so they look unused
+        Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
+        Assert.assertEquals("Did not receive the correct number of registered listeners", 1, notifierSpy.getChangeListeners().size());
+
+        when(mockWatchService.poll()).thenReturn(watchKey);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifier.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifier.java
new file mode 100644
index 0000000..1cd37fd
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifier.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers;
+
+
+import com.squareup.okhttp.OkHttpClient;
+import org.apache.nifi.minifi.bootstrap.configuration.notifiers.util.TestRestChangeNotifierCommon;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.net.MalformedURLException;
+import java.util.Properties;
+
+
+public class TestRestChangeNotifier extends TestRestChangeNotifierCommon {
+
+    // Brings the notifier up from an empty Properties object (no key store / trust store settings).
+    @BeforeClass
+    public static void setUp() throws InterruptedException, MalformedURLException {
+        final Properties emptyProperties = new Properties();
+        restChangeNotifier = new RestChangeNotifier();
+        restChangeNotifier.initialize(emptyProperties);
+        restChangeNotifier.registerListener(mockChangeListener);
+        restChangeNotifier.start();
+        client = new OkHttpClient();
+        url = restChangeNotifier.getURI().toURL().toString();
+        // Fixed pause before the inherited tests run; presumably gives the server time to finish starting.
+        Thread.sleep(1000);
+    }
+
+    @AfterClass
+    public static void stop() throws Exception {
+        restChangeNotifier.close();
+        client = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifierSSL.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifierSSL.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifierSSL.java
new file mode 100644
index 0000000..6073a6f
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifierSSL.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers;
+
+
+import com.squareup.okhttp.OkHttpClient;
+import org.apache.nifi.minifi.bootstrap.configuration.notifiers.util.TestRestChangeNotifierCommon;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.Properties;
+
+
+public class TestRestChangeNotifierSSL extends TestRestChangeNotifierCommon {
+
+    // The notifier and the HTTP client are both configured from the same localhost key/trust stores.
+    @BeforeClass
+    public static void setUpHttps() throws CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, UnrecoverableKeyException, KeyManagementException, InterruptedException {
+        Properties properties = new Properties();
+        properties.setProperty(RestChangeNotifier.TRUSTSTORE_LOCATION_KEY, "./src/test/resources/localhost-ts.jks");
+        properties.setProperty(RestChangeNotifier.TRUSTSTORE_PASSWORD_KEY, "localtest");
+        properties.setProperty(RestChangeNotifier.TRUSTSTORE_TYPE_KEY, "JKS");
+        properties.setProperty(RestChangeNotifier.KEYSTORE_LOCATION_KEY, "./src/test/resources/localhost-ks.jks");
+        properties.setProperty(RestChangeNotifier.KEYSTORE_PASSWORD_KEY, "localtest");
+        properties.setProperty(RestChangeNotifier.KEYSTORE_TYPE_KEY, "JKS");
+        properties.setProperty(RestChangeNotifier.NEED_CLIENT_AUTH_KEY, "true");
+        restChangeNotifier = new RestChangeNotifier();
+        restChangeNotifier.initialize(properties);
+        restChangeNotifier.registerListener(mockChangeListener);
+        restChangeNotifier.start();
+
+        client = new OkHttpClient();
+
+        SSLContext sslContext = SSLContext.getInstance("TLS");
+        TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+        trustManagerFactory.init(readKeyStore("./src/test/resources/localhost-ts.jks"));
+
+        KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+        keyManagerFactory.init(readKeyStore("./src/test/resources/localhost-ks.jks"), "localtest".toCharArray());
+
+        sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), new SecureRandom());
+        client.setSslSocketFactory(sslContext.getSocketFactory());
+
+        url = restChangeNotifier.getURI().toURL().toString();
+        Thread.sleep(1000);
+    }
+
+    @AfterClass
+    public static void stop() throws Exception {
+        restChangeNotifier.close();
+        client = null;
+    }
+
+    /**
+     * Loads the key store file at {@code path}, unlocking it with the test password.
+     *
+     * @param path location of the key store file to read
+     * @return the loaded key store
+     */
+    private static KeyStore readKeyStore(String path) throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException {
+        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
+        char[] password = "localtest".toCharArray();
+
+        // try-with-resources: a failure while closing can no longer hide an exception thrown by load()
+        try (java.io.FileInputStream fis = new java.io.FileInputStream(path)) {
+            ks.load(fis, password);
+        }
+        return ks;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/MockChangeListener.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/MockChangeListener.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/MockChangeListener.java
new file mode 100644
index 0000000..eae5872
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/MockChangeListener.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers.util;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class MockChangeListener implements ConfigurationChangeListener {
+    // Text of the last stream given to handleChange, or the last value passed to setConfFile.
+    String confFile;
+
+    @Override
+    public void handleChange(InputStream inputStream) {
+        try {
+            this.confFile = IOUtils.toString(inputStream, "UTF-8");
+        } catch (IOException ioe) {
+            throw new RuntimeException(ioe);
+        }
+    }
+
+    @Override
+    public String getDescriptor() {
+        return "MockChangeListener";
+    }
+
+    public String getConfFile() {
+        return this.confFile;
+    }
+
+    public void setConfFile(String confFile) {
+        this.confFile = confFile;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/TestRestChangeNotifierCommon.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/TestRestChangeNotifierCommon.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/TestRestChangeNotifierCommon.java
new file mode 100644
index 0000000..78f6cd5
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/TestRestChangeNotifierCommon.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers.util;
+
+import com.squareup.okhttp.Headers;
+import com.squareup.okhttp.MediaType;
+import com.squareup.okhttp.OkHttpClient;
+import com.squareup.okhttp.Request;
+import com.squareup.okhttp.RequestBody;
+import com.squareup.okhttp.Response;
+import org.apache.nifi.minifi.bootstrap.configuration.notifiers.RestChangeNotifier;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public abstract class TestRestChangeNotifierCommon {
+    // client, restChangeNotifier and url are assigned by each concrete subclass in its @BeforeClass method.
+    public static OkHttpClient client;
+    public static RestChangeNotifier restChangeNotifier;
+    public static final MediaType MEDIA_TYPE_MARKDOWN = MediaType.parse("text/x-markdown; charset=utf-8");
+    public static String url;
+    public static MockChangeListener mockChangeListener = new MockChangeListener();
+
+    @Test
+    public void testGet() throws Exception {
+        assertEquals(1, restChangeNotifier.getChangeListeners().size());
+
+        Request request = new Request.Builder()
+                .url(url)
+                .build();
+
+        Response response = client.newCall(request).execute();
+        if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
+
+        Headers responseHeaders = response.headers();
+        for (int i = 0; i < responseHeaders.size(); i++) {
+            System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
+        }
+
+        assertEquals(RestChangeNotifier.GET_TEXT, response.body().string());
+    }
+
+    @Test
+    public void testFileUpload() throws Exception {
+        assertEquals(1, restChangeNotifier.getChangeListeners().size());
+
+        File file = new File("src/test/resources/testUploadFile.txt");
+        assertTrue(file.exists());
+        assertTrue(file.canRead());
+
+        Request request = new Request.Builder()
+                .url(url)
+                .post(RequestBody.create(MEDIA_TYPE_MARKDOWN, file))
+                .addHeader("charset", "UTF-8")
+                .build();
+
+        Response response = client.newCall(request).execute();
+        if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
+
+        Headers responseHeaders = response.headers();
+        for (int i = 0; i < responseHeaders.size(); i++) {
+            System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
+        }
+
+        assertEquals("The result of notifying listeners:\nMockChangeListener successfully handled the configuration change\n", response.body().string());
+        // Decode the file as UTF-8 explicitly (not the platform default) to match how the listener decodes the upload.
+        assertEquals(new String(Files.readAllBytes(file.toPath()), "UTF-8"), mockChangeListener.getConfFile());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/MockChangeListener.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/MockChangeListener.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/MockChangeListener.java
deleted file mode 100644
index 6843889..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/MockChangeListener.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration.util;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-public class MockChangeListener implements ConfigurationChangeListener {
-    String confFile;
-
-    @Override
-    public void handleChange(InputStream inputStream) {
-        try {
-            confFile = IOUtils.toString(inputStream, "UTF-8");
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public String getConfFile() {
-        return confFile;
-    }
-
-    public void setConfFile(String confFile) {
-        this.confFile = confFile;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/TestRestChangeNotifierCommon.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/TestRestChangeNotifierCommon.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/TestRestChangeNotifierCommon.java
deleted file mode 100644
index b3c4f54..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/TestRestChangeNotifierCommon.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration.util;
-
-import com.squareup.okhttp.Headers;
-import com.squareup.okhttp.MediaType;
-import com.squareup.okhttp.OkHttpClient;
-import com.squareup.okhttp.Request;
-import com.squareup.okhttp.RequestBody;
-import com.squareup.okhttp.Response;
-import org.apache.nifi.minifi.bootstrap.configuration.RestChangeNotifier;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public abstract class TestRestChangeNotifierCommon {
-
-    public static OkHttpClient client;
-    public static RestChangeNotifier restChangeNotifier;
-    public static final MediaType MEDIA_TYPE_MARKDOWN  = MediaType.parse("text/x-markdown; charset=utf-8");
-    public static String url;
-    public static MockChangeListener mockChangeListener = new MockChangeListener();
-
-    @Test
-    public void testGet() throws Exception {
-        assertEquals(1, restChangeNotifier.getChangeListeners().size());
-
-        Request request = new Request.Builder()
-                .url(url)
-                .build();
-
-        Response response = client.newCall(request).execute();
-        if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
-
-        Headers responseHeaders = response.headers();
-        for (int i = 0; i < responseHeaders.size(); i++) {
-            System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
-        }
-
-        assertEquals(RestChangeNotifier.GET_TEXT, response.body().string());
-    }
-
-    @Test
-    public void testFileUpload() throws Exception {
-        assertEquals(1, restChangeNotifier.getChangeListeners().size());
-
-        File file = new File("src/test/resources/testUploadFile.txt");
-        assertTrue(file.exists());
-        assertTrue(file.canRead());
-
-        Request request = new Request.Builder()
-                .url(url)
-                .post(RequestBody.create(MEDIA_TYPE_MARKDOWN, file))
-                .addHeader("charset","UTF-8")
-                .build();
-
-        Response response = client.newCall(request).execute();
-        if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
-
-        Headers responseHeaders = response.headers();
-        for (int i = 0; i < responseHeaders.size(); i++) {
-            System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
-        }
-
-        assertEquals(RestChangeNotifier.POST_TEXT, response.body().string());
-
-        assertEquals(new String(Files.readAllBytes(file.toPath())), mockChangeListener.getConfFile());
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
index 1a7f261..d0a7d71 100644
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.FileInputStream;
 
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -63,6 +64,24 @@ public class TestConfigTransformer {
         flowXml.deleteOnExit();
     }
 
+    @Test
+    public void doesTransformOnDefaultFile() throws Exception {
+
+        ConfigTransformer.transformConfigFile("./src/test/resources/default.yml", "./target/");
+        File nifiPropertiesFile = new File("./target/nifi.properties");
+
+        assertTrue(nifiPropertiesFile.exists());
+        assertTrue(nifiPropertiesFile.canRead());
+
+        nifiPropertiesFile.deleteOnExit();
+
+        File flowXml = new File("./target/flow.xml.gz");
+        assertTrue(flowXml.exists());
+        assertTrue(flowXml.canRead());
+
+        flowXml.deleteOnExit();
+    }
+
     @Test(expected = IllegalArgumentException.class)
     public void handleTransformInvalidFile() throws Exception {
 
@@ -70,4 +89,12 @@ public class TestConfigTransformer {
 
         Assert.fail("Invalid configuration file was not detected.");
     }
+
+    @Test(expected = ConfigurationChangeException.class)
+    public void handleTransformEmptyFile() throws Exception {
+
+        ConfigTransformer.transformConfigFile("./src/test/resources/config-empty.yml", "./target/");
+
+        Assert.fail("Invalid configuration file was not detected.");
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/resources/config-empty.yml
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/resources/config-empty.yml b/minifi-bootstrap/src/test/resources/config-empty.yml
new file mode 100644
index 0000000..fbbbeb9
--- /dev/null
+++ b/minifi-bootstrap/src/test/resources/config-empty.yml
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Flow Controller:
+    name: MiNiFi Flow
+    comment:
\ No newline at end of file