You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/04/25 17:54:49 UTC

[3/3] nifi-minifi git commit: MINIFI-17 Adding error handling of configurations that fail to start and a couple other small changes

MINIFI-17 Adding error handling of configurations that fail to start and a couple other small changes

This closes #15


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/66dbda90
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/66dbda90
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/66dbda90

Branch: refs/heads/master
Commit: 66dbda90c904a679337633a3631ea09554887ec7
Parents: 0c04fbb
Author: Joseph Percivall <jo...@yahoo.com>
Authored: Thu Apr 21 16:50:19 2016 -0400
Committer: Joseph Percivall <jo...@yahoo.com>
Committed: Mon Apr 25 11:53:32 2016 -0400

----------------------------------------------------------------------
 minifi-assembly/pom.xml                         |   2 +-
 .../apache/nifi/minifi/bootstrap/RunMiNiFi.java | 270 +++++++----
 .../nifi/minifi/bootstrap/ShutdownHook.java     |   2 +
 .../ConfigurationChangeException.java           |  42 ++
 .../ConfigurationChangeListener.java            |   6 +-
 .../ConfigurationChangeNotifier.java            |   4 +-
 .../configuration/FileChangeNotifier.java       | 183 --------
 .../configuration/ListenerHandleResult.java     |  55 +++
 .../configuration/RestChangeNotifier.java       | 259 -----------
 .../notifiers/FileChangeNotifier.java           | 202 ++++++++
 .../notifiers/RestChangeNotifier.java           | 289 ++++++++++++
 .../bootstrap/util/ConfigTransformer.java       | 459 +++++++++++--------
 .../configuration/TestFileChangeNotifier.java   | 206 ---------
 .../configuration/TestRestChangeNotifier.java   |  51 ---
 .../TestRestChangeNotifierSSL.java              |  96 ----
 .../notifiers/TestFileChangeNotifier.java       | 208 +++++++++
 .../notifiers/TestRestChangeNotifier.java       |  51 +++
 .../notifiers/TestRestChangeNotifierSSL.java    |  96 ++++
 .../notifiers/util/MockChangeListener.java      |  51 +++
 .../util/TestRestChangeNotifierCommon.java      |  89 ++++
 .../configuration/util/MockChangeListener.java  |  46 --
 .../util/TestRestChangeNotifierCommon.java      |  89 ----
 .../bootstrap/util/TestConfigTransformer.java   |  27 ++
 .../src/test/resources/config-empty.yml         |  18 +
 minifi-bootstrap/src/test/resources/default.yml | 101 ++++
 .../src/main/resources/conf/bootstrap.conf      |   4 +-
 26 files changed, 1699 insertions(+), 1207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-assembly/pom.xml b/minifi-assembly/pom.xml
index 7a0e6b5..7c5266d 100644
--- a/minifi-assembly/pom.xml
+++ b/minifi-assembly/pom.xml
@@ -261,7 +261,7 @@ limitations under the License.
         <!-- nifi.properties: web properties -->
         <nifi.web.war.directory>./lib</nifi.web.war.directory>
         <nifi.web.http.host />
-        <nifi.web.http.port>8080</nifi.web.http.port>
+        <nifi.web.http.port>8081</nifi.web.http.port>
         <nifi.web.https.host />
         <nifi.web.https.port />
         <nifi.jetty.work.dir>./work/jetty</nifi.jetty.work.dir>

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
index 98d06f3..82b583f 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
@@ -34,6 +34,7 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.nio.file.attribute.PosixFilePermission;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -51,10 +52,12 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
 import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
 import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
 import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
@@ -64,6 +67,8 @@ import org.apache.nifi.util.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+
 /**
  * <p>
  * The class which bootstraps Apache MiNiFi. This class looks for the
@@ -116,7 +121,6 @@ public class RunMiNiFi {
     private final Lock startedLock = new ReentrantLock();
     private final Lock lock = new ReentrantLock();
     private final Condition startupCondition = lock.newCondition();
-
     private final File bootstrapConfigFile;
 
     // used for logging initial info; these will be logged to console by default when the app is started
@@ -130,7 +134,10 @@ public class RunMiNiFi {
     private volatile int gracefulShutdownSeconds;
 
     private Set<ConfigurationChangeNotifier> changeNotifiers;
-    private ConfigurationChangeListener changeListener;
+    private MiNiFiConfigurationChangeListener changeListener;
+
+    // Is set to true after the MiNiFi instance shuts down in preparation to be reloaded. Will be set to false after MiNiFi is successfully started again.
+    private AtomicBoolean reloading = new AtomicBoolean(false);
 
     private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
 
@@ -273,12 +280,21 @@ public class RunMiNiFi {
         final File confDir = bootstrapConfigFile.getParentFile();
         final File nifiHome = confDir.getParentFile();
         final File bin = new File(nifiHome, "bin");
-        final File lockFile = new File(bin, "minifi.reload.lock");
+        final File reloadFile = new File(bin, "minifi.reload.lock");
 
-        logger.debug("Reload File: {}", lockFile);
-        return lockFile;
+        logger.debug("Reload File: {}", reloadFile);
+        return reloadFile;
+    }
+
+    public File getSwapFile(final Logger logger) {
+        final File confDir = bootstrapConfigFile.getParentFile();
+        final File swapFile = new File(confDir, "swap.yml");
+
+        logger.debug("Swap File: {}", swapFile);
+        return swapFile;
     }
 
+
     private Properties loadProperties(final Logger logger) throws IOException {
         final Properties props = new Properties();
         final File statusFile = getStatusFile(logger);
@@ -663,7 +679,8 @@ public class RunMiNiFi {
                         }
                     }
 
-                    logger.info("MiNiFi has finished shutting down.");
+                    reloading.set(true);
+                    logger.info("MiNiFi has finished shutting down and will be reloaded.");
                 }
             } else {
                 logger.error("When sending RELOAD command to MiNiFi, got unexpected response {}", response);
@@ -1035,6 +1052,15 @@ public class RunMiNiFi {
     @SuppressWarnings({"rawtypes", "unchecked"})
     public void start() throws IOException, InterruptedException {
 
+        final String confDir = getBootstrapProperties().getProperty(CONF_DIR_KEY);
+        final File configFile = new File(getBootstrapProperties().getProperty(MINIFI_CONFIG_FILE_KEY));
+        try {
+            performTransformation(new FileInputStream(configFile), confDir);
+        } catch (ConfigurationChangeException e) {
+            defaultLogger.error("The config file is malformed, unable to start.", e);
+            return;
+        }
+
         Tuple<ProcessBuilder, Process> tuple = startMiNiFi();
         if (tuple == null) {
             cmdLogger.info("Start method returned null, ending start command.");
@@ -1048,76 +1074,115 @@ public class RunMiNiFi {
         ProcessBuilder builder = tuple.getKey();
         Process process = tuple.getValue();
 
-        while (true) {
-            final boolean alive = isAlive(process);
+        try {
+            while (true) {
+                final boolean alive = isAlive(process);
 
-            if (alive) {
-                try {
-                    Thread.sleep(1000L);
-                } catch (final InterruptedException ie) {
-                }
-            } else {
-                final Runtime runtime = Runtime.getRuntime();
-                try {
-                    runtime.removeShutdownHook(shutdownHook);
-                } catch (final IllegalStateException ise) {
-                    // happens when already shutting down
-                }
+                if (alive) {
+                    try {
+                        Thread.sleep(1000L);
 
-                String now = sdf.format(System.currentTimeMillis());
-                if (autoRestartNiFi) {
-                    final File statusFile = getStatusFile(defaultLogger);
-                    if (!statusFile.exists()) {
-                        defaultLogger.info("Status File no longer exists. Will not restart MiNiFi");
-                        return;
-                    }
+                        if (reloading.get() && getNifiStarted()) {
+                            final File swapConfigFile = getSwapFile(defaultLogger);
+                            if (swapConfigFile.exists()) {
+                                defaultLogger.info("MiNiFi has finished reloading successfully and swap file exists. Deleting old configuration.");
 
-                    final File lockFile = getLockFile(defaultLogger);
-                    if (lockFile.exists()) {
-                        defaultLogger.info("A shutdown was initiated. Will not restart MiNiFi");
-                        return;
-                    }
+                                if (swapConfigFile.delete()) {
+                                    defaultLogger.info("Swap file was successfully deleted.");
+                                } else {
+                                    defaultLogger.info("Swap file was not deleted.");
+                                }
+                            }
 
-                    final File reloadFile = getReloadFile(defaultLogger);
-                    if (reloadFile.exists()) {
-                        defaultLogger.info("Currently reloading configuration.  Will not restart MiNiFi.");
-                        Thread.sleep(5000L);
-                        continue;
-                    }
+                            reloading.set(false);
+                        }
 
-                    final boolean previouslyStarted = getNifiStarted();
-                    if (!previouslyStarted) {
-                        defaultLogger.info("MiNiFi never started. Will not restart MiNiFi");
-                        return;
-                    } else {
-                        setNiFiStarted(false);
+                    } catch (final InterruptedException ie) {
+                    }
+                } else {
+                    final Runtime runtime = Runtime.getRuntime();
+                    try {
+                        runtime.removeShutdownHook(shutdownHook);
+                    } catch (final IllegalStateException ise) {
+                        // happens when already shutting down
                     }
 
-                    process = builder.start();
-                    handleLogging(process);
+                    if (autoRestartNiFi) {
+                        final File statusFile = getStatusFile(defaultLogger);
+                        if (!statusFile.exists()) {
+                            defaultLogger.info("Status File no longer exists. Will not restart MiNiFi");
+                            return;
+                        }
 
-                    Long pid = getPid(process, defaultLogger);
-                    if (pid != null) {
-                        nifiPid = pid;
-                        final Properties nifiProps = new Properties();
-                        nifiProps.setProperty("pid", String.valueOf(nifiPid));
-                        saveProperties(nifiProps, defaultLogger);
-                    }
+                        final File lockFile = getLockFile(defaultLogger);
+                        if (lockFile.exists()) {
+                            defaultLogger.info("A shutdown was initiated. Will not restart MiNiFi");
+                            return;
+                        }
 
-                    shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds, loggingExecutor);
-                    runtime.addShutdownHook(shutdownHook);
+                        final File reloadFile = getReloadFile(defaultLogger);
+                        if (reloadFile.exists()) {
+                            defaultLogger.info("Currently reloading configuration. Will wait to restart MiNiFi.");
+                            Thread.sleep(5000L);
+                            continue;
+                        }
+
+                        final boolean previouslyStarted = getNifiStarted();
+                        if (!previouslyStarted) {
+                            final File swapConfigFile = getSwapFile(defaultLogger);
+                            if (swapConfigFile.exists()) {
+                                defaultLogger.info("Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration.");
+
+                                try {
+                                    performTransformation(new FileInputStream(swapConfigFile), confDir);
+                                } catch (ConfigurationChangeException e) {
+                                    defaultLogger.error("The swap file is malformed, unable to restart from prior state. Will not attempt to restart MiNiFi. Swap File should be cleaned up manually.");
+                                    return;
+                                }
 
-                    final boolean started = waitForStart();
+                                Files.copy(swapConfigFile.toPath(), Paths.get(getBootstrapProperties().getProperty(MINIFI_CONFIG_FILE_KEY)), REPLACE_EXISTING);
+
+                                defaultLogger.info("Replacing config file with swap file and deleting swap file");
+                                if (!swapConfigFile.delete()) {
+                                    defaultLogger.warn("The swap file failed to delete after replacing using it to revert to the old configuration. It should be cleaned up manually.");
+                                }
+                                reloading.set(false);
+                            } else {
+                                defaultLogger.info("MiNiFi either never started or failed to restart. Will not attempt to restart MiNiFi");
+                                return;
+                            }
+                        } else {
+                            setNiFiStarted(false);
+                        }
 
-                    if (started) {
-                        defaultLogger.info("Successfully started Apache MiNiFi{}", (pid == null ? "" : " with PID " + pid));
+                        process = builder.start();
+                        handleLogging(process);
+
+                        Long pid = getPid(process, defaultLogger);
+                        if (pid != null) {
+                            nifiPid = pid;
+                            final Properties nifiProps = new Properties();
+                            nifiProps.setProperty("pid", String.valueOf(nifiPid));
+                            saveProperties(nifiProps, defaultLogger);
+                        }
+
+                        shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds, loggingExecutor);
+                        runtime.addShutdownHook(shutdownHook);
+
+                        final boolean started = waitForStart();
+
+                        if (started) {
+                            defaultLogger.info("Successfully spawned the thread to start Apache MiNiFi{}", (pid == null ? "" : " with PID " + pid));
+                        } else {
+                            defaultLogger.error("Apache MiNiFi does not appear to have started");
+                        }
                     } else {
-                        defaultLogger.error("Apache MiNiFi does not appear to have started");
+                        return;
                     }
-                } else {
-                    return;
                 }
             }
+        } finally {
+            shutdownChangeNotifiers();
         }
     }
 
@@ -1255,7 +1320,7 @@ public class RunMiNiFi {
             defaultLogger.warn("Apache MiNiFi has started but failed to persist MiNiFi Port information to {} due to {}", new Object[]{statusFile.getAbsolutePath(), ioe});
         }
 
-        defaultLogger.info("Apache MiNiFi now running and listening for Bootstrap requests on port {}", port);
+        defaultLogger.info("The thread to run Apache MiNiFi is now running and listening for Bootstrap requests on port {}", port);
     }
 
     int getNiFiCommandControlPort() {
@@ -1328,7 +1393,7 @@ public class RunMiNiFi {
         }
 
         @Override
-        public void handleChange(InputStream configInputStream) {
+        public void handleChange(InputStream configInputStream) throws ConfigurationChangeException {
             logger.info("Received notification of a change");
             try {
                 final Properties bootstrapProperties = runner.getBootstrapProperties();
@@ -1346,26 +1411,51 @@ public class RunMiNiFi {
                 final ByteArrayInputStream newConfigBais = new ByteArrayInputStream(bufferedConfigOs.toByteArray());
                 newConfigBais.mark(-1);
 
-                logger.info("Persisting changes to {}", configFile.getAbsolutePath());
-                saveFile(newConfigBais, configFile);
-
-                // Reset the input stream to provide to the transformer
-                newConfigBais.reset();
-
-                final String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
-                logger.info("Performing transformation for input and saving outputs to {}", configFile);
-                performTransformation(newConfigBais, confDir);
-
-                logger.info("Reloading instance with new configuration");
-                restartInstance();
+                final File swapConfigFile = runner.getSwapFile(logger);
+                logger.info("Persisting old configuration to {}", swapConfigFile.getAbsolutePath());
+                Files.copy(new FileInputStream(configFile), swapConfigFile.toPath(), REPLACE_EXISTING);
 
+                try {
+                    logger.info("Persisting changes to {}", configFile.getAbsolutePath());
+                    saveFile(newConfigBais, configFile);
+
+                     try {
+                         // Reset the input stream to provide to the transformer
+                         newConfigBais.reset();
+
+                         final String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
+                         logger.info("Performing transformation for input and saving outputs to {}", confDir);
+                         performTransformation(newConfigBais, confDir);
+
+                         logger.info("Reloading instance with new configuration");
+                         restartInstance();
+                     } catch (Exception e){
+                         logger.debug("Transformation of new config file failed after replacing original with the swap file, reverting.");
+                         Files.copy(new FileInputStream(swapConfigFile), configFile.toPath(), REPLACE_EXISTING);
+                         throw e;
+                     }
+                } catch (Exception e){
+                    logger.debug("Transformation of new config file failed after swap file was created, deleting it.");
+                    if(!swapConfigFile.delete()){
+                        logger.warn("The swap file failed to delete after a failed handling of a change. It should be cleaned up manually.");
+                    }
+                    throw e;
+                }
+            } catch (ConfigurationChangeException e){
+                logger.error("Unable to carry out reloading of configuration on receipt of notification event", e);
+                throw e;
             } catch (IOException ioe) {
                 logger.error("Unable to carry out reloading of configuration on receipt of notification event", ioe);
-                throw new IllegalStateException("Unable to perform reload of received configuration change", ioe);
+                throw new ConfigurationChangeException("Unable to perform reload of received configuration change", ioe);
             }
         }
 
-        private void saveFile(final InputStream configInputStream, File configFile) {
+        @Override
+        public String getDescriptor() {
+            return "MiNiFiConfigurationChangeListener";
+        }
+
+        private void saveFile(final InputStream configInputStream, File configFile) throws IOException {
             try {
                 try (final FileOutputStream configFileOutputStream = new FileOutputStream(configFile)) {
                     byte[] copyArray = new byte[1024];
@@ -1375,29 +1465,31 @@ public class RunMiNiFi {
                     }
                 }
             } catch (IOException ioe) {
-                throw new IllegalStateException("Unable to save updated configuration to the configured config file location", ioe);
+                throw new IOException("Unable to save updated configuration to the configured config file location", ioe);
             }
         }
 
-        private void performTransformation(InputStream configIs, String configDestinationPath) {
-            try {
-                ConfigTransformer.transformConfigFile(configIs, configDestinationPath);
-            } catch (Exception e) {
-                logger.error("Unable to successfully transform the provided configuration", e);
-                throw new IllegalStateException("Unable to successfully transform the provided configuration", e);
-            }
-        }
 
-        private void restartInstance() {
-            logger.info("Restarting MiNiFi with new configuration");
+
+        private void restartInstance() throws IOException {
             try {
                 runner.reload();
             } catch (IOException e) {
-                throw new IllegalStateException("Unable to successfully restart MiNiFi instance after configuration change.", e);
+                throw new IOException("Unable to successfully restart MiNiFi instance after configuration change.", e);
             }
         }
     }
 
+    private static void performTransformation(InputStream configIs, String configDestinationPath) throws ConfigurationChangeException, IOException {
+        try {
+            ConfigTransformer.transformConfigFile(configIs, configDestinationPath);
+        } catch (ConfigurationChangeException e){
+            throw e;
+        } catch (Exception e) {
+            throw new IOException("Unable to successfully transform the provided configuration", e);
+        }
+    }
+
     private static class Status {
 
         private final Integer port;

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
index 13f0d16..ad3a2df 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
@@ -101,5 +101,7 @@ public class ShutdownHook extends Thread {
         if (!statusFile.delete()) {
             System.err.println("Failed to delete status file " + statusFile.getAbsolutePath() + "; this file should be cleaned up manually");
         }
+
+        System.out.println("MiNiFi is done shutting down");
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeException.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeException.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeException.java
new file mode 100644
index 0000000..04bbb02
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration;
+
+/**
+ * Exception to indicate there was a problem handling a change to the configuration
+ */
+
+public class ConfigurationChangeException extends Exception {
+
+    public ConfigurationChangeException() {
+        super();
+    }
+
+    public ConfigurationChangeException(String message) {
+        super(message);
+    }
+
+    public ConfigurationChangeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public ConfigurationChangeException(Throwable cause) {
+        super(cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
index 7d9183a..756b051 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
@@ -29,6 +29,10 @@ public interface ConfigurationChangeListener {
      *
      * @param is stream of the detected content received from the change notifier
      */
-    void handleChange(InputStream is);
+    void handleChange(InputStream is) throws ConfigurationChangeException;
 
+    /**
+     * Returns a succinct string identifying this particular listener
+     */
+    String getDescriptor();
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
index 7ad32f1..745ce6c 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.minifi.bootstrap.configuration;
 
 import java.io.Closeable;
+import java.util.Collection;
 import java.util.Properties;
 import java.util.Set;
 
@@ -52,6 +53,5 @@ public interface ConfigurationChangeNotifier extends Closeable {
     /**
      * Provide the mechanism by which listeners are notified
      */
-    void notifyListeners();
-
+    Collection<ListenerHandleResult> notifyListeners();
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/FileChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/FileChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/FileChangeNotifier.java
deleted file mode 100644
index d3f51f7..0000000
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/FileChangeNotifier.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.minifi.bootstrap.configuration;
-
-import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
-import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.NOTIFIER_PROPERTY_PREFIX;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.file.FileSystems;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.WatchEvent;
-import java.nio.file.WatchKey;
-import java.nio.file.WatchService;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-/**
- * FileChangeNotifier provides a simple FileSystem monitor for detecting changes for a specified file as generated from its corresponding {@link Path}.  Upon modifications to the associated file,
- * associated listeners receive notification of a change allowing configuration logic to be reanalyzed.  The backing implementation is associated with a {@link ScheduledExecutorService} that
- * ensures continuity of monitoring.
- */
-public class FileChangeNotifier implements Runnable, ConfigurationChangeNotifier {
-
-    private Path configFile;
-    private WatchService watchService;
-    private long pollingSeconds;
-
-    private ScheduledExecutorService executorService;
-    private final Set<ConfigurationChangeListener> configurationChangeListeners = new HashSet<>();
-
-    protected static final String CONFIG_FILE_PATH_KEY = NOTIFIER_PROPERTY_PREFIX + ".file.config.path";
-    protected static final String POLLING_PERIOD_INTERVAL_KEY = NOTIFIER_PROPERTY_PREFIX + ".file.polling.period.seconds";
-
-    protected static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
-    protected static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = TimeUnit.SECONDS;
-
-    @Override
-    public Set<ConfigurationChangeListener> getChangeListeners() {
-        return Collections.unmodifiableSet(configurationChangeListeners);
-    }
-
-    @Override
-    public void notifyListeners() {
-        final File fileToRead = configFile.toFile();
-        for (final ConfigurationChangeListener listener : getChangeListeners()) {
-            try (final FileInputStream fis = new FileInputStream(fileToRead);) {
-                listener.handleChange(fis);
-            } catch (IOException ex) {
-                throw new IllegalStateException("Unable to read the changed file " + configFile, ex);
-            }
-        }
-    }
-
-    @Override
-    public boolean registerListener(ConfigurationChangeListener listener) {
-        return this.configurationChangeListeners.add(listener);
-    }
-
-    protected boolean targetChanged() {
-        boolean targetChanged = false;
-
-        final WatchKey watchKey = this.watchService.poll();
-
-        if (watchKey == null) {
-            return targetChanged;
-        }
-
-        for (WatchEvent<?> watchEvt : watchKey.pollEvents()) {
-            final WatchEvent.Kind<?> evtKind = watchEvt.kind();
-
-            final WatchEvent<Path> pathEvent = (WatchEvent<Path>) watchEvt;
-            final Path changedFile = pathEvent.context();
-
-            // determine target change by verifying if the changed file corresponds to the config file monitored for this path
-            targetChanged = (evtKind == ENTRY_MODIFY && changedFile.equals(configFile.getName(configFile.getNameCount() - 1)));
-        }
-
-        // After completing inspection, reset for detection of subsequent change events
-        boolean valid = watchKey.reset();
-        if (!valid) {
-            throw new IllegalStateException("Unable to reinitialize file system watcher.");
-        }
-
-        return targetChanged;
-    }
-
-    protected static WatchService initializeWatcher(Path filePath) {
-        try {
-            final WatchService fsWatcher = FileSystems.getDefault().newWatchService();
-            final Path watchDirectory = filePath.getParent();
-            watchDirectory.register(fsWatcher, ENTRY_MODIFY);
-
-            return fsWatcher;
-        } catch (IOException ioe) {
-            throw new IllegalStateException("Unable to initialize a file system watcher for the path " + filePath, ioe);
-        }
-    }
-
-    @Override
-    public void run() {
-        if (targetChanged()) {
-            notifyListeners();
-        }
-    }
-
-    @Override
-    public void initialize(Properties properties) {
-        final String rawPath = properties.getProperty(CONFIG_FILE_PATH_KEY);
-        final String rawPollingDuration = properties.getProperty(POLLING_PERIOD_INTERVAL_KEY, Long.toString(DEFAULT_POLLING_PERIOD_INTERVAL));
-
-        if (rawPath == null || rawPath.isEmpty()) {
-            throw new IllegalArgumentException("Property, " + CONFIG_FILE_PATH_KEY + ", for the path of the config file must be specified.");
-        }
-
-        try {
-            setConfigFile(Paths.get(rawPath));
-            setPollingPeriod(Long.parseLong(rawPollingDuration), DEFAULT_POLLING_PERIOD_UNIT);
-            setWatchService(initializeWatcher(configFile));
-        } catch (Exception e) {
-            throw new IllegalStateException("Could not successfully initialize file change notifier.", e);
-        }
-    }
-
-    protected void setConfigFile(Path configFile) {
-        this.configFile = configFile;
-    }
-
-    protected void setWatchService(WatchService watchService) {
-        this.watchService = watchService;
-    }
-
-    protected void setPollingPeriod(long duration, TimeUnit unit) {
-        if (duration < 0) {
-            throw new IllegalArgumentException("Cannot specify a polling period with duration <=0");
-        }
-        this.pollingSeconds = TimeUnit.SECONDS.convert(duration, unit);
-    }
-
-    @Override
-    public void start() {
-        executorService = Executors.newScheduledThreadPool(1, new ThreadFactory() {
-            @Override
-            public Thread newThread(final Runnable r) {
-                final Thread t = Executors.defaultThreadFactory().newThread(r);
-                t.setName("File Change Notifier Thread");
-                t.setDaemon(true);
-                return t;
-            }
-        });
-        this.executorService.scheduleWithFixedDelay(this, 0, pollingSeconds, DEFAULT_POLLING_PERIOD_UNIT);
-    }
-
-    @Override
-    public void close() {
-        if (this.executorService != null) {
-            this.executorService.shutdownNow();
-        }
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java
new file mode 100644
index 0000000..8ac4cea
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration;
+
+public class ListenerHandleResult {
+
+    private final ConfigurationChangeListener configurationChangeListener;
+    private final Exception failureCause;
+
+    public ListenerHandleResult(ConfigurationChangeListener configurationChangeListener){
+        this.configurationChangeListener = configurationChangeListener;
+        failureCause = null;
+    }
+
+    public ListenerHandleResult(ConfigurationChangeListener configurationChangeListener, Exception failureCause){
+        this.configurationChangeListener = configurationChangeListener;
+        this.failureCause = failureCause;
+    }
+
+    public boolean succeeded(){
+        return failureCause == null;
+    }
+
+    public String getDescriptor(){
+        return configurationChangeListener.getDescriptor();
+    }
+
+    public Exception getFailureCause(){
+        return failureCause;
+    }
+
+    @Override
+    public String toString() {
+        if(failureCause == null){
+            return getDescriptor() + " successfully handled the configuration change";
+        } else {
+            return getDescriptor() + " FAILED to handle the configuration change due to: '"  + failureCause.getMessage() + "'";
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/RestChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/RestChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/RestChangeNotifier.java
deleted file mode 100644
index 5807f89..0000000
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/RestChangeNotifier.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration;
-
-import org.eclipse.jetty.server.Request;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ServerConnector;
-import org.eclipse.jetty.server.handler.AbstractHandler;
-import org.eclipse.jetty.server.handler.HandlerCollection;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.net.URI;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
-
-import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.NOTIFIER_PROPERTY_PREFIX;
-
-
-public class RestChangeNotifier implements ConfigurationChangeNotifier {
-
-    private final Set<ConfigurationChangeListener> configurationChangeListeners = new HashSet<>();
-    private final static Logger logger = LoggerFactory.getLogger(RestChangeNotifier.class);
-    private String configFile = null;
-    private final Server jetty;
-    public static final String GET_TEXT = "This is a config change listener for an Apache NiFi - MiNiFi instance.\n" +
-            "Use this rest server to upload a conf.yml to configure the MiNiFi instance.\n" +
-            "Send a POST http request to '/' to upload the file.";
-    public static final String POST_TEXT ="Configuration received, notifying listeners.\n";
-    public static final String OTHER_TEXT ="This is not a support HTTP operation. Please use GET to get more information or POST to upload a new config.yml file.\n";
-
-
-    public static final String POST = "POST";
-    public static final String GET = "GET";
-
-    public static final String PORT_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.port";
-    public static final String HOST_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.host";
-    public static final String TRUSTSTORE_LOCATION_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.location";
-    public static final String TRUSTSTORE_PASSWORD_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.password";
-    public static final String TRUSTSTORE_TYPE_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.type";
-    public static final String KEYSTORE_LOCATION_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.location";
-    public static final String KEYSTORE_PASSWORD_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.password";
-    public static final String KEYSTORE_TYPE_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.type";
-    public static final String NEED_CLIENT_AUTH_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.need.client.auth";
-
-    public RestChangeNotifier(){
-        QueuedThreadPool queuedThreadPool = new QueuedThreadPool();
-        queuedThreadPool.setDaemon(true);
-        jetty = new Server(queuedThreadPool);
-    }
-
-    @Override
-    public void initialize(Properties properties) {
-        logger.info("Initializing");
-
-        // create the secure connector if keystore location is specified
-        if (properties.getProperty(KEYSTORE_LOCATION_KEY) != null) {
-            createSecureConnector(properties);
-        } else {
-            // create the unsecure connector otherwise
-            createConnector(properties);
-        }
-
-        HandlerCollection handlerCollection = new HandlerCollection(true);
-        handlerCollection.addHandler(new JettyHandler());
-        jetty.setHandler(handlerCollection);
-    }
-
-
-    @Override
-    public Set<ConfigurationChangeListener> getChangeListeners() {
-        return configurationChangeListeners;
-    }
-
-    @Override
-    public boolean registerListener(ConfigurationChangeListener listener) {
-        return configurationChangeListeners.add(listener);
-    }
-
-    @Override
-    public void notifyListeners() {
-        if (configFile == null){
-            throw new IllegalStateException("Attempting to notify listeners when there is no new config file.");
-        }
-
-        for (final ConfigurationChangeListener listener : getChangeListeners()) {
-            try (final ByteArrayInputStream fis = new ByteArrayInputStream(configFile.getBytes());) {
-                listener.handleChange(fis);
-            } catch (IOException ex) {
-                throw new IllegalStateException("Unable to read the changed file " + configFile, ex);
-            }
-        }
-
-        configFile = null;
-    }
-
-    @Override
-    public void start(){
-        try {
-            jetty.start();
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-
-    @Override
-    public void close() throws IOException {
-        logger.warn("Shutting down the jetty server");
-        try {
-            jetty.stop();
-            jetty.destroy();
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
-        logger.warn("Done shutting down the jetty server");
-    }
-
-    public URI getURI(){
-        return jetty.getURI();
-    }
-
-    public int getPort(){
-        if (!jetty.isStarted()) {
-            throw new IllegalStateException("Jetty server not started");
-        }
-        return ((ServerConnector) jetty.getConnectors()[0]).getLocalPort();
-    }
-
-    public String getConfigString(){
-        return configFile;
-    }
-
-    private void setConfigFile(String configFile){
-        this.configFile = configFile;
-    }
-
-    private void createConnector(Properties properties) {
-        final ServerConnector http = new ServerConnector(jetty);
-
-        http.setPort(Integer.parseInt(properties.getProperty(PORT_KEY, "0")));
-        http.setHost(properties.getProperty(HOST_KEY, "localhost"));
-
-        // Severely taxed or distant environments may have significant delays when executing.
-        http.setIdleTimeout(30000L);
-        jetty.addConnector(http);
-
-        logger.info("Added an http connector on the host '{}' and port '{}'", new Object[]{http.getHost(), http.getPort()});
-    }
-
-    private void createSecureConnector(Properties properties) {
-        SslContextFactory ssl = new SslContextFactory();
-
-        if (properties.getProperty(KEYSTORE_LOCATION_KEY) != null) {
-            ssl.setKeyStorePath(properties.getProperty(KEYSTORE_LOCATION_KEY));
-            ssl.setKeyStorePassword(properties.getProperty(KEYSTORE_PASSWORD_KEY));
-            ssl.setKeyStoreType(properties.getProperty(KEYSTORE_TYPE_KEY));
-        }
-
-        if (properties.getProperty(TRUSTSTORE_LOCATION_KEY) != null) {
-            ssl.setTrustStorePath(properties.getProperty(TRUSTSTORE_LOCATION_KEY));
-            ssl.setTrustStorePassword(properties.getProperty(TRUSTSTORE_PASSWORD_KEY));
-            ssl.setTrustStoreType(properties.getProperty(TRUSTSTORE_TYPE_KEY));
-            ssl.setNeedClientAuth(Boolean.parseBoolean(properties.getProperty(NEED_CLIENT_AUTH_KEY, "true")));
-        }
-
-        // build the connector
-        final ServerConnector https = new ServerConnector(jetty, ssl);
-
-        // set host and port
-        https.setPort(Integer.parseInt(properties.getProperty(PORT_KEY,"0")));
-        https.setHost(properties.getProperty(HOST_KEY, "localhost"));
-
-        // Severely taxed environments may have significant delays when executing.
-        https.setIdleTimeout(30000L);
-
-        // add the connector
-        jetty.addConnector(https);
-
-        logger.info("Added an https connector on the host '{}' and port '{}'", new Object[]{https.getHost(), https.getPort()});
-    }
-
-
-    public class JettyHandler extends AbstractHandler {
-
-        @Override
-        public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
-                throws IOException, ServletException {
-
-            logRequest(request);
-
-            baseRequest.setHandled(true);
-
-            if(POST.equals(request.getMethod())) {
-                final StringBuilder configBuilder = new StringBuilder();
-                BufferedReader reader = request.getReader();
-                if(reader != null && reader.ready()){
-                    String line;
-                    while ((line = reader.readLine()) != null) {
-                        configBuilder.append(line);
-                        configBuilder.append(System.getProperty("line.separator"));
-                    }
-                }
-                setConfigFile(configBuilder.substring(0,configBuilder.length()-1));
-                notifyListeners();
-                writeOutput(response, POST_TEXT, 200);
-            } else if(GET.equals(request.getMethod())) {
-                writeOutput(response, GET_TEXT, 200);
-            } else {
-                writeOutput(response, OTHER_TEXT, 404);
-            }
-        }
-
-        private void writeOutput(HttpServletResponse response, String responseText, int responseCode) throws IOException {
-            response.setStatus(responseCode);
-            response.setContentType("text/plain");
-            response.setContentLength(responseText.length());
-            try (PrintWriter writer = response.getWriter()) {
-                writer.print(responseText);
-                writer.flush();
-            }
-        }
-
-        private void logRequest(HttpServletRequest request){
-            logger.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
-            logger.info("request method = " + request.getMethod());
-            logger.info("request url = " + request.getRequestURL());
-            logger.info("context path = " + request.getContextPath());
-            logger.info("request content type = " + request.getContentType());
-            logger.info("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/FileChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/FileChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/FileChangeNotifier.java
new file mode 100644
index 0000000..faba2f0
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/FileChangeNotifier.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers;
+
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
+import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.NOTIFIER_PROPERTY_PREFIX;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * FileChangeNotifier provides a simple FileSystem monitor for detecting changes for a specified file as generated from its corresponding {@link Path}.  Upon modifications to the associated file,
+ * associated listeners receive notification of a change allowing configuration logic to be reanalyzed.  The backing implementation is associated with a {@link ScheduledExecutorService} that
+ * ensures continuity of monitoring.
+ */
+public class FileChangeNotifier implements Runnable, ConfigurationChangeNotifier {
+
+    private Path configFile;
+    private WatchService watchService;
+    private long pollingSeconds;
+
+    private final static Logger logger = LoggerFactory.getLogger(FileChangeNotifier.class);
+    private ScheduledExecutorService executorService;
+    private final Set<ConfigurationChangeListener> configurationChangeListeners = new HashSet<>();
+
+    protected static final String CONFIG_FILE_PATH_KEY = NOTIFIER_PROPERTY_PREFIX + ".file.config.path";
+    protected static final String POLLING_PERIOD_INTERVAL_KEY = NOTIFIER_PROPERTY_PREFIX + ".file.polling.period.seconds";
+
+    protected static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
+    protected static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = TimeUnit.SECONDS;
+
+    @Override
+    public Set<ConfigurationChangeListener> getChangeListeners() {
+        return Collections.unmodifiableSet(configurationChangeListeners);
+    }
+
+    @Override
+    public Collection<ListenerHandleResult> notifyListeners() {
+        logger.info("Notifying Listeners of a change");
+        final File fileToRead = configFile.toFile();
+
+        Collection<ListenerHandleResult> listenerHandleResults = new ArrayList<>(configurationChangeListeners.size());
+        for (final ConfigurationChangeListener listener : getChangeListeners()) {
+            ListenerHandleResult result;
+            try (final FileInputStream fis = new FileInputStream(fileToRead);) {
+                listener.handleChange(fis);
+                result = new ListenerHandleResult(listener);
+            } catch (IOException | ConfigurationChangeException ex) {
+                result =  new ListenerHandleResult(listener, ex);
+            }
+            listenerHandleResults.add(result);
+            logger.info("Listener notification result:" + result.toString());
+        }
+        return listenerHandleResults;
+    }
+
+    @Override
+    public boolean registerListener(ConfigurationChangeListener listener) {
+        return this.configurationChangeListeners.add(listener);
+    }
+
+    protected boolean targetChanged() {
+        boolean targetChanged = false;
+
+        final WatchKey watchKey = this.watchService.poll();
+
+        if (watchKey == null) {
+            return targetChanged;
+        }
+
+        for (WatchEvent<?> watchEvt : watchKey.pollEvents()) {
+            final WatchEvent.Kind<?> evtKind = watchEvt.kind();
+
+            final WatchEvent<Path> pathEvent = (WatchEvent<Path>) watchEvt;
+            final Path changedFile = pathEvent.context();
+
+            // determine target change by verifying if the changed file corresponds to the config file monitored for this path
+            targetChanged = (evtKind == ENTRY_MODIFY && changedFile.equals(configFile.getName(configFile.getNameCount() - 1)));
+        }
+
+        // After completing inspection, reset for detection of subsequent change events
+        boolean valid = watchKey.reset();
+        if (!valid) {
+            throw new IllegalStateException("Unable to reinitialize file system watcher.");
+        }
+
+        return targetChanged;
+    }
+
+    protected static WatchService initializeWatcher(Path filePath) {
+        try {
+            final WatchService fsWatcher = FileSystems.getDefault().newWatchService();
+            final Path watchDirectory = filePath.getParent();
+            watchDirectory.register(fsWatcher, ENTRY_MODIFY);
+
+            return fsWatcher;
+        } catch (IOException ioe) {
+            throw new IllegalStateException("Unable to initialize a file system watcher for the path " + filePath, ioe);
+        }
+    }
+
+    @Override
+    public void run() {
+        logger.debug("Checking for a change");
+        if (targetChanged()) {
+            notifyListeners();
+        }
+    }
+
+    @Override
+    public void initialize(Properties properties) {
+        final String rawPath = properties.getProperty(CONFIG_FILE_PATH_KEY);
+        final String rawPollingDuration = properties.getProperty(POLLING_PERIOD_INTERVAL_KEY, Long.toString(DEFAULT_POLLING_PERIOD_INTERVAL));
+
+        if (rawPath == null || rawPath.isEmpty()) {
+            throw new IllegalArgumentException("Property, " + CONFIG_FILE_PATH_KEY + ", for the path of the config file must be specified.");
+        }
+
+        try {
+            setConfigFile(Paths.get(rawPath));
+            setPollingPeriod(Long.parseLong(rawPollingDuration), DEFAULT_POLLING_PERIOD_UNIT);
+            setWatchService(initializeWatcher(configFile));
+        } catch (Exception e) {
+            throw new IllegalStateException("Could not successfully initialize file change notifier.", e);
+        }
+    }
+
+    protected void setConfigFile(Path configFile) {
+        this.configFile = configFile;
+    }
+
+    protected void setWatchService(WatchService watchService) {
+        this.watchService = watchService;
+    }
+
+    protected void setPollingPeriod(long duration, TimeUnit unit) {
+        if (duration < 0) {
+            throw new IllegalArgumentException("Cannot specify a polling period with duration <=0");
+        }
+        this.pollingSeconds = TimeUnit.SECONDS.convert(duration, unit);
+    }
+
+    @Override
+    public void start() {
+        executorService = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+            @Override
+            public Thread newThread(final Runnable r) {
+                final Thread t = Executors.defaultThreadFactory().newThread(r);
+                t.setName("File Change Notifier Thread");
+                t.setDaemon(true);
+                return t;
+            }
+        });
+        this.executorService.scheduleWithFixedDelay(this, 0, pollingSeconds, DEFAULT_POLLING_PERIOD_UNIT);
+    }
+
+    @Override
+    public void close() {
+        if (this.executorService != null) {
+            this.executorService.shutdownNow();
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/RestChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/RestChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/RestChangeNotifier.java
new file mode 100644
index 0000000..777214f
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/RestChangeNotifier.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers;
+
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.NOTIFIER_PROPERTY_PREFIX;
+
+
+public class RestChangeNotifier implements ConfigurationChangeNotifier {
+
+    private final Set<ConfigurationChangeListener> configurationChangeListeners = new HashSet<>();
+    private final static Logger logger = LoggerFactory.getLogger(RestChangeNotifier.class);
+    private String configFile = null;
+    private final Server jetty;
+    public static final String GET_TEXT = "This is a config change listener for an Apache NiFi - MiNiFi instance.\n" +
+            "Use this rest server to upload a conf.yml to configure the MiNiFi instance.\n" +
+            "Send a POST http request to '/' to upload the file.";
+    public static final String OTHER_TEXT ="This is not a support HTTP operation. Please use GET to get more information or POST to upload a new config.yml file.\n";
+
+
+    public static final String POST = "POST";
+    public static final String GET = "GET";
+
+    public static final String PORT_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.port";
+    public static final String HOST_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.host";
+    public static final String TRUSTSTORE_LOCATION_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.location";
+    public static final String TRUSTSTORE_PASSWORD_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.password";
+    public static final String TRUSTSTORE_TYPE_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.type";
+    public static final String KEYSTORE_LOCATION_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.location";
+    public static final String KEYSTORE_PASSWORD_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.password";
+    public static final String KEYSTORE_TYPE_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.type";
+    public static final String NEED_CLIENT_AUTH_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.need.client.auth";
+
+    public RestChangeNotifier(){
+        QueuedThreadPool queuedThreadPool = new QueuedThreadPool();
+        queuedThreadPool.setDaemon(true);
+        jetty = new Server(queuedThreadPool);
+    }
+
+    @Override
+    public void initialize(Properties properties) {
+        logger.info("Initializing");
+
+        // create the secure connector if keystore location is specified
+        if (properties.getProperty(KEYSTORE_LOCATION_KEY) != null) {
+            createSecureConnector(properties);
+        } else {
+            // create the unsecure connector otherwise
+            createConnector(properties);
+        }
+
+        HandlerCollection handlerCollection = new HandlerCollection(true);
+        handlerCollection.addHandler(new JettyHandler());
+        jetty.setHandler(handlerCollection);
+    }
+
+    @Override
+    public Set<ConfigurationChangeListener> getChangeListeners() {
+        return configurationChangeListeners;
+    }
+
+    @Override
+    public boolean registerListener(ConfigurationChangeListener listener) {
+        return configurationChangeListeners.add(listener);
+    }
+
+    @Override
+    public Collection<ListenerHandleResult> notifyListeners() {
+        if (configFile == null){
+            throw new IllegalStateException("Attempting to notify listeners when there is no new config file.");
+        }
+
+        Collection<ListenerHandleResult> listenerHandleResults = new ArrayList<>(configurationChangeListeners.size());
+        for (final ConfigurationChangeListener listener : getChangeListeners()) {
+            ListenerHandleResult result;
+            try (final ByteArrayInputStream fis = new ByteArrayInputStream(configFile.getBytes())) {
+                listener.handleChange(fis);
+                result = new ListenerHandleResult(listener);
+            } catch (IOException | ConfigurationChangeException ex) {
+                result = new ListenerHandleResult(listener, ex);
+            }
+            listenerHandleResults.add(result);
+            logger.info("Listener notification result:" + result.toString());
+        }
+
+        configFile = null;
+        return listenerHandleResults;
+    }
+
+    @Override
+    public void start(){
+        try {
+            jetty.start();
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+
+    @Override
+    public void close() throws IOException {
+        logger.warn("Shutting down the jetty server");
+        try {
+            jetty.stop();
+            jetty.destroy();
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+        logger.warn("Done shutting down the jetty server");
+    }
+
+    public URI getURI(){
+        return jetty.getURI();
+    }
+
+    public int getPort(){
+        if (!jetty.isStarted()) {
+            throw new IllegalStateException("Jetty server not started");
+        }
+        return ((ServerConnector) jetty.getConnectors()[0]).getLocalPort();
+    }
+
+    public String getConfigString(){
+        return configFile;
+    }
+
+    private void setConfigFile(String configFile){
+        this.configFile = configFile;
+    }
+
+    private void createConnector(Properties properties) {
+        final ServerConnector http = new ServerConnector(jetty);
+
+        http.setPort(Integer.parseInt(properties.getProperty(PORT_KEY, "0")));
+        http.setHost(properties.getProperty(HOST_KEY, "localhost"));
+
+        // Severely taxed or distant environments may have significant delays when executing.
+        http.setIdleTimeout(30000L);
+        jetty.addConnector(http);
+
+        logger.info("Added an http connector on the host '{}' and port '{}'", new Object[]{http.getHost(), http.getPort()});
+    }
+
+    private void createSecureConnector(Properties properties) {
+        SslContextFactory ssl = new SslContextFactory();
+
+        if (properties.getProperty(KEYSTORE_LOCATION_KEY) != null) {
+            ssl.setKeyStorePath(properties.getProperty(KEYSTORE_LOCATION_KEY));
+            ssl.setKeyStorePassword(properties.getProperty(KEYSTORE_PASSWORD_KEY));
+            ssl.setKeyStoreType(properties.getProperty(KEYSTORE_TYPE_KEY));
+        }
+
+        if (properties.getProperty(TRUSTSTORE_LOCATION_KEY) != null) {
+            ssl.setTrustStorePath(properties.getProperty(TRUSTSTORE_LOCATION_KEY));
+            ssl.setTrustStorePassword(properties.getProperty(TRUSTSTORE_PASSWORD_KEY));
+            ssl.setTrustStoreType(properties.getProperty(TRUSTSTORE_TYPE_KEY));
+            ssl.setNeedClientAuth(Boolean.parseBoolean(properties.getProperty(NEED_CLIENT_AUTH_KEY, "true")));
+        }
+
+        // build the connector
+        final ServerConnector https = new ServerConnector(jetty, ssl);
+
+        // set host and port
+        https.setPort(Integer.parseInt(properties.getProperty(PORT_KEY,"0")));
+        https.setHost(properties.getProperty(HOST_KEY, "localhost"));
+
+        // Severely taxed environments may have significant delays when executing.
+        https.setIdleTimeout(30000L);
+
+        // add the connector
+        jetty.addConnector(https);
+
+        logger.info("Added an https connector on the host '{}' and port '{}'", new Object[]{https.getHost(), https.getPort()});
+    }
+
+
+    public class JettyHandler extends AbstractHandler {
+
+        @Override
+        public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
+                throws IOException, ServletException {
+
+            logRequest(request);
+
+            baseRequest.setHandled(true);
+
+            if(POST.equals(request.getMethod())) {
+                final StringBuilder configBuilder = new StringBuilder();
+                BufferedReader reader = request.getReader();
+                if(reader != null && reader.ready()){
+                    String line;
+                    while ((line = reader.readLine()) != null) {
+                        configBuilder.append(line);
+                        configBuilder.append(System.getProperty("line.separator"));
+                    }
+                }
+                setConfigFile(configBuilder.substring(0,configBuilder.length()-1));
+                Collection<ListenerHandleResult> listenerHandleResults = notifyListeners();
+
+                int statusCode = 200;
+                for (ListenerHandleResult result: listenerHandleResults){
+                    if(!result.succeeded()){
+                        statusCode = 500;
+                        break;
+                    }
+                }
+
+                writeOutput(response, getPostText(listenerHandleResults), statusCode);
+            } else if(GET.equals(request.getMethod())) {
+                writeOutput(response, GET_TEXT, 200);
+            } else {
+                writeOutput(response, OTHER_TEXT, 404);
+            }
+        }
+
+        private String getPostText(Collection<ListenerHandleResult> listenerHandleResults){
+            StringBuilder postResult = new StringBuilder("The result of notifying listeners:\n");
+
+            for (ListenerHandleResult result : listenerHandleResults) {
+                postResult.append(result.toString());
+                postResult.append("\n");
+            }
+
+            return postResult.toString();
+        }
+
+        private void writeOutput(HttpServletResponse response, String responseText, int responseCode) throws IOException {
+            response.setStatus(responseCode);
+            response.setContentType("text/plain");
+            response.setContentLength(responseText.length());
+            try (PrintWriter writer = response.getWriter()) {
+                writer.print(responseText);
+                writer.flush();
+            }
+        }
+
+        private void logRequest(HttpServletRequest request){
+            logger.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
+            logger.info("request method = " + request.getMethod());
+            logger.info("request url = " + request.getRequestURL());
+            logger.info("context path = " + request.getContextPath());
+            logger.info("request content type = " + request.getContentType());
+            logger.info("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
+        }
+
+    }
+}