You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/04/18 20:58:49 UTC

[2/2] nifi-minifi git commit: MINIFI-14 Incorporating Listener/Notifier Logic to RunMiNiFi and performing some refactoring of how the FileChangeNotifier is handled.

MINIFI-14 Incorporating Listener/Notifier logic into RunMiNiFi and refactoring how the FileChangeNotifier is handled.

Incorporating ConfigTransformer and adjusting path handling so that it works whether or not a config property value ends with a trailing '/'.

This closes #10


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/3a967be7
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/3a967be7
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/3a967be7

Branch: refs/heads/master
Commit: 3a967be7a6fa6dbaceeff28f4af8c7ce05b77f2c
Parents: 3bbd391
Author: Aldrin Piri <al...@apache.org>
Authored: Wed Apr 13 16:43:31 2016 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Mon Apr 18 14:57:55 2016 -0400

----------------------------------------------------------------------
 .../ConfigurationChangeListener.java            |  34 --
 .../ConfigurationChangeNotifier.java            |  57 ----
 minifi-bootstrap/pom.xml                        |  21 ++
 .../apache/nifi/minifi/bootstrap/RunMiNiFi.java | 321 ++++++++++++++++---
 .../nifi/minifi/bootstrap/ShutdownHook.java     |  12 +
 .../ConfigurationChangeListener.java            |  34 ++
 .../ConfigurationChangeNotifier.java            |  57 ++++
 .../configuration/FileChangeNotifier.java       | 183 +++++++++++
 .../bootstrap/util/ConfigTransformer.java       |  25 +-
 .../configuration/TestFileChangeNotifier.java   | 206 ++++++++++++
 .../bootstrap/util/TestConfigTransformer.java   |  13 +-
 .../src/test/resources/config-invalid.yml       |  16 +
 .../src/main/resources/conf/bootstrap.conf      |  13 +
 .../minifi-framework/minifi-runtime/pom.xml     |  14 -
 .../apache/nifi/minifi/BootstrapListener.java   |  16 +-
 .../configuration/FileChangeNotifier.java       | 173 ----------
 .../configuration/TestFileChangeNotifier.java   | 206 ------------
 .../src/test/resources/config.yml               |   0
 18 files changed, 863 insertions(+), 538 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/3a967be7/minifi-api/src/main/java/org/apache/nifi/minifi/configuration/ConfigurationChangeListener.java
----------------------------------------------------------------------
diff --git a/minifi-api/src/main/java/org/apache/nifi/minifi/configuration/ConfigurationChangeListener.java b/minifi-api/src/main/java/org/apache/nifi/minifi/configuration/ConfigurationChangeListener.java
deleted file mode 100644
index 0d09f72..0000000
--- a/minifi-api/src/main/java/org/apache/nifi/minifi/configuration/ConfigurationChangeListener.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.minifi.configuration;
-
-import java.io.InputStream;
-
-/**
- * Interface for handling events detected and driven by an associated {@link ConfigurationChangeNotifier} to which the listener
- * has registered via {@link ConfigurationChangeNotifier#registerListener(ConfigurationChangeListener)}.
- */
-public interface ConfigurationChangeListener {
-
-    /**
-     * Provides a mechanism for the implementation to interpret the specified configuration change
-     *
-     * @param is stream of the detected content received from the change notifier
-     */
-    void handleChange(InputStream is);
-
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/3a967be7/minifi-api/src/main/java/org/apache/nifi/minifi/configuration/ConfigurationChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-api/src/main/java/org/apache/nifi/minifi/configuration/ConfigurationChangeNotifier.java b/minifi-api/src/main/java/org/apache/nifi/minifi/configuration/ConfigurationChangeNotifier.java
deleted file mode 100644
index 30a98c2..0000000
--- a/minifi-api/src/main/java/org/apache/nifi/minifi/configuration/ConfigurationChangeNotifier.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.minifi.configuration;
-
-import java.util.Properties;
-import java.util.Set;
-
-public interface ConfigurationChangeNotifier {
-
-
-    /**
-     * Provides an opportunity for the implementation to perform configuration and initialization based on properties received from the bootstrapping configuration
-     *
-     * @param properties from the bootstrap configuration
-     */
-    void initialize(Properties properties);
-
-    /**
-     * Begins the associated notification service provided by the given implementation.  In most implementations, no action will occur until this method is invoked.
-     */
-    void start();
-
-    /**
-     * Provides an immutable collection of listeners for the notifier instance
-     *
-     * @return a collection of those listeners registered for notifications
-     */
-    Set<ConfigurationChangeListener> getChangeListeners();
-
-    /**
-     * Adds a listener to be notified of configuration changes
-     *
-     * @param listener to be added to the collection
-     * @return true if the listener was added; false if already registered
-     */
-    boolean registerListener(ConfigurationChangeListener listener);
-
-    /**
-     * Provide the mechanism by which listeners are notified
-     */
-    void notifyListeners();
-
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/3a967be7/minifi-bootstrap/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/pom.xml b/minifi-bootstrap/pom.xml
index 613efbe..68bc60a 100644
--- a/minifi-bootstrap/pom.xml
+++ b/minifi-bootstrap/pom.xml
@@ -46,6 +46,10 @@ limitations under the License.
             <artifactId>nifi-expression-language</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi.minifi</groupId>
+            <artifactId>minifi-api</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.yaml</groupId>
             <artifactId>snakeyaml</artifactId>
             <version>1.17</version>
@@ -57,4 +61,21 @@ limitations under the License.
             <version>0.6.0</version>
         </dependency>
     </dependencies>
+
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes combine.children="append">
+                        <exclude>src/test/resources/config.yml</exclude>
+                        <exclude>src/test/resources/config-invalid.yml</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/3a967be7/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
index 0daa43d..6c45667 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
@@ -16,10 +16,6 @@
  */
 package org.apache.nifi.minifi.bootstrap;
 
-import org.apache.nifi.util.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
@@ -42,6 +38,7 @@ import java.nio.file.attribute.PosixFilePermission;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -58,8 +55,16 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
- *
  * <p>
  * The class which bootstraps Apache MiNiFi. This class looks for the
  * bootstrap.conf file by looking in the following places (in order):</p>
@@ -71,7 +76,7 @@ import java.util.concurrent.locks.ReentrantLock;
  * <li>./conf/bootstrap.conf, where {@code ./} represents the working
  * directory.</li>
  * </ol>
- *
+ * <p>
  * If the {@code bootstrap.conf} file cannot be found, throws a {@code FileNotFoundException}.
  */
 public class RunMiNiFi {
@@ -80,6 +85,11 @@ public class RunMiNiFi {
     public static final String DEFAULT_NIFI_PROPS_FILE = "./conf/nifi.properties";
     public static final String DEFAULT_JAVA_CMD = "java";
 
+
+    public static final String CONF_DIR_KEY = "conf.dir";
+
+    public static final String MINIFI_CONFIG_FILE_KEY = "nifi.minifi.config";
+
     public static final String GRACEFUL_SHUTDOWN_PROP = "graceful.shutdown.seconds";
     public static final String DEFAULT_GRACEFUL_SHUTDOWN_VALUE = "20";
 
@@ -89,9 +99,13 @@ public class RunMiNiFi {
     public static final int STARTUP_WAIT_SECONDS = 60;
 
     public static final String SHUTDOWN_CMD = "SHUTDOWN";
+    public static final String RELOAD_CMD = "RELOAD";
     public static final String PING_CMD = "PING";
     public static final String DUMP_CMD = "DUMP";
 
+    public static final String NOTIFIER_PROPERTY_PREFIX = "nifi.minifi.notifier";
+    public static final String NOTIFIER_COMPONENTS_KEY = NOTIFIER_PROPERTY_PREFIX + ".components";
+
     private volatile boolean autoRestartNiFi = true;
     private volatile int ccPort = -1;
     private volatile long nifiPid = -1L;
@@ -115,9 +129,12 @@ public class RunMiNiFi {
     private volatile Set<Future<?>> loggingFutures = new HashSet<>(2);
     private volatile int gracefulShutdownSeconds;
 
+    private final Set<ConfigurationChangeNotifier> changeNotifiers;
+    private final ConfigurationChangeListener changeListener;
+
     private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
 
-    public RunMiNiFi(final File bootstrapConfigFile, final boolean verbose) throws IOException {
+    public RunMiNiFi(final File bootstrapConfigFile) throws IOException {
         this.bootstrapConfigFile = bootstrapConfigFile;
 
         loggingExecutor = Executors.newFixedThreadPool(2, new ThreadFactory() {
@@ -129,12 +146,15 @@ public class RunMiNiFi {
                 return t;
             }
         });
+
+        this.changeListener = new MiNiFiConfigurationChangeListener(this, defaultLogger);
+        this.changeNotifiers = initializeNotifiers();
     }
 
     private static void printUsage() {
         System.out.println("Usage:");
         System.out.println();
-        System.out.println("java org.apache.nifi.minifi.bootstrap.RunMiNiFi [<-verbose>] <command> [options]");
+        System.out.println("java org.apache.nifi.minifi.bootstrap.RunMiNiFi <command> [options]");
         System.out.println();
         System.out.println("Valid commands include:");
         System.out.println("");
@@ -147,10 +167,6 @@ public class RunMiNiFi {
         System.out.println();
     }
 
-    private static String[] shift(final String[] orig) {
-        return Arrays.copyOfRange(orig, 1, orig.length);
-    }
-
     public static void main(String[] args) throws IOException, InterruptedException {
         if (args.length < 1 || args.length > 3) {
             printUsage();
@@ -158,11 +174,6 @@ public class RunMiNiFi {
         }
 
         File dumpFile = null;
-        boolean verbose = false;
-        if (args[0].equals("-verbose")) {
-            verbose = true;
-            args = shift(args);
-        }
 
         final String cmd = args[0];
         if (cmd.equals("dump")) {
@@ -188,7 +199,7 @@ public class RunMiNiFi {
         }
 
         final File configFile = getBootstrapConfFile();
-        final RunMiNiFi runMiNiFi = new RunMiNiFi(configFile, verbose);
+        final RunMiNiFi runMiNiFi = new RunMiNiFi(configFile);
 
         switch (cmd.toLowerCase()) {
             case "start":
@@ -261,6 +272,16 @@ public class RunMiNiFi {
         return lockFile;
     }
 
+    public File getReloadFile(final Logger logger) {
+        final File confDir = bootstrapConfigFile.getParentFile();
+        final File nifiHome = confDir.getParentFile();
+        final File bin = new File(nifiHome, "bin");
+        final File lockFile = new File(bin, "minifi.reload.lock");
+
+        logger.debug("Reload File: {}", lockFile);
+        return lockFile;
+    }
+
     private Properties loadProperties(final Logger logger) throws IOException {
         final Properties props = new Properties();
         final File statusFile = getStatusFile(logger);
@@ -375,8 +396,8 @@ public class RunMiNiFi {
             boolean running = false;
             String line;
             try (final InputStream in = proc.getInputStream();
-                final Reader streamReader = new InputStreamReader(in);
-                final BufferedReader reader = new BufferedReader(streamReader)) {
+                 final Reader streamReader = new InputStreamReader(in);
+                 final BufferedReader reader = new BufferedReader(streamReader)) {
 
                 while ((line = reader.readLine()) != null) {
                     if (line.trim().startsWith(pid)) {
@@ -464,7 +485,7 @@ public class RunMiNiFi {
         }
     }
 
-    public void env(){
+    public void env() {
         final Logger logger = cmdLogger;
         final Status status = getStatus(logger);
         if (status.getPid() == null) {
@@ -497,19 +518,19 @@ public class RunMiNiFi {
             return;
         }
 
-        try{
+        try {
             final Method getSystemPropertiesMethod = virtualMachine.getClass().getMethod("getSystemProperties");
 
-            final Properties sysProps = (Properties)getSystemPropertiesMethod.invoke(virtualMachine);
+            final Properties sysProps = (Properties) getSystemPropertiesMethod.invoke(virtualMachine);
             for (Entry<Object, Object> syspropEntry : sysProps.entrySet()) {
-                logger.info(syspropEntry.getKey().toString() + " = " +syspropEntry.getValue().toString());
+                logger.info(syspropEntry.getKey().toString() + " = " + syspropEntry.getValue().toString());
             }
         } catch (Throwable t) {
             throw new RuntimeException(t);
         } finally {
             try {
                 detachMethod.invoke(virtualMachine);
-            } catch (final Exception e){
+            } catch (final Exception e) {
                 logger.warn("Caught exception detaching from process", e);
             }
         }
@@ -567,6 +588,104 @@ public class RunMiNiFi {
         }
     }
 
+    public void reload() throws IOException {
+        final Logger logger = defaultLogger;
+        final Integer port = getCurrentPort(logger);
+        if (port == null) {
+            logger.info("Apache MiNiFi is not currently running");
+            return;
+        }
+
+        // indicate that a reload command is in progress
+        final File reloadLockFile = getReloadFile(logger);
+        if (!reloadLockFile.exists()) {
+            reloadLockFile.createNewFile();
+        }
+
+        final Properties nifiProps = loadProperties(logger);
+        final String secretKey = nifiProps.getProperty("secret.key");
+        final String pid = nifiProps.getProperty("pid");
+
+        try (final Socket socket = new Socket()) {
+            logger.debug("Connecting to MiNiFi instance");
+            socket.setSoTimeout(10000);
+            socket.connect(new InetSocketAddress("localhost", port));
+            logger.debug("Established connection to MiNiFi instance.");
+            socket.setSoTimeout(10000);
+
+            logger.debug("Sending RELOAD Command to port {}", port);
+            final OutputStream out = socket.getOutputStream();
+            out.write((RELOAD_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
+            out.flush();
+            socket.shutdownOutput();
+
+            final InputStream in = socket.getInputStream();
+            int lastChar;
+            final StringBuilder sb = new StringBuilder();
+            while ((lastChar = in.read()) > -1) {
+                sb.append((char) lastChar);
+            }
+            final String response = sb.toString().trim();
+
+            logger.debug("Received response to RELOAD command: {}", response);
+
+            if (RELOAD_CMD.equals(response)) {
+                logger.info("Apache MiNiFi has accepted the Reload Command and is reloading");
+
+                if (pid != null) {
+                    final Properties bootstrapProperties = getBootstrapProperties();
+
+                    String gracefulShutdown = bootstrapProperties.getProperty(GRACEFUL_SHUTDOWN_PROP, DEFAULT_GRACEFUL_SHUTDOWN_VALUE);
+                    int gracefulShutdownSeconds;
+                    try {
+                        gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown);
+                    } catch (final NumberFormatException nfe) {
+                        gracefulShutdownSeconds = Integer.parseInt(DEFAULT_GRACEFUL_SHUTDOWN_VALUE);
+                    }
+
+                    final long startWait = System.nanoTime();
+                    while (isProcessRunning(pid, logger)) {
+                        logger.info("Waiting for Apache MiNiFi to finish shutting down...");
+                        final long waitNanos = System.nanoTime() - startWait;
+                        final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos);
+                        if (waitSeconds >= gracefulShutdownSeconds && gracefulShutdownSeconds > 0) {
+                            if (isProcessRunning(pid, logger)) {
+                                logger.warn("MiNiFi has not finished shutting down after {} seconds as part of configuration reload. Killing process.", gracefulShutdownSeconds);
+                                try {
+                                    killProcessTree(pid, logger);
+                                } catch (final IOException ioe) {
+                                    logger.error("Failed to kill Process with PID {}", pid);
+                                }
+                            }
+                            break;
+                        } else {
+                            try {
+                                Thread.sleep(2000L);
+                            } catch (final InterruptedException ie) {
+                            }
+                        }
+                    }
+
+                    logger.info("MiNiFi has finished shutting down.");
+                }
+            } else {
+                logger.error("When sending RELOAD command to MiNiFi, got unexpected response {}", response);
+            }
+        } catch (final IOException ioe) {
+            if (pid == null) {
+                logger.error("Failed to send shutdown command to port {} due to {}. No PID found for the MiNiFi process, so unable to kill process; "
+                    + "the process should be killed manually.", new Object[]{port, ioe.toString()});
+            } else {
+                logger.error("Failed to send shutdown command to port {} due to {}. Will kill the MiNiFi Process with PID {}.", new Object[]{port, ioe.toString(), pid});
+                killProcessTree(pid, logger);
+            }
+        } finally {
+            if (reloadLockFile.exists() && !reloadLockFile.delete()) {
+                logger.error("Failed to delete reload lock file {}; this file should be cleaned up manually", reloadLockFile);
+            }
+        }
+    }
+
     public void stop() throws IOException {
         final Logger logger = cmdLogger;
         final Integer port = getCurrentPort(logger);
@@ -613,10 +732,7 @@ public class RunMiNiFi {
                 logger.info("Apache MiNiFi has accepted the Shutdown Command and is shutting down now");
 
                 if (pid != null) {
-                    final Properties bootstrapProperties = new Properties();
-                    try (final FileInputStream fis = new FileInputStream(bootstrapConfigFile)) {
-                        bootstrapProperties.load(fis);
-                    }
+                    final Properties bootstrapProperties = getBootstrapProperties();
 
                     String gracefulShutdown = bootstrapProperties.getProperty(GRACEFUL_SHUTDOWN_PROP, DEFAULT_GRACEFUL_SHUTDOWN_VALUE);
                     int gracefulShutdownSeconds;
@@ -660,9 +776,9 @@ public class RunMiNiFi {
         } catch (final IOException ioe) {
             if (pid == null) {
                 logger.error("Failed to send shutdown command to port {} due to {}. No PID found for the MiNiFi process, so unable to kill process; "
-                    + "the process should be killed manually.", new Object[] {port, ioe.toString()});
+                    + "the process should be killed manually.", new Object[]{port, ioe.toString()});
             } else {
-                logger.error("Failed to send shutdown command to port {} due to {}. Will kill the MiNiFi Process with PID {}.", new Object[] {port, ioe.toString(), pid});
+                logger.error("Failed to send shutdown command to port {} due to {}. Will kill the MiNiFi Process with PID {}.", new Object[]{port, ioe.toString(), pid});
                 killProcessTree(pid, logger);
                 if (statusFile.exists() && !statusFile.delete()) {
                     logger.error("Failed to delete status file {}; this file should be cleaned up manually", statusFile);
@@ -675,11 +791,19 @@ public class RunMiNiFi {
         }
     }
 
+    private Properties getBootstrapProperties() throws IOException {
+        final Properties bootstrapProperties = new Properties();
+        try (final FileInputStream fis = new FileInputStream(bootstrapConfigFile)) {
+            bootstrapProperties.load(fis);
+        }
+        return bootstrapProperties;
+    }
+
     private static List<String> getChildProcesses(final String ppid) throws IOException {
         final Process proc = Runtime.getRuntime().exec(new String[]{"ps", "-o", "pid", "--no-headers", "--ppid", ppid});
         final List<String> childPids = new ArrayList<>();
         try (final InputStream in = proc.getInputStream();
-            final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
+             final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
 
             String line;
             while ((line = reader.readLine()) != null) {
@@ -726,7 +850,7 @@ public class RunMiNiFi {
         return hostname + " (" + ip + ")";
     }
 
-    private int getGracefulShutdownSeconds(Map<String, String> props, File bootstrapConfigAbsoluteFile){
+    private int getGracefulShutdownSeconds(Map<String, String> props, File bootstrapConfigAbsoluteFile) {
         String gracefulShutdown = props.get(GRACEFUL_SHUTDOWN_PROP);
         if (gracefulShutdown == null) {
             gracefulShutdown = DEFAULT_GRACEFUL_SHUTDOWN_VALUE;
@@ -737,12 +861,12 @@ public class RunMiNiFi {
             gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown);
         } catch (final NumberFormatException nfe) {
             throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Bootstrap Config File "
-                    + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer");
+                + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer");
         }
 
         if (gracefulShutdownSeconds < 0) {
             throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Bootstrap Config File "
-                    + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer");
+                + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer");
         }
         return gracefulShutdownSeconds;
     }
@@ -763,7 +887,7 @@ public class RunMiNiFi {
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
-    public Tuple<ProcessBuilder,Process> startMiNiFi() throws IOException, InterruptedException {
+    public Tuple<ProcessBuilder, Process> startMiNiFi() throws IOException, InterruptedException {
         final Integer port = getCurrentPort(cmdLogger);
         if (port != null) {
             cmdLogger.info("Apache MiNiFi is already running, listening to Bootstrap on port " + port);
@@ -795,7 +919,7 @@ public class RunMiNiFi {
         final String libFilename = replaceNull(props.get("lib.dir"), "./lib").trim();
         File libDir = getFile(libFilename, workingDir);
 
-        final String confFilename = replaceNull(props.get("conf.dir"), "./conf").trim();
+        final String confFilename = replaceNull(props.get(CONF_DIR_KEY), "./conf").trim();
         File confDir = getFile(confFilename, workingDir);
 
         String nifiPropsFilename = props.get("props.file");
@@ -908,13 +1032,13 @@ public class RunMiNiFi {
         final Runtime runtime = Runtime.getRuntime();
         runtime.addShutdownHook(shutdownHook);
 
-        return  new Tuple<ProcessBuilder,Process>(builder,process);
+        return new Tuple<ProcessBuilder, Process>(builder, process);
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
     public void start() throws IOException, InterruptedException {
 
-        Tuple<ProcessBuilder,Process> tuple = startMiNiFi();
+        Tuple<ProcessBuilder, Process> tuple = startMiNiFi();
         if (tuple == null) {
             cmdLogger.info("Start method returned null, ending start command.");
             return;
@@ -946,12 +1070,19 @@ public class RunMiNiFi {
                         return;
                     }
 
-                    final File  lockFile = getLockFile(defaultLogger);
+                    final File lockFile = getLockFile(defaultLogger);
                     if (lockFile.exists()) {
                         defaultLogger.info("A shutdown was initiated. Will not restart MiNiFi");
                         return;
                     }
 
+                    final File reloadFile = getReloadFile(defaultLogger);
+                    if (reloadFile.exists()) {
+                        defaultLogger.info("Currently reloading configuration.  Will not restart MiNiFi.");
+                        Thread.sleep(5000L);
+                        continue;
+                    }
+
                     final boolean previouslyStarted = getNifiStarted();
                     if (!previouslyStarted) {
                         defaultLogger.info("MiNiFi never started. Will not restart MiNiFi");
@@ -960,7 +1091,6 @@ public class RunMiNiFi {
                         setNiFiStarted(false);
                     }
 
-                    defaultLogger.warn("Apache MiNiFi appears to have died. Restarting...");
                     process = builder.start();
                     handleLogging(process);
 
@@ -977,7 +1107,6 @@ public class RunMiNiFi {
 
                     final boolean started = waitForStart();
 
-                    final String hostname = getHostname();
                     if (started) {
                         defaultLogger.info("Successfully started Apache MiNiFi{}", (pid == null ? "" : " with PID " + pid));
                     } else {
@@ -1149,6 +1278,114 @@ public class RunMiNiFi {
         }
     }
 
+    public Set<ConfigurationChangeNotifier> getChangeNotifiers() {
+        return Collections.unmodifiableSet(changeNotifiers);
+    }
+
+    private Set<ConfigurationChangeNotifier> initializeNotifiers() throws IOException {
+        final Set<ConfigurationChangeNotifier> changeNotifiers = new HashSet<>();
+        // Instantiates, initializes and starts every notifier class named in the bootstrap property NOTIFIER_COMPONENTS_KEY
+        final Properties bootstrapProperties = getBootstrapProperties();
+
+        final String notifiersCsv = bootstrapProperties.getProperty(NOTIFIER_COMPONENTS_KEY);
+        if (notifiersCsv != null && !notifiersCsv.isEmpty()) {
+            for (String notifierClassname : Arrays.asList(notifiersCsv.split(","))) {
+                try {
+                    Class<?> notifierClass = Class.forName(notifierClassname.trim()); // tolerate whitespace around the commas
+                    ConfigurationChangeNotifier notifier = (ConfigurationChangeNotifier) notifierClass.newInstance();
+                    notifier.initialize(bootstrapProperties);
+                    changeNotifiers.add(notifier);
+                    notifier.registerListener(changeListener);
+                    notifier.start();
+                } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+                    throw new RuntimeException("Issue instantiating notifier " + notifierClassname, e);
+                }
+            }
+        }
+        return changeNotifiers;
+    }
+
+    private static class MiNiFiConfigurationChangeListener implements ConfigurationChangeListener {
+
+        private final RunMiNiFi runner;
+        private final Logger logger;
+
+        public MiNiFiConfigurationChangeListener(RunMiNiFi runner, Logger logger) {
+            this.runner = runner;
+            this.logger = logger;
+        }
+
+        @Override
+        public void handleChange(InputStream configInputStream) {
+            logger.info("Received notification of a change");
+            try {
+                final Properties bootstrapProperties = runner.getBootstrapProperties();
+                final File configFile = new File(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY));
+
+                // Store the incoming stream as a byte array to be shared among components that need it
+                final ByteArrayOutputStream bufferedConfigOs = new ByteArrayOutputStream();
+                byte[] copyArray = new byte[1024];
+                int available = -1;
+                while ((available = configInputStream.read(copyArray)) > 0) {
+                    bufferedConfigOs.write(copyArray, 0, available);
+                }
+
+                // Create an input stream to use for writing a config file as well as feeding to the config transformer
+                final ByteArrayInputStream newConfigBais = new ByteArrayInputStream(bufferedConfigOs.toByteArray());
+                newConfigBais.mark(-1);
+
+                logger.info("Persisting changes to {}", configFile.getAbsolutePath());
+                saveFile(newConfigBais, configFile);
+
+                // Reset the input stream to provide to the transformer
+                newConfigBais.reset();
+
+                final String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
+                logger.info("Performing transformation for input and saving outputs to {}", confDir);
+                performTransformation(newConfigBais, confDir);
+
+                logger.info("Reloading instance with new configuration");
+                restartInstance();
+
+            } catch (IOException ioe) {
+                logger.error("Unable to carry out reloading of configuration on receipt of notification event", ioe);
+                throw new IllegalStateException("Unable to perform reload of received configuration change", ioe);
+            }
+        }
+
+        private void saveFile(final InputStream configInputStream, File configFile) {
+            try {
+                try (final FileOutputStream configFileOutputStream = new FileOutputStream(configFile)) {
+                    byte[] copyArray = new byte[1024];
+                    int available = -1;
+                    while ((available = configInputStream.read(copyArray)) > 0) {
+                        configFileOutputStream.write(copyArray, 0, available);
+                    }
+                }
+            } catch (IOException ioe) {
+                throw new IllegalStateException("Unable to save updated configuration to the configured config file location", ioe);
+            }
+        }
+
+        private void performTransformation(InputStream configIs, String configDestinationPath) {
+            try {
+                ConfigTransformer.transformConfigFile(configIs, configDestinationPath);
+            } catch (Exception e) {
+                logger.error("Unable to successfully transform the provided configuration", e);
+                throw new IllegalStateException("Unable to successfully transform the provided configuration", e);
+            }
+        }
+
+        private void restartInstance() {
+            logger.info("Restarting MiNiFi with new configuration");
+            try {
+                runner.reload();
+            } catch (IOException e) {
+                throw new IllegalStateException("Unable to successfully restart MiNiFi instance after configuration change.", e);
+            }
+        }
+    }
+
     private static class Status {
 
         private final Integer port;

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/3a967be7/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
index 3e3be5e..13f0d16 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
@@ -25,6 +25,8 @@ import java.nio.charset.StandardCharsets;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+
 public class ShutdownHook extends Thread {
 
     private final Process nifiProcess;
@@ -49,6 +51,15 @@ public class ShutdownHook extends Thread {
     @Override
     public void run() {
         executor.shutdown();
+
+        System.out.println("Initiating shutdown of bootstrap change notifiers...");
+        for (ConfigurationChangeNotifier notifier : runner.getChangeNotifiers()) {
+            try {
+                notifier.close();
+            } catch (IOException ioe) {
+                System.out.println("Could not successfully stop notifier " + notifier.getClass() + " due to " + ioe);
+            }
+        }
         runner.setAutoRestartNiFi(false);
         final int ccPort = runner.getNiFiCommandControlPort();
         if (ccPort > 0) {
@@ -66,6 +77,7 @@ public class ShutdownHook extends Thread {
             }
         }
 
+
         System.out.println("Waiting for Apache MiNiFi to finish shutting down...");
         final long startWait = System.nanoTime();
         while (RunMiNiFi.isAlive(nifiProcess)) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/3a967be7/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
new file mode 100644
index 0000000..7d9183a
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.configuration;
+
+import java.io.InputStream;
+
+/**
+ * Interface for handling events detected and driven by an associated {@link ConfigurationChangeNotifier} to which the listener
+ * has registered via {@link ConfigurationChangeNotifier#registerListener(ConfigurationChangeListener)}.
+ */
+public interface ConfigurationChangeListener {
+
+    /**
+     * Provides a mechanism for the implementation to interpret the specified configuration change
+     *
+     * @param is stream of the detected content received from the change notifier
+     */
+    void handleChange(InputStream is);
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/3a967be7/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
new file mode 100644
index 0000000..7ad32f1
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.configuration;
+
+import java.io.Closeable;
+import java.util.Properties;
+import java.util.Set;
+
+public interface ConfigurationChangeNotifier extends Closeable {
+
+    /**
+     * Provides an opportunity for the implementation to perform configuration and initialization based on properties received from the bootstrapping configuration
+     *
+     * @param properties from the bootstrap configuration
+     */
+    void initialize(Properties properties);
+
+    /**
+     * Begins the associated notification service provided by the given implementation.  In most implementations, no action will occur until this method is invoked.
+     */
+    void start();
+
+    /**
+     * Provides an immutable collection of listeners for the notifier instance
+     *
+     * @return a collection of those listeners registered for notifications
+     */
+    Set<ConfigurationChangeListener> getChangeListeners();
+
+    /**
+     * Adds a listener to be notified of configuration changes
+     *
+     * @param listener to be added to the collection
+     * @return true if the listener was added; false if already registered
+     */
+    boolean registerListener(ConfigurationChangeListener listener);
+
+    /**
+     * Provide the mechanism by which listeners are notified
+     */
+    void notifyListeners();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/3a967be7/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/FileChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/FileChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/FileChangeNotifier.java
new file mode 100644
index 0000000..d3f51f7
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/FileChangeNotifier.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.configuration;
+
+import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
+import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.NOTIFIER_PROPERTY_PREFIX;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * FileChangeNotifier provides a simple FileSystem monitor for detecting changes for a specified file as generated from its corresponding {@link Path}.  Upon modifications to the associated file,
+ * associated listeners receive notification of a change allowing configuration logic to be reanalyzed.  The backing implementation is associated with a {@link ScheduledExecutorService} that
+ * ensures continuity of monitoring.
+ */
+public class FileChangeNotifier implements Runnable, ConfigurationChangeNotifier {
+
+    private Path configFile;
+    private WatchService watchService;
+    private long pollingSeconds;
+
+    private ScheduledExecutorService executorService;
+    private final Set<ConfigurationChangeListener> configurationChangeListeners = new HashSet<>();
+
+    protected static final String CONFIG_FILE_PATH_KEY = NOTIFIER_PROPERTY_PREFIX + ".file.config.path";
+    protected static final String POLLING_PERIOD_INTERVAL_KEY = NOTIFIER_PROPERTY_PREFIX + ".file.polling.period.seconds";
+
+    protected static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
+    protected static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = TimeUnit.SECONDS;
+
+    @Override
+    public Set<ConfigurationChangeListener> getChangeListeners() {
+        return Collections.unmodifiableSet(configurationChangeListeners);
+    }
+
+    @Override
+    public void notifyListeners() {
+        final File fileToRead = configFile.toFile();
+        for (final ConfigurationChangeListener listener : getChangeListeners()) {
+            try (final FileInputStream fis = new FileInputStream(fileToRead);) {
+                listener.handleChange(fis);
+            } catch (IOException ex) {
+                throw new IllegalStateException("Unable to read the changed file " + configFile, ex);
+            }
+        }
+    }
+
+    @Override
+    public boolean registerListener(ConfigurationChangeListener listener) {
+        return this.configurationChangeListeners.add(listener);
+    }
+
+    protected boolean targetChanged() {
+        boolean targetChanged = false;
+        // Non-blocking check: poll() yields null when no watch key has been signalled since the last call
+        final WatchKey watchKey = this.watchService.poll();
+
+        if (watchKey == null) {
+            return targetChanged;
+        }
+
+        for (WatchEvent<?> watchEvt : watchKey.pollEvents()) {
+            final WatchEvent.Kind<?> evtKind = watchEvt.kind();
+
+            final WatchEvent<Path> pathEvent = (WatchEvent<Path>) watchEvt;
+            final Path changedFile = pathEvent.context();
+
+            // accumulate across the batch so a config file modification is not masked by a later event for another file
+            targetChanged |= (evtKind == ENTRY_MODIFY && changedFile.equals(configFile.getName(configFile.getNameCount() - 1)));
+        }
+
+        // After completing inspection, reset for detection of subsequent change events
+        boolean valid = watchKey.reset();
+        if (!valid) {
+            throw new IllegalStateException("Unable to reinitialize file system watcher.");
+        }
+
+        return targetChanged;
+    }
+
+    protected static WatchService initializeWatcher(Path filePath) {
+        try {
+            final WatchService fsWatcher = FileSystems.getDefault().newWatchService();
+            final Path watchDirectory = filePath.getParent();
+            watchDirectory.register(fsWatcher, ENTRY_MODIFY);
+
+            return fsWatcher;
+        } catch (IOException ioe) {
+            throw new IllegalStateException("Unable to initialize a file system watcher for the path " + filePath, ioe);
+        }
+    }
+
+    @Override
+    public void run() {
+        if (targetChanged()) {
+            notifyListeners();
+        }
+    }
+
+    @Override
+    public void initialize(Properties properties) {
+        final String rawPath = properties.getProperty(CONFIG_FILE_PATH_KEY);
+        final String rawPollingDuration = properties.getProperty(POLLING_PERIOD_INTERVAL_KEY, Long.toString(DEFAULT_POLLING_PERIOD_INTERVAL));
+
+        if (rawPath == null || rawPath.isEmpty()) {
+            throw new IllegalArgumentException("Property, " + CONFIG_FILE_PATH_KEY + ", for the path of the config file must be specified.");
+        }
+
+        try {
+            setConfigFile(Paths.get(rawPath));
+            setPollingPeriod(Long.parseLong(rawPollingDuration), DEFAULT_POLLING_PERIOD_UNIT);
+            setWatchService(initializeWatcher(configFile));
+        } catch (Exception e) {
+            throw new IllegalStateException("Could not successfully initialize file change notifier.", e);
+        }
+    }
+
+    protected void setConfigFile(Path configFile) {
+        this.configFile = configFile;
+    }
+
+    protected void setWatchService(WatchService watchService) {
+        this.watchService = watchService;
+    }
+
+    protected void setPollingPeriod(long duration, TimeUnit unit) {
+        if (duration <= 0) { // zero is rejected too: start() hands this value to scheduleWithFixedDelay as the delay
+            throw new IllegalArgumentException("Cannot specify a polling period with duration <=0");
+        }
+        this.pollingSeconds = TimeUnit.SECONDS.convert(duration, unit); // stored in seconds, the unit start() schedules with
+    }
+
+    @Override
+    public void start() {
+        executorService = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+            @Override
+            public Thread newThread(final Runnable r) {
+                final Thread t = Executors.defaultThreadFactory().newThread(r);
+                t.setName("File Change Notifier Thread");
+                t.setDaemon(true);
+                return t;
+            }
+        });
+        this.executorService.scheduleWithFixedDelay(this, 0, pollingSeconds, DEFAULT_POLLING_PERIOD_UNIT);
+    }
+
+    @Override
+    public void close() {
+        if (this.executorService != null) {
+            this.executorService.shutdownNow();
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/3a967be7/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
index 95e9ce0..8bb25c8 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
@@ -42,6 +42,7 @@ import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.io.UnsupportedEncodingException;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.HashMap;
@@ -145,11 +146,17 @@ public final class ConfigTransformer {
             Yaml yaml = new Yaml();
 
             // Parse the YAML file
-            Map<String, Object> result = (Map<String, Object>) yaml.load(sourceStream);
-
-            // Write nifi.properties and flow.xml.gz
-            writeNiFiProperties(result, destPath);
-            writeFlowXml(result, destPath);
+            final Object loadedObject = yaml.load(sourceStream);
+
+            // Verify the parsed object is a Map structure
+            if (loadedObject instanceof Map) {
+                final Map<String, Object> result = (Map<String, Object>) loadedObject;
+                // Write nifi.properties and flow.xml.gz
+                writeNiFiProperties(result, destPath);
+                writeFlowXml(result, destPath);
+            } else {
+                throw new IllegalArgumentException("Provided YAML configuration is malformed.");
+            }
         } finally {
             if (sourceStream != null) {
                 sourceStream.close();
@@ -160,7 +167,8 @@ public final class ConfigTransformer {
     private static void writeNiFiProperties(Map<String, Object> topLevelYaml, String path) throws FileNotFoundException, UnsupportedEncodingException {
         PrintWriter writer = null;
         try {
-            writer = new PrintWriter(path+"nifi.properties", "UTF-8");
+            final Path nifiPropertiesPath = Paths.get(path, "nifi.properties");
+            writer = new PrintWriter(nifiPropertiesPath.toFile(), "UTF-8");
 
             Map<String,Object> coreProperties = (Map<String, Object>) topLevelYaml.get(CORE_PROPS_KEY);
             Map<String,Object> flowfileRepo = (Map<String, Object>) topLevelYaml.get(FLOWFILE_REPO_KEY);
@@ -313,7 +321,7 @@ public final class ConfigTransformer {
             addProvenanceReportingTask(reportingTasksNode, topLevelYaml);
 
             final DOMSource domSource = new DOMSource(doc);
-            final OutputStream fileOut = Files.newOutputStream(Paths.get(path + "flow.xml.gz"));
+            final OutputStream fileOut = Files.newOutputStream(Paths.get(path, "flow.xml.gz"));
             final OutputStream outStream = new GZIPOutputStream(fileOut);
             final StreamResult streamResult = new StreamResult(outStream);
 
@@ -444,6 +452,9 @@ public final class ConfigTransformer {
 
     private static void addConfiguration(final Element element, Map<String, Object> elementConfig) {
         final Document doc = element.getOwnerDocument();
+        if (elementConfig == null){
+            return;
+        }
         for (final Map.Entry<String, Object> entry : elementConfig.entrySet()) {
 
             final Element propElement = doc.createElement("property");

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/3a967be7/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestFileChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestFileChangeNotifier.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestFileChangeNotifier.java
new file mode 100644
index 0000000..9432a2f
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestFileChangeNotifier.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.configuration;
+
+import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestFileChangeNotifier {
+
+    private static final String CONFIG_FILENAME = "config.yml";
+    private static final String TEST_CONFIG_PATH = "src/test/resources/config.yml";
+
+    private FileChangeNotifier notifierSpy;
+    private WatchService mockWatchService;
+    private Properties testProperties;
+
+    @Before
+    public void setUp() throws Exception {
+        mockWatchService = Mockito.mock(WatchService.class);
+        notifierSpy = Mockito.spy(new FileChangeNotifier());
+        notifierSpy.setConfigFile(Paths.get(TEST_CONFIG_PATH));
+        notifierSpy.setWatchService(mockWatchService);
+
+        testProperties = new Properties();
+        testProperties.put(FileChangeNotifier.CONFIG_FILE_PATH_KEY, TEST_CONFIG_PATH);
+        testProperties.put(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY, FileChangeNotifier.DEFAULT_POLLING_PERIOD_INTERVAL);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        notifierSpy.close();
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testInitialize_invalidFile() throws Exception {
+        testProperties.put(FileChangeNotifier.CONFIG_FILE_PATH_KEY, "/land/of/make/believe");
+        notifierSpy.initialize(testProperties);
+    }
+
+    @Test
+    public void testInitialize_validFile() throws Exception {
+        notifierSpy.initialize(testProperties);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testInitialize_invalidPollingPeriod() throws Exception {
+        testProperties.put(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY, "abc");
+        notifierSpy.initialize(testProperties);
+    }
+
+    @Test
+    public void testInitialize_useDefaultPolling() throws Exception {
+        testProperties.remove(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY);
+        notifierSpy.initialize(testProperties);
+    }
+
+
+    @Test
+    public void testNotifyListeners() throws Exception {
+        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+        boolean wasRegistered = notifierSpy.registerListener(testListener);
+
+        Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
+        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
+
+        notifierSpy.notifyListeners();
+
+        verify(testListener, Mockito.atMost(1)).handleChange(Mockito.any(InputStream.class));
+    }
+
+    @Test
+    public void testRegisterListener() throws Exception {
+        final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class);
+        boolean wasRegistered = notifierSpy.registerListener(firstListener);
+
+        Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
+        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
+
+        final ConfigurationChangeListener secondListener = Mockito.mock(ConfigurationChangeListener.class);
+        wasRegistered = notifierSpy.registerListener(secondListener);
+        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 2);
+
+    }
+
+    @Test
+    public void testRegisterDuplicateListener() throws Exception {
+        final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class);
+        boolean wasRegistered = notifierSpy.registerListener(firstListener);
+
+        Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
+        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
+
+        wasRegistered = notifierSpy.registerListener(firstListener);
+
+        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
+        Assert.assertFalse("Registration did not correspond to newly added listener", wasRegistered);
+    }
+
+    /* Verify handleChange events */
+    @Test
+    public void testTargetChangedNoModification() throws Exception {
+        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+
+        // In this case the WatchKey is null because there were no events found
+        establishMockEnvironmentForChangeTests(testListener, null);
+
+        verify(testListener, Mockito.never()).handleChange(Mockito.any(InputStream.class));
+    }
+
+    @Test
+    public void testTargetChangedWithModificationEvent_nonConfigFile() throws Exception {
+        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+
+        // In this case, we receive a trigger event for the directory monitored, but it was another file not being monitored
+        final WatchKey mockWatchKey = createMockWatchKeyForPath("footage_not_found.yml");
+
+        establishMockEnvironmentForChangeTests(testListener, mockWatchKey);
+
+        notifierSpy.targetChanged();
+
+        verify(testListener, Mockito.never()).handleChange(Mockito.any(InputStream.class));
+    }
+
+    @Test
+    public void testTargetChangedWithModificationEvent() throws Exception {
+        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+
+        final WatchKey mockWatchKey = createMockWatchKeyForPath(CONFIG_FILENAME);
+        // Provided as a spy to allow injection of mock objects for some tests when dealing with the finalized FileSystems class
+        establishMockEnvironmentForChangeTests(testListener, mockWatchKey);
+
+        // Invoke the method of interest
+        notifierSpy.run();
+
+        verify(mockWatchService, Mockito.atLeastOnce()).poll();
+        verify(testListener, Mockito.atLeastOnce()).handleChange(Mockito.any(InputStream.class));
+    }
+
+    /* Helper methods to establish mock environment */
+    private WatchKey createMockWatchKeyForPath(String configFilePath) {
+        final WatchKey mockWatchKey = Mockito.mock(WatchKey.class);
+        final List<WatchEvent<?>> mockWatchEvents = (List<WatchEvent<?>>) Mockito.mock(List.class);
+        when(mockWatchKey.pollEvents()).thenReturn(mockWatchEvents);
+        when(mockWatchKey.reset()).thenReturn(true);
+
+        final Iterator mockIterator = Mockito.mock(Iterator.class);
+        when(mockWatchEvents.iterator()).thenReturn(mockIterator);
+
+        final WatchEvent mockWatchEvent = Mockito.mock(WatchEvent.class);
+        when(mockIterator.hasNext()).thenReturn(true, false);
+        when(mockIterator.next()).thenReturn(mockWatchEvent);
+
+        // In this case, we receive a trigger event for the directory monitored, and it was the file monitored
+        when(mockWatchEvent.context()).thenReturn(Paths.get(configFilePath));
+        when(mockWatchEvent.kind()).thenReturn(ENTRY_MODIFY);
+
+        return mockWatchKey;
+    }
+
+    private void establishMockEnvironmentForChangeTests(ConfigurationChangeListener listener, final WatchKey watchKey) throws Exception {
+        final boolean wasRegistered = notifierSpy.registerListener(listener);
+
+        // Establish the file mock and its parent directory
+        final Path mockConfigFilePath = Mockito.mock(Path.class);
+        final Path mockConfigFileParentPath = Mockito.mock(Path.class);
+
+        // When getting the parent of the file, get the directory
+        when(mockConfigFilePath.getParent()).thenReturn(mockConfigFileParentPath);
+
+        Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
+        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
+
+        when(mockWatchService.poll()).thenReturn(watchKey);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/3a967be7/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
index cceed16..1a7f261 100644
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
@@ -17,12 +17,13 @@
 
 package org.apache.nifi.minifi.bootstrap.util;
 
-import org.junit.Test;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.FileInputStream;
 
-import static org.junit.Assert.assertTrue;
+import org.junit.Assert;
+import org.junit.Test;
 
 public class TestConfigTransformer {
 
@@ -61,4 +62,12 @@ public class TestConfigTransformer {
 
         flowXml.deleteOnExit();
     }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void handleTransformInvalidFile() throws Exception {
+
+        ConfigTransformer.transformConfigFile("./src/test/resources/config-invalid.yml", "./target/");
+
+        Assert.fail("Invalid configuration file was not detected.");
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/3a967be7/minifi-bootstrap/src/test/resources/config-invalid.yml
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/resources/config-invalid.yml b/minifi-bootstrap/src/test/resources/config-invalid.yml
new file mode 100644
index 0000000..6232aba
--- /dev/null
+++ b/minifi-bootstrap/src/test/resources/config-invalid.yml
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+This is an invalid yaml file.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/3a967be7/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
index 5e676ff..c9626da 100644
--- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
@@ -28,6 +28,19 @@ conf.dir=./conf
 # How long to wait after telling MiNiFi to shutdown before explicitly killing the Process
 graceful.shutdown.seconds=20
 
+# The location for the configuration file
+nifi.minifi.config=./conf/config.yml
+
+# Notifiers to use for the associated agent, comma separated list of class names
+#nifi.minifi.notifier.components=org.apache.nifi.minifi.bootstrap.configuration.FileChangeNotifier
+
+# File change notifier configuration
+
+# Path of the file to monitor for changes.  When these occur, the FileChangeNotifier, if configured, will begin the configuration reloading process
+#nifi.minifi.notifier.file.config.path=
+# How frequently the file specified by 'nifi.minifi.notifier.file.config.path' should be evaluated for changes.
+#nifi.minifi.notifier.file.polling.period.seconds=5
+
 # Disable JSR 199 so that we can use JSP's without running a JDK
 java.arg.1=-Dorg.apache.jasper.compiler.disablejsr199=true
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/3a967be7/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/pom.xml b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/pom.xml
index a656a7a..36c4347 100644
--- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/pom.xml
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/pom.xml
@@ -42,18 +42,4 @@ limitations under the License.
         </dependency>
     </dependencies>
 
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.rat</groupId>
-                <artifactId>apache-rat-plugin</artifactId>
-                <configuration>
-                    <excludes combine.children="append">
-                        <exclude>src/test/resources/config.yml</exclude>
-                    </excludes>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
 </project>

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/3a967be7/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/BootstrapListener.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/BootstrapListener.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/BootstrapListener.java
index 420cb49..bbd0517 100644
--- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/BootstrapListener.java
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/BootstrapListener.java
@@ -80,7 +80,7 @@ public class BootstrapListener {
         listenThread.start();
 
         logger.debug("Notifying Bootstrap that local port is {}", localPort);
-        sendCommand("PORT", new String[] { String.valueOf(localPort), secretKey});
+        sendCommand("PORT", new String[]{String.valueOf(localPort), secretKey});
     }
 
     public void stop() {
@@ -91,7 +91,7 @@ public class BootstrapListener {
 
     public void sendStartedStatus(boolean status) throws IOException {
         logger.debug("Notifying Bootstrap that the status of starting MiNiFi is {}", status);
-        sendCommand("STARTED", new String[]{ String.valueOf(status) });
+        sendCommand("STARTED", new String[]{String.valueOf(status)});
     }
 
     private void sendCommand(final String command, final String[] args) throws IOException {
@@ -186,6 +186,11 @@ public class BootstrapListener {
                                         echoPing(socket.getOutputStream());
                                         logger.debug("Responded to PING request from Bootstrap");
                                         break;
+                                    case RELOAD:
+                                        logger.info("Received RELOAD request from Bootstrap");
+                                        echoReload(socket.getOutputStream());
+                                        nifi.shutdownHook();
+                                        return;
                                     case SHUTDOWN:
                                         logger.info("Received SHUTDOWN request from Bootstrap");
                                         echoShutdown(socket.getOutputStream());
@@ -328,6 +333,11 @@ public class BootstrapListener {
         out.flush();
     }
 
+    private void echoReload(final OutputStream out) throws IOException {
+        out.write("RELOAD\n".getBytes(StandardCharsets.UTF_8));
+        out.flush();
+    }
+
     @SuppressWarnings("resource")  // we don't want to close the stream, as the caller will do that
     private BootstrapRequest readRequest(final InputStream in) throws IOException {
         // We want to ensure that we don't try to read data from an InputStream directly
@@ -369,7 +379,7 @@ public class BootstrapListener {
     private static class BootstrapRequest {
 
         public static enum RequestType {
-
+            RELOAD,
             SHUTDOWN,
             DUMP,
             PING;

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/3a967be7/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/configuration/FileChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/configuration/FileChangeNotifier.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/configuration/FileChangeNotifier.java
deleted file mode 100644
index 36c5968..0000000
--- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/configuration/FileChangeNotifier.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.minifi.configuration;
-
-import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.file.FileSystems;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.WatchEvent;
-import java.nio.file.WatchKey;
-import java.nio.file.WatchService;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * FileChangeNotifier provides a simple FileSystem monitor for detecting changes for a specified file as generated from its corresponding {@link Path}.  Upon modifications to the associated file,
- * associated listeners receive notification of a change allowing configuration logic to be reanalyzed.  The backing implementation is associated with a {@link ScheduledExecutorService} that
- * ensures continuity of monitoring.
- */
-public class FileChangeNotifier implements Runnable, ConfigurationChangeNotifier, Closeable {
-
-    private Path configFile;
-    private WatchService watchService;
-    private long pollingSeconds;
-
-    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
-    private final Set<ConfigurationChangeListener> configurationChangeListeners = new HashSet<>();
-
-    protected static final String CONFIG_FILE_PATH_KEY = "nifi.minifi.notifier.file.config.path";
-    protected static final String POLLING_PERIOD_INTERVAL_KEY = "nifi.minifi.notifier.file.polling.period.seconds";
-
-    protected static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
-    protected static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = TimeUnit.SECONDS;
-
-    @Override
-    public Set<ConfigurationChangeListener> getChangeListeners() {
-        return Collections.unmodifiableSet(configurationChangeListeners);
-    }
-
-    @Override
-    public void notifyListeners() {
-        final File fileToRead = configFile.toFile();
-        for (final ConfigurationChangeListener listener : getChangeListeners()) {
-            try (final FileInputStream fis = new FileInputStream(fileToRead);) {
-                listener.handleChange(fis);
-            } catch (IOException ex) {
-                throw new IllegalStateException("Unable to read the changed file " + configFile, ex);
-            }
-        }
-    }
-
-    @Override
-    public boolean registerListener(ConfigurationChangeListener listener) {
-        return this.configurationChangeListeners.add(listener);
-    }
-
-    protected boolean targetChanged() {
-        boolean targetChanged = false;
-
-        final WatchKey watchKey = this.watchService.poll();
-
-        if (watchKey == null) {
-            return targetChanged;
-        }
-
-        for (WatchEvent<?> watchEvt : watchKey.pollEvents()) {
-            final WatchEvent.Kind<?> evtKind = watchEvt.kind();
-
-            final WatchEvent<Path> pathEvent = (WatchEvent<Path>) watchEvt;
-            final Path changedFile = pathEvent.context();
-
-            // determine target change by verifying if the changed file corresponds to the config file monitored for this path
-            targetChanged = (evtKind == ENTRY_MODIFY && changedFile.equals(configFile.getName(configFile.getNameCount() - 1)));
-        }
-
-        // After completing inspection, reset for detection of subsequent change events
-        boolean valid = watchKey.reset();
-        if (!valid) {
-            throw new IllegalStateException("Unable to reinitialize file system watcher.");
-        }
-
-        return targetChanged;
-    }
-
-    protected static WatchService initializeWatcher(Path filePath) {
-        try {
-            final WatchService fsWatcher = FileSystems.getDefault().newWatchService();
-            final Path watchDirectory = filePath.getParent();
-            watchDirectory.register(fsWatcher, ENTRY_MODIFY);
-
-            return fsWatcher;
-        } catch (IOException ioe) {
-            throw new IllegalStateException("Unable to initialize a file system watcher for the path " + filePath, ioe);
-        }
-    }
-
-    @Override
-    public void run() {
-        if (targetChanged()) {
-            notifyListeners();
-        }
-    }
-
-    @Override
-    public void initialize(Properties properties) {
-        final String rawPath = properties.getProperty(CONFIG_FILE_PATH_KEY);
-        final String rawPollingDuration = properties.getProperty(POLLING_PERIOD_INTERVAL_KEY, Long.toString(DEFAULT_POLLING_PERIOD_INTERVAL));
-
-        try {
-            setConfigFile(Paths.get(rawPath));
-            setPollingPeriod(Long.parseLong(rawPollingDuration), DEFAULT_POLLING_PERIOD_UNIT);
-            setWatchService(initializeWatcher(configFile));
-        } catch (Exception e) {
-            throw new IllegalArgumentException("Could not successfully initialize file change notifier.", e);
-        }
-    }
-
-    protected void setConfigFile(Path configFile) {
-        final File file = configFile.toFile();
-        if (!file.exists() || !file.canRead() || !file.isFile()) {
-            throw new IllegalArgumentException(String.format("The specified path %s must be a readable file.", configFile));
-        }
-        this.configFile = configFile;
-    }
-
-    protected void setWatchService(WatchService watchService) {
-        this.watchService = watchService;
-    }
-
-    protected void setPollingPeriod(long duration, TimeUnit unit) {
-        if (duration < 0) {
-            throw new IllegalArgumentException("Cannot specify a polling period with duration <=0");
-        }
-        this.pollingSeconds = TimeUnit.SECONDS.convert(duration, unit);
-    }
-
-    @Override
-    public void start() {
-        this.executorService.scheduleWithFixedDelay(this, 0, pollingSeconds, DEFAULT_POLLING_PERIOD_UNIT);
-    }
-
-    @Override
-    public void close() {
-        if (!this.executorService.isShutdown() || !this.executorService.isTerminated()) {
-            this.executorService.shutdownNow();
-        }
-    }
-}
-