You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/04/25 17:54:48 UTC

[2/3] nifi-minifi git commit: MINIFI-17 Adding error handling of configurations that fail to start and a couple other small changes

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
index 8bb25c8..633cce2 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
@@ -19,6 +19,8 @@ package org.apache.nifi.minifi.bootstrap.util;
 
 
 import org.apache.nifi.controller.FlowSerializationException;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.w3c.dom.DOMException;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
@@ -37,6 +39,8 @@ import javax.xml.transform.stream.StreamResult;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PrintWriter;
@@ -151,11 +155,19 @@ public final class ConfigTransformer {
             // Verify the parsed object is a Map structure
             if (loadedObject instanceof Map) {
                 final Map<String, Object> result = (Map<String, Object>) loadedObject;
+
+                // Create nifi.properties and flow.xml.gz in memory
+                ByteArrayOutputStream nifiPropertiesOutputStream = new ByteArrayOutputStream();
+                writeNiFiProperties(result, nifiPropertiesOutputStream);
+
+                DOMSource flowXml = createFlowXml(result);
+
                 // Write nifi.properties and flow.xml.gz
-                writeNiFiProperties(result, destPath);
-                writeFlowXml(result, destPath);
+                writeNiFiPropertiesFile(nifiPropertiesOutputStream, destPath);
+
+                writeFlowXmlFile(flowXml, destPath);
             } else {
-                throw new IllegalArgumentException("Provided YAML configuration is malformed.");
+                throw new IllegalArgumentException("Provided YAML configuration is not a Map.");
             }
         } finally {
             if (sourceStream != null) {
@@ -164,20 +176,49 @@ public final class ConfigTransformer {
         }
     }
 
-    private static void writeNiFiProperties(Map<String, Object> topLevelYaml, String path) throws FileNotFoundException, UnsupportedEncodingException {
+    private static void writeNiFiPropertiesFile(ByteArrayOutputStream nifiPropertiesOutputStream, String destPath) throws IOException {
+        try {
+            final Path nifiPropertiesPath = Paths.get(destPath, "nifi.properties");
+            FileOutputStream nifiProperties = new FileOutputStream(new File(nifiPropertiesPath.toString()));
+            nifiProperties.write(nifiPropertiesOutputStream.getUnderlyingBuffer());
+        } finally {
+            if (nifiPropertiesOutputStream != null){
+                nifiPropertiesOutputStream.flush();
+                nifiPropertiesOutputStream.close();
+            }
+        }
+    }
+
+    private static void writeFlowXmlFile(DOMSource domSource, String path) throws IOException, TransformerException {
+
+        final OutputStream fileOut = Files.newOutputStream(Paths.get(path, "flow.xml.gz"));
+        final OutputStream outStream = new GZIPOutputStream(fileOut);
+        final StreamResult streamResult = new StreamResult(outStream);
+
+        // configure the transformer and convert the DOM
+        final TransformerFactory transformFactory = TransformerFactory.newInstance();
+        final Transformer transformer = transformFactory.newTransformer();
+        transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
+        transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+
+        // transform the document to byte stream
+        transformer.transform(domSource, streamResult);
+        outStream.flush();
+        outStream.close();
+    }
+
+    private static void writeNiFiProperties(Map<String, Object> topLevelYaml, OutputStream outputStream) throws FileNotFoundException, UnsupportedEncodingException, ConfigurationChangeException {
         PrintWriter writer = null;
         try {
-            final Path nifiPropertiesPath = Paths.get(path, "nifi.properties");
-            writer = new PrintWriter(nifiPropertiesPath.toFile(), "UTF-8");
+            writer = new PrintWriter(outputStream, true);
 
-            Map<String,Object> coreProperties = (Map<String, Object>) topLevelYaml.get(CORE_PROPS_KEY);
-            Map<String,Object> flowfileRepo = (Map<String, Object>) topLevelYaml.get(FLOWFILE_REPO_KEY);
+            Map<String, Object> coreProperties = (Map<String, Object>) topLevelYaml.get(CORE_PROPS_KEY);
+            Map<String, Object> flowfileRepo = (Map<String, Object>) topLevelYaml.get(FLOWFILE_REPO_KEY);
             Map<String, Object> swapProperties = (Map<String, Object>) flowfileRepo.get(SWAP_PROPS_KEY);
-            Map<String,Object> contentRepo = (Map<String, Object>) topLevelYaml.get(CONTENT_REPO_KEY);
-            Map<String,Object> componentStatusRepo = (Map<String, Object>) topLevelYaml.get(COMPONENT_STATUS_REPO_KEY);
-            Map<String,Object> securityProperties = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY);
-            Map<String,Object> sensitiveProperties = (Map<String, Object>) securityProperties.get(SENSITIVE_PROPS_KEY);
-
+            Map<String, Object> contentRepo = (Map<String, Object>) topLevelYaml.get(CONTENT_REPO_KEY);
+            Map<String, Object> componentStatusRepo = (Map<String, Object>) topLevelYaml.get(COMPONENT_STATUS_REPO_KEY);
+            Map<String, Object> securityProperties = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY);
+            Map<String, Object> sensitiveProperties = (Map<String, Object>) securityProperties.get(SENSITIVE_PROPS_KEY);
 
             writer.print(PROPERTIES_FILE_APACHE_2_0_LICENSE);
             writer.println("# Core Properties #");
@@ -284,6 +325,8 @@ public final class ConfigTransformer {
             writer.println();
             writer.println("# cluster manager properties (only configure for cluster manager) #");
             writer.println("nifi.cluster.is.manager=false");
+        } catch (NullPointerException e) {
+            throw new ConfigurationChangeException("Failed to parse the config YAML while creating the nifi.properties", e);
         } finally {
             if (writer != null){
                 writer.flush();
@@ -291,7 +334,7 @@ public final class ConfigTransformer {
             }
         }
     }
-    private static void writeFlowXml(Map<String, Object> topLevelYaml, String path) throws Exception {
+    private static DOMSource createFlowXml(Map<String, Object> topLevelYaml) throws IOException, ConfigurationChangeException {
         try {
             // create a new, empty document
             final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
@@ -304,39 +347,32 @@ public final class ConfigTransformer {
             final Element rootNode = doc.createElement("flowController");
             doc.appendChild(rootNode);
             Map<String, Object> processorConfig = (Map<String, Object>) topLevelYaml.get(PROCESSOR_CONFIG_KEY);
-            addTextElement(rootNode, "maxTimerDrivenThreadCount", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY));
-            addTextElement(rootNode, "maxEventDrivenThreadCount", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY));
+            addTextElement(rootNode, "maxTimerDrivenThreadCount", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY, "1"));
+            addTextElement(rootNode, "maxEventDrivenThreadCount", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY, "1"));
             addProcessGroup(rootNode, topLevelYaml, "rootGroup");
 
             Map<String, Object> securityProps = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY);
-            String sslAlgorithm = (String) securityProps.get(SSL_PROTOCOL_KEY);
-            if (sslAlgorithm != null && !(sslAlgorithm.isEmpty())) {
-                final Element controllerServicesNode = doc.createElement("controllerServices");
-                rootNode.appendChild(controllerServicesNode);
-                addSSLControllerService(controllerServicesNode, securityProps);
+            if (securityProps != null) {
+                String sslAlgorithm = (String) securityProps.get(SSL_PROTOCOL_KEY);
+                if (sslAlgorithm != null && !(sslAlgorithm.isEmpty())) {
+                    final Element controllerServicesNode = doc.createElement("controllerServices");
+                    rootNode.appendChild(controllerServicesNode);
+                    addSSLControllerService(controllerServicesNode, securityProps);
+                }
+            }
+
+            Map<String, Object> provenanceProperties = (Map<String, Object>) topLevelYaml.get(PROVENANCE_REPORTING_KEY);
+            if (provenanceProperties.get(SCHEDULING_STRATEGY_KEY) != null) {
+                final Element reportingTasksNode = doc.createElement("reportingTasks");
+                rootNode.appendChild(reportingTasksNode);
+                addProvenanceReportingTask(reportingTasksNode, topLevelYaml);
             }
 
-            final Element reportingTasksNode = doc.createElement("reportingTasks");
-            rootNode.appendChild(reportingTasksNode);
-            addProvenanceReportingTask(reportingTasksNode, topLevelYaml);
-
-            final DOMSource domSource = new DOMSource(doc);
-            final OutputStream fileOut = Files.newOutputStream(Paths.get(path, "flow.xml.gz"));
-            final OutputStream outStream = new GZIPOutputStream(fileOut);
-            final StreamResult streamResult = new StreamResult(outStream);
-
-            // configure the transformer and convert the DOM
-            final TransformerFactory transformFactory = TransformerFactory.newInstance();
-            final Transformer transformer = transformFactory.newTransformer();
-            transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
-            transformer.setOutputProperty(OutputKeys.INDENT, "yes");
-
-            // transform the document to byte stream
-            transformer.transform(domSource, streamResult);
-            outStream.flush();
-            outStream.close();
-        } catch (final ParserConfigurationException | DOMException | TransformerFactoryConfigurationError | IllegalArgumentException | TransformerException e) {
+            return new DOMSource(doc);
+        } catch (final ParserConfigurationException | DOMException | TransformerFactoryConfigurationError | IllegalArgumentException e) {
             throw new FlowSerializationException(e);
+        } catch (Exception e){
+            throw new ConfigurationChangeException("Failed to parse the config YAML while writing the top level of the flow xml", e);
         }
     }
 
@@ -345,109 +381,140 @@ public final class ConfigTransformer {
         return value == null ? "" : value.toString();
     }
 
-    private static void addSSLControllerService(final Element element, Map<String, Object> securityProperties) {
-        final Element serviceElement = element.getOwnerDocument().createElement("controllerService");
-        addTextElement(serviceElement, "id", "SSL-Context-Service");
-        addTextElement(serviceElement, "name", "SSL-Context-Service");
-        addTextElement(serviceElement, "comment", "");
-        addTextElement(serviceElement, "class", "org.apache.nifi.ssl.StandardSSLContextService");
-
-        addTextElement(serviceElement, "enabled", "true");
-
-        Map<String, Object> attributes = new HashMap<>();
-        attributes.put("Keystore Filename", securityProperties.get(KEYSTORE_KEY));
-        attributes.put("Keystore Type", securityProperties.get(KEYSTORE_TYPE_KEY));
-        attributes.put("Keystore Password", securityProperties.get(KEYSTORE_PASSWORD_KEY));
-        attributes.put("Truststore Filename", securityProperties.get(TRUSTSTORE_KEY));
-        attributes.put("Truststore Type", securityProperties.get(TRUSTSTORE_TYPE_KEY));
-        attributes.put("Truststore Password", securityProperties.get(TRUSTSTORE_PASSWORD_KEY));
-        attributes.put("SSL Protocol", securityProperties.get(SSL_PROTOCOL_KEY));
-
-        addConfiguration(serviceElement, attributes);
+    private static <K> String getValueString(Map<K,Object> map, K key, String theDefault){
+        Object value = null;
+        if (map != null){
+            value = map.get(key);
+        }
+        return value == null ? theDefault : value.toString();
+    }
 
-        element.appendChild(serviceElement);
+    private static void addSSLControllerService(final Element element, Map<String, Object> securityProperties) throws ConfigurationChangeException {
+        try {
+            final Element serviceElement = element.getOwnerDocument().createElement("controllerService");
+            addTextElement(serviceElement, "id", "SSL-Context-Service");
+            addTextElement(serviceElement, "name", "SSL-Context-Service");
+            addTextElement(serviceElement, "comment", "");
+            addTextElement(serviceElement, "class", "org.apache.nifi.ssl.StandardSSLContextService");
+
+            addTextElement(serviceElement, "enabled", "true");
+
+            Map<String, Object> attributes = new HashMap<>();
+            attributes.put("Keystore Filename", securityProperties.get(KEYSTORE_KEY));
+            attributes.put("Keystore Type", securityProperties.get(KEYSTORE_TYPE_KEY));
+            attributes.put("Keystore Password", securityProperties.get(KEYSTORE_PASSWORD_KEY));
+            attributes.put("Truststore Filename", securityProperties.get(TRUSTSTORE_KEY));
+            attributes.put("Truststore Type", securityProperties.get(TRUSTSTORE_TYPE_KEY));
+            attributes.put("Truststore Password", securityProperties.get(TRUSTSTORE_PASSWORD_KEY));
+            attributes.put("SSL Protocol", securityProperties.get(SSL_PROTOCOL_KEY));
+
+            addConfiguration(serviceElement, attributes);
+
+            element.appendChild(serviceElement);
+        } catch (Exception e){
+            throw new ConfigurationChangeException("Failed to parse the config YAML while trying to create an SSL Controller Service", e);
+        }
     }
 
-    private static void addProcessGroup(final Element parentElement, Map<String, Object> topLevelYaml, final String elementName) {
-        Map<String,Object> flowControllerProperties = (Map<String, Object>) topLevelYaml.get(FLOW_CONTROLLER_PROPS_KEY);
+    private static void addProcessGroup(final Element parentElement, Map<String, Object> topLevelYaml, final String elementName) throws ConfigurationChangeException {
+        try {
+            Map<String, Object> flowControllerProperties = (Map<String, Object>) topLevelYaml.get(FLOW_CONTROLLER_PROPS_KEY);
 
-        final Document doc = parentElement.getOwnerDocument();
-        final Element element = doc.createElement(elementName);
-        parentElement.appendChild(element);
-        addTextElement(element, "id", "Root-Group");
-        addTextElement(element, "name", getValueString(flowControllerProperties, NAME_KEY) );
-        addPosition(element);
-        addTextElement(element, "comment", getValueString(flowControllerProperties, COMMENT_KEY));
+            final Document doc = parentElement.getOwnerDocument();
+            final Element element = doc.createElement(elementName);
+            parentElement.appendChild(element);
+            addTextElement(element, "id", "Root-Group");
+            addTextElement(element, "name", getValueString(flowControllerProperties, NAME_KEY));
+            addPosition(element);
+            addTextElement(element, "comment", getValueString(flowControllerProperties, COMMENT_KEY));
 
-        Map<String,Object> processorConfig = (Map<String, Object>) topLevelYaml.get(PROCESSOR_CONFIG_KEY);
-        addProcessor(element, processorConfig);
+            Map<String, Object> processorConfig = (Map<String, Object>) topLevelYaml.get(PROCESSOR_CONFIG_KEY);
+            addProcessor(element, processorConfig);
 
-        Map<String,Object> remoteProcessingGroup = (Map<String, Object>) topLevelYaml.get(REMOTE_PROCESSING_GROUP_KEY);
-        addRemoteProcessGroup(element, remoteProcessingGroup);
+            Map<String, Object> remoteProcessingGroup = (Map<String, Object>) topLevelYaml.get(REMOTE_PROCESSING_GROUP_KEY);
+            addRemoteProcessGroup(element, remoteProcessingGroup);
 
-        addConnection(element, topLevelYaml);
+            addConnection(element, topLevelYaml);
+        } catch (ConfigurationChangeException e){
+            throw e;
+        } catch (Exception e){
+            throw new ConfigurationChangeException("Failed to parse the config YAML while trying to creating the root Process Group", e);
+        }
     }
 
-    private static void addProcessor(final Element parentElement, Map<String, Object> processorConfig) {
+    private static void addProcessor(final Element parentElement, Map<String, Object> processorConfig) throws ConfigurationChangeException {
 
-        final Document doc = parentElement.getOwnerDocument();
-        final Element element = doc.createElement("processor");
-        parentElement.appendChild(element);
-        addTextElement(element, "id", "Processor");
-        addTextElement(element, "name", getValueString(processorConfig, NAME_KEY));
-
-        addPosition(element);
-        addStyle(element);
-
-        addTextElement(element, "comment", getValueString(processorConfig, COMMENT_KEY));
-        addTextElement(element, "class", getValueString(processorConfig, CLASS_KEY));
-        addTextElement(element, "maxConcurrentTasks", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY));
-        addTextElement(element, "schedulingPeriod", getValueString(processorConfig, SCHEDULING_PERIOD_KEY));
-        addTextElement(element, "penalizationPeriod", getValueString(processorConfig, PENALIZATION_PERIOD_KEY));
-        addTextElement(element, "yieldPeriod", getValueString(processorConfig, YIELD_PERIOD_KEY));
-        addTextElement(element, "bulletinLevel", "WARN");
-        addTextElement(element, "lossTolerant", "false");
-        addTextElement(element, "scheduledState", "RUNNING");
-        addTextElement(element, "schedulingStrategy", getValueString(processorConfig, SCHEDULING_STRATEGY_KEY));
-        addTextElement(element, "runDurationNanos", getValueString(processorConfig, RUN_DURATION_NANOS_KEY));
-
-        addConfiguration(element, (Map<String, Object>) processorConfig.get(PROCESSOR_PROPS_KEY));
-
-        Collection<String> autoTerminatedRelationships = (Collection<String>) processorConfig.get(AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY);
-        if (autoTerminatedRelationships != null) {
-            for (String rel : autoTerminatedRelationships) {
-                addTextElement(element, "autoTerminatedRelationship", rel);
+        try {
+            if (processorConfig.get(CLASS_KEY) == null) {
+                // Only add a processor if it has a class
+                return;
+            }
+
+            final Document doc = parentElement.getOwnerDocument();
+            final Element element = doc.createElement("processor");
+            parentElement.appendChild(element);
+            addTextElement(element, "id", "Processor");
+            addTextElement(element, "name", getValueString(processorConfig, NAME_KEY));
+
+            addPosition(element);
+            addStyle(element);
+
+            addTextElement(element, "comment", getValueString(processorConfig, COMMENT_KEY));
+            addTextElement(element, "class", getValueString(processorConfig, CLASS_KEY));
+            addTextElement(element, "maxConcurrentTasks", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY));
+            addTextElement(element, "schedulingPeriod", getValueString(processorConfig, SCHEDULING_PERIOD_KEY));
+            addTextElement(element, "penalizationPeriod", getValueString(processorConfig, PENALIZATION_PERIOD_KEY));
+            addTextElement(element, "yieldPeriod", getValueString(processorConfig, YIELD_PERIOD_KEY));
+            addTextElement(element, "bulletinLevel", "WARN");
+            addTextElement(element, "lossTolerant", "false");
+            addTextElement(element, "scheduledState", "RUNNING");
+            addTextElement(element, "schedulingStrategy", getValueString(processorConfig, SCHEDULING_STRATEGY_KEY));
+            addTextElement(element, "runDurationNanos", getValueString(processorConfig, RUN_DURATION_NANOS_KEY));
+
+            addConfiguration(element, (Map<String, Object>) processorConfig.get(PROCESSOR_PROPS_KEY));
+
+            Collection<String> autoTerminatedRelationships = (Collection<String>) processorConfig.get(AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY);
+            if (autoTerminatedRelationships != null) {
+                for (String rel : autoTerminatedRelationships) {
+                    addTextElement(element, "autoTerminatedRelationship", rel);
+                }
             }
+        } catch (Exception e){
+            throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the Processor", e);
         }
     }
 
-    private static void addProvenanceReportingTask(final Element element, Map<String, Object> topLevelYaml) {
-        Map<String, Object> provenanceProperties = (Map<String, Object>) topLevelYaml.get(PROVENANCE_REPORTING_KEY);
-        final Element taskElement = element.getOwnerDocument().createElement("reportingTask");
-        addTextElement(taskElement, "id", "Provenance-Reporting");
-        addTextElement(taskElement, "name", "Site-To-Site-Provenance-Reporting");
-        addTextElement(taskElement, "comment", getValueString(provenanceProperties, COMMENT_KEY));
-        addTextElement(taskElement, "class", "org.apache.nifi.minifi.provenance.reporting.ProvenanceReportingTask");
-        addTextElement(taskElement, "schedulingPeriod", getValueString(provenanceProperties, SCHEDULING_PERIOD_KEY));
-        addTextElement(taskElement, "scheduledState", "RUNNING");
-        addTextElement(taskElement, "schedulingStrategy", getValueString(provenanceProperties, SCHEDULING_STRATEGY_KEY));
-
-        Map<String, Object> attributes = new HashMap<>();
-        attributes.put("Destination URL", provenanceProperties.get(DESTINATION_URL_KEY));
-        attributes.put("Input Port Name", provenanceProperties.get(PORT_NAME_KEY));
-        attributes.put("MiNiFi URL", provenanceProperties.get(ORIGINATING_URL_KEY));
-        attributes.put("Compress Events", provenanceProperties.get(USE_COMPRESSION_KEY));
-        attributes.put("Batch Size", provenanceProperties.get(BATCH_SIZE_KEY));
-
-        Map<String, Object> securityProps = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY);
-        String sslAlgorithm = (String) securityProps.get(SSL_PROTOCOL_KEY);
-        if (sslAlgorithm != null && !(sslAlgorithm.isEmpty())) {
-            attributes.put("SSL Context Service", "SSL-Context-Service");
-        }
+    private static void addProvenanceReportingTask(final Element element, Map<String, Object> topLevelYaml) throws ConfigurationChangeException {
+        try {
+            Map<String, Object> provenanceProperties = (Map<String, Object>) topLevelYaml.get(PROVENANCE_REPORTING_KEY);
+            final Element taskElement = element.getOwnerDocument().createElement("reportingTask");
+            addTextElement(taskElement, "id", "Provenance-Reporting");
+            addTextElement(taskElement, "name", "Site-To-Site-Provenance-Reporting");
+            addTextElement(taskElement, "comment", getValueString(provenanceProperties, COMMENT_KEY));
+            addTextElement(taskElement, "class", "org.apache.nifi.minifi.provenance.reporting.ProvenanceReportingTask");
+            addTextElement(taskElement, "schedulingPeriod", getValueString(provenanceProperties, SCHEDULING_PERIOD_KEY));
+            addTextElement(taskElement, "scheduledState", "RUNNING");
+            addTextElement(taskElement, "schedulingStrategy", getValueString(provenanceProperties, SCHEDULING_STRATEGY_KEY));
+
+            Map<String, Object> attributes = new HashMap<>();
+            attributes.put("Destination URL", provenanceProperties.get(DESTINATION_URL_KEY));
+            attributes.put("Input Port Name", provenanceProperties.get(PORT_NAME_KEY));
+            attributes.put("MiNiFi URL", provenanceProperties.get(ORIGINATING_URL_KEY));
+            attributes.put("Compress Events", provenanceProperties.get(USE_COMPRESSION_KEY));
+            attributes.put("Batch Size", provenanceProperties.get(BATCH_SIZE_KEY));
+
+            Map<String, Object> securityProps = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY);
+            String sslAlgorithm = (String) securityProps.get(SSL_PROTOCOL_KEY);
+            if (sslAlgorithm != null && !(sslAlgorithm.isEmpty())) {
+                attributes.put("SSL Context Service", "SSL-Context-Service");
+            }
 
-        addConfiguration(taskElement, attributes);
+            addConfiguration(taskElement, attributes);
 
-        element.appendChild(taskElement);
+            element.appendChild(taskElement);
+        } catch (Exception e){
+            throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the Provenance Reporting Task", e);
+        }
     }
 
     private static void addConfiguration(final Element element, Map<String, Object> elementConfig) {
@@ -472,75 +539,103 @@ public final class ConfigTransformer {
         parentElement.appendChild(element);
     }
 
-    private static void addRemoteProcessGroup(final Element parentElement, Map<String, Object> remoteProcessingGroup) {
-
-        final Document doc = parentElement.getOwnerDocument();
-        final Element element = doc.createElement("remoteProcessGroup");
-        parentElement.appendChild(element);
-        addTextElement(element, "id", "Remote-Process-Group");
-        addTextElement(element, "name", getValueString(remoteProcessingGroup, NAME_KEY));
-        addPosition(element);
-        addTextElement(element, "comment", getValueString(remoteProcessingGroup, COMMENT_KEY));
-        addTextElement(element, "url", getValueString(remoteProcessingGroup, URL_KEY));
-        addTextElement(element, "timeout", getValueString(remoteProcessingGroup, TIMEOUT_KEY));
-        addTextElement(element, "yieldPeriod", getValueString(remoteProcessingGroup, YIELD_PERIOD_KEY));
-        addTextElement(element, "transmitting", "true");
-
-        Map<String,Object> inputPort = (Map<String, Object>) remoteProcessingGroup.get(INPUT_PORT_KEY);
-        addRemoteGroupPort(element, inputPort, "inputPort");
+    private static void addRemoteProcessGroup(final Element parentElement, Map<String, Object> remoteProcessingGroup) throws ConfigurationChangeException {
+        try {
+            if (remoteProcessingGroup.get(URL_KEY) == null) {
+                // Only add an RPG if it has a URL
+                return;
+            }
 
-        parentElement.appendChild(element);
+            final Document doc = parentElement.getOwnerDocument();
+            final Element element = doc.createElement("remoteProcessGroup");
+            parentElement.appendChild(element);
+            addTextElement(element, "id", "Remote-Process-Group");
+            addTextElement(element, "name", getValueString(remoteProcessingGroup, NAME_KEY));
+            addPosition(element);
+            addTextElement(element, "comment", getValueString(remoteProcessingGroup, COMMENT_KEY));
+            addTextElement(element, "url", getValueString(remoteProcessingGroup, URL_KEY));
+            addTextElement(element, "timeout", getValueString(remoteProcessingGroup, TIMEOUT_KEY));
+            addTextElement(element, "yieldPeriod", getValueString(remoteProcessingGroup, YIELD_PERIOD_KEY));
+            addTextElement(element, "transmitting", "true");
+
+            Map<String, Object> inputPort = (Map<String, Object>) remoteProcessingGroup.get(INPUT_PORT_KEY);
+            addRemoteGroupPort(element, inputPort, "inputPort");
+
+            parentElement.appendChild(element);
+        } catch (Exception e){
+            throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the Remote Process Group", e);
+        }
     }
 
-    private static void addRemoteGroupPort(final Element parentElement, Map<String, Object> inputPort, final String elementName) {
-        final Document doc = parentElement.getOwnerDocument();
-        final Element element = doc.createElement(elementName);
-        parentElement.appendChild(element);
-        addTextElement(element, "id", getValueString(inputPort, ID_KEY));
-        addTextElement(element, "name", getValueString(inputPort, NAME_KEY));
-        addPosition(element);
-        addTextElement(element, "comments", getValueString(inputPort, COMMENT_KEY));
-        addTextElement(element, "scheduledState", "RUNNING");
-        addTextElement(element, "maxConcurrentTasks", getValueString(inputPort, MAX_CONCURRENT_TASKS_KEY));
-        addTextElement(element, "useCompression", getValueString(inputPort, USE_COMPRESSION_KEY));
+    private static void addRemoteGroupPort(final Element parentElement, Map<String, Object> inputPort, final String elementName) throws ConfigurationChangeException {
 
-        parentElement.appendChild(element);
+        try {
+            if (inputPort.get(ID_KEY) == null) {
+                // Only add an input port if it has an ID
+                return;
+            }
+
+            final Document doc = parentElement.getOwnerDocument();
+            final Element element = doc.createElement(elementName);
+            parentElement.appendChild(element);
+            addTextElement(element, "id", getValueString(inputPort, ID_KEY));
+            addTextElement(element, "name", getValueString(inputPort, NAME_KEY));
+            addPosition(element);
+            addTextElement(element, "comments", getValueString(inputPort, COMMENT_KEY));
+            addTextElement(element, "scheduledState", "RUNNING");
+            addTextElement(element, "maxConcurrentTasks", getValueString(inputPort, MAX_CONCURRENT_TASKS_KEY));
+            addTextElement(element, "useCompression", getValueString(inputPort, USE_COMPRESSION_KEY));
+
+            parentElement.appendChild(element);
+        } catch (Exception e){
+            throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the input port of the Remote Process Group", e);
+        }
     }
 
-    private static void addConnection(final Element parentElement, Map<String, Object> topLevelYaml) {
-        Map<String,Object> connectionProperties = (Map<String, Object>) topLevelYaml.get(CONNECTION_PROPS_KEY);
-        Map<String,Object> remoteProcessingGroup = (Map<String, Object>) topLevelYaml.get(REMOTE_PROCESSING_GROUP_KEY);
-        Map<String,Object> inputPort = (Map<String, Object>) remoteProcessingGroup.get(INPUT_PORT_KEY);
-        final Document doc = parentElement.getOwnerDocument();
-        final Element element = doc.createElement("connection");
-        parentElement.appendChild(element);
-        addTextElement(element, "id", "Connection");
-        addTextElement(element, "name", getValueString(connectionProperties, NAME_KEY));
+    private static void addConnection(final Element parentElement, Map<String, Object> topLevelYaml) throws ConfigurationChangeException {
+        try {
+            Map<String, Object> connectionProperties = (Map<String, Object>) topLevelYaml.get(CONNECTION_PROPS_KEY);
+            Map<String, Object> remoteProcessingGroup = (Map<String, Object>) topLevelYaml.get(REMOTE_PROCESSING_GROUP_KEY);
+            Map<String, Object> inputPort = (Map<String, Object>) remoteProcessingGroup.get(INPUT_PORT_KEY);
+            Map<String, Object> processorConfig = (Map<String, Object>) topLevelYaml.get(PROCESSOR_CONFIG_KEY);
 
-        final Element bendPointsElement = doc.createElement("bendPoints");
-        element.appendChild(bendPointsElement);
+            if (inputPort.get(ID_KEY) == null || processorConfig.get(CLASS_KEY) == null) {
+                // Only add the connection when the input port has an ID and the processor config has a class
+                return;
+            }
 
-        addTextElement(element, "labelIndex", "1");
-        addTextElement(element, "zIndex", "0");
+            final Document doc = parentElement.getOwnerDocument();
+            final Element element = doc.createElement("connection");
+            parentElement.appendChild(element);
+            addTextElement(element, "id", "Connection");
+            addTextElement(element, "name", getValueString(connectionProperties, NAME_KEY));
 
-        addTextElement(element, "sourceId", "Processor");
-        addTextElement(element, "sourceGroupId", "Root-Group");
-        addTextElement(element, "sourceType", "PROCESSOR");
+            final Element bendPointsElement = doc.createElement("bendPoints");
+            element.appendChild(bendPointsElement);
 
-        addTextElement(element, "destinationId", getValueString(inputPort,ID_KEY));
-        addTextElement(element, "destinationGroupId", "Remote-Process-Group");
-        addTextElement(element, "destinationType", "REMOTE_INPUT_PORT");
+            addTextElement(element, "labelIndex", "1");
+            addTextElement(element, "zIndex", "0");
 
-        addTextElement(element, "relationship", "success");
+            addTextElement(element, "sourceId", "Processor");
+            addTextElement(element, "sourceGroupId", "Root-Group");
+            addTextElement(element, "sourceType", "PROCESSOR");
 
-        addTextElement(element, "maxWorkQueueSize", getValueString(connectionProperties, MAX_WORK_QUEUE_SIZE_KEY));
-        addTextElement(element, "maxWorkQueueDataSize", getValueString(connectionProperties, MAX_WORK_QUEUE_DATA_SIZE_KEY));
+            addTextElement(element, "destinationId", getValueString(inputPort, ID_KEY));
+            addTextElement(element, "destinationGroupId", "Remote-Process-Group");
+            addTextElement(element, "destinationType", "REMOTE_INPUT_PORT");
 
-        addTextElement(element, "flowFileExpiration", getValueString(connectionProperties, FLOWFILE_EXPIRATION__KEY));
-        addTextElement(element, "queuePrioritizerClass", getValueString(connectionProperties, QUEUE_PRIORITIZER_CLASS_KEY));
+            addTextElement(element, "relationship", "success");
 
+            addTextElement(element, "maxWorkQueueSize", getValueString(connectionProperties, MAX_WORK_QUEUE_SIZE_KEY));
+            addTextElement(element, "maxWorkQueueDataSize", getValueString(connectionProperties, MAX_WORK_QUEUE_DATA_SIZE_KEY));
 
-        parentElement.appendChild(element);
+            addTextElement(element, "flowFileExpiration", getValueString(connectionProperties, FLOWFILE_EXPIRATION__KEY));
+            addTextElement(element, "queuePrioritizerClass", getValueString(connectionProperties, QUEUE_PRIORITIZER_CLASS_KEY));
+
+            parentElement.appendChild(element);
+        } catch (Exception e){
+            throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the connection from the Processor to the input port of the Remote Process Group", e);
+        }
     }
 
     private static void addPosition(final Element parentElement) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestFileChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestFileChangeNotifier.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestFileChangeNotifier.java
deleted file mode 100644
index 9432a2f..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestFileChangeNotifier.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.minifi.bootstrap.configuration;
-
-import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.InputStream;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.WatchEvent;
-import java.nio.file.WatchKey;
-import java.nio.file.WatchService;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-public class TestFileChangeNotifier {
-
-    private static final String CONFIG_FILENAME = "config.yml";
-    private static final String TEST_CONFIG_PATH = "src/test/resources/config.yml";
-
-    private FileChangeNotifier notifierSpy;
-    private WatchService mockWatchService;
-    private Properties testProperties;
-
-    @Before
-    public void setUp() throws Exception {
-        mockWatchService = Mockito.mock(WatchService.class);
-        notifierSpy = Mockito.spy(new FileChangeNotifier());
-        notifierSpy.setConfigFile(Paths.get(TEST_CONFIG_PATH));
-        notifierSpy.setWatchService(mockWatchService);
-
-        testProperties = new Properties();
-        testProperties.put(FileChangeNotifier.CONFIG_FILE_PATH_KEY, TEST_CONFIG_PATH);
-        testProperties.put(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY, FileChangeNotifier.DEFAULT_POLLING_PERIOD_INTERVAL);
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        notifierSpy.close();
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testInitialize_invalidFile() throws Exception {
-        testProperties.put(FileChangeNotifier.CONFIG_FILE_PATH_KEY, "/land/of/make/believe");
-        notifierSpy.initialize(testProperties);
-    }
-
-    @Test
-    public void testInitialize_validFile() throws Exception {
-        notifierSpy.initialize(testProperties);
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testInitialize_invalidPollingPeriod() throws Exception {
-        testProperties.put(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY, "abc");
-        notifierSpy.initialize(testProperties);
-    }
-
-    @Test
-    public void testInitialize_useDefaultPolling() throws Exception {
-        testProperties.remove(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY);
-        notifierSpy.initialize(testProperties);
-    }
-
-
-    @Test
-    public void testNotifyListeners() throws Exception {
-        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
-        boolean wasRegistered = notifierSpy.registerListener(testListener);
-
-        Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
-        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
-
-        notifierSpy.notifyListeners();
-
-        verify(testListener, Mockito.atMost(1)).handleChange(Mockito.any(InputStream.class));
-    }
-
-    @Test
-    public void testRegisterListener() throws Exception {
-        final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class);
-        boolean wasRegistered = notifierSpy.registerListener(firstListener);
-
-        Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
-        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
-
-        final ConfigurationChangeListener secondListener = Mockito.mock(ConfigurationChangeListener.class);
-        wasRegistered = notifierSpy.registerListener(secondListener);
-        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 2);
-
-    }
-
-    @Test
-    public void testRegisterDuplicateListener() throws Exception {
-        final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class);
-        boolean wasRegistered = notifierSpy.registerListener(firstListener);
-
-        Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
-        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
-
-        wasRegistered = notifierSpy.registerListener(firstListener);
-
-        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
-        Assert.assertFalse("Registration did not correspond to newly added listener", wasRegistered);
-    }
-
-    /* Verify handleChange events */
-    @Test
-    public void testTargetChangedNoModification() throws Exception {
-        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
-
-        // In this case the WatchKey is null because there were no events found
-        establishMockEnvironmentForChangeTests(testListener, null);
-
-        verify(testListener, Mockito.never()).handleChange(Mockito.any(InputStream.class));
-    }
-
-    @Test
-    public void testTargetChangedWithModificationEvent_nonConfigFile() throws Exception {
-        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
-
-        // In this case, we receive a trigger event for the directory monitored, but it was another file not being monitored
-        final WatchKey mockWatchKey = createMockWatchKeyForPath("footage_not_found.yml");
-
-        establishMockEnvironmentForChangeTests(testListener, mockWatchKey);
-
-        notifierSpy.targetChanged();
-
-        verify(testListener, Mockito.never()).handleChange(Mockito.any(InputStream.class));
-    }
-
-    @Test
-    public void testTargetChangedWithModificationEvent() throws Exception {
-        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
-
-        final WatchKey mockWatchKey = createMockWatchKeyForPath(CONFIG_FILENAME);
-        // Provided as a spy to allow injection of mock objects for some tests when dealing with the finalized FileSystems class
-        establishMockEnvironmentForChangeTests(testListener, mockWatchKey);
-
-        // Invoke the method of interest
-        notifierSpy.run();
-
-        verify(mockWatchService, Mockito.atLeastOnce()).poll();
-        verify(testListener, Mockito.atLeastOnce()).handleChange(Mockito.any(InputStream.class));
-    }
-
-    /* Helper methods to establish mock environment */
-    private WatchKey createMockWatchKeyForPath(String configFilePath) {
-        final WatchKey mockWatchKey = Mockito.mock(WatchKey.class);
-        final List<WatchEvent<?>> mockWatchEvents = (List<WatchEvent<?>>) Mockito.mock(List.class);
-        when(mockWatchKey.pollEvents()).thenReturn(mockWatchEvents);
-        when(mockWatchKey.reset()).thenReturn(true);
-
-        final Iterator mockIterator = Mockito.mock(Iterator.class);
-        when(mockWatchEvents.iterator()).thenReturn(mockIterator);
-
-        final WatchEvent mockWatchEvent = Mockito.mock(WatchEvent.class);
-        when(mockIterator.hasNext()).thenReturn(true, false);
-        when(mockIterator.next()).thenReturn(mockWatchEvent);
-
-        // In this case, we receive a trigger event for the directory monitored, and it was the file monitored
-        when(mockWatchEvent.context()).thenReturn(Paths.get(configFilePath));
-        when(mockWatchEvent.kind()).thenReturn(ENTRY_MODIFY);
-
-        return mockWatchKey;
-    }
-
-    private void establishMockEnvironmentForChangeTests(ConfigurationChangeListener listener, final WatchKey watchKey) throws Exception {
-        final boolean wasRegistered = notifierSpy.registerListener(listener);
-
-        // Establish the file mock and its parent directory
-        final Path mockConfigFilePath = Mockito.mock(Path.class);
-        final Path mockConfigFileParentPath = Mockito.mock(Path.class);
-
-        // When getting the parent of the file, get the directory
-        when(mockConfigFilePath.getParent()).thenReturn(mockConfigFileParentPath);
-
-        Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
-        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
-
-        when(mockWatchService.poll()).thenReturn(watchKey);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifier.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifier.java
deleted file mode 100644
index 75b44e3..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifier.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration;
-
-
-import com.squareup.okhttp.OkHttpClient;
-import org.apache.nifi.minifi.bootstrap.configuration.util.TestRestChangeNotifierCommon;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import java.net.MalformedURLException;
-import java.util.Properties;
-
-
-public class TestRestChangeNotifier extends TestRestChangeNotifierCommon {
-
-    @BeforeClass
-    public static void setUp() throws InterruptedException, MalformedURLException {
-        Properties properties = new Properties();
-        restChangeNotifier = new RestChangeNotifier();
-        restChangeNotifier.initialize(properties);
-        restChangeNotifier.registerListener(mockChangeListener);
-        restChangeNotifier.start();
-
-        client = new OkHttpClient();
-
-        url = restChangeNotifier.getURI().toURL().toString();
-        Thread.sleep(1000);
-    }
-
-    @AfterClass
-    public static void stop() throws Exception {
-        restChangeNotifier.close();
-        client = null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifierSSL.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifierSSL.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifierSSL.java
deleted file mode 100644
index 908e693..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifierSSL.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration;
-
-
-import com.squareup.okhttp.OkHttpClient;
-import org.apache.nifi.minifi.bootstrap.configuration.util.TestRestChangeNotifierCommon;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManagerFactory;
-import java.io.IOException;
-import java.security.KeyManagementException;
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-import java.util.Properties;
-
-
-public class TestRestChangeNotifierSSL extends TestRestChangeNotifierCommon {
-
-
-    @BeforeClass
-    public static void setUpHttps() throws CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, UnrecoverableKeyException, KeyManagementException, InterruptedException {
-        Properties properties = new Properties();
-        properties.setProperty(RestChangeNotifier.TRUSTSTORE_LOCATION_KEY, "./src/test/resources/localhost-ts.jks");
-        properties.setProperty(RestChangeNotifier.TRUSTSTORE_PASSWORD_KEY, "localtest");
-        properties.setProperty(RestChangeNotifier.TRUSTSTORE_TYPE_KEY, "JKS");
-        properties.setProperty(RestChangeNotifier.KEYSTORE_LOCATION_KEY, "./src/test/resources/localhost-ks.jks");
-        properties.setProperty(RestChangeNotifier.KEYSTORE_PASSWORD_KEY, "localtest");
-        properties.setProperty(RestChangeNotifier.KEYSTORE_TYPE_KEY, "JKS");
-        properties.setProperty(RestChangeNotifier.NEED_CLIENT_AUTH_KEY, "true");
-        restChangeNotifier = new RestChangeNotifier();
-        restChangeNotifier.initialize(properties);
-        restChangeNotifier.registerListener(mockChangeListener);
-        restChangeNotifier.start();
-
-        client = new OkHttpClient();
-
-        SSLContext sslContext = SSLContext.getInstance("TLS");
-        TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
-        trustManagerFactory.init(readKeyStore("./src/test/resources/localhost-ts.jks"));
-
-        KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
-        keyManagerFactory.init(readKeyStore("./src/test/resources/localhost-ks.jks"), "localtest".toCharArray());
-
-        sslContext.init(keyManagerFactory.getKeyManagers(),trustManagerFactory.getTrustManagers(), new SecureRandom());
-        client.setSslSocketFactory(sslContext.getSocketFactory());
-
-        url = restChangeNotifier.getURI().toURL().toString();
-        Thread.sleep(1000);
-    }
-
-    @AfterClass
-    public static void stop() throws Exception {
-        restChangeNotifier.close();
-        client = null;
-    }
-
-    private static KeyStore readKeyStore(String path) throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException {
-        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
-
-        char[] password = "localtest".toCharArray();
-
-        java.io.FileInputStream fis = null;
-        try {
-            fis = new java.io.FileInputStream(path);
-            ks.load(fis, password);
-        } finally {
-            if (fis != null) {
-                fis.close();
-            }
-        }
-        return ks;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestFileChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestFileChangeNotifier.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestFileChangeNotifier.java
new file mode 100644
index 0000000..145c2fe
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestFileChangeNotifier.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers;
+
+import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.configuration.notifiers.FileChangeNotifier;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestFileChangeNotifier {
+
+    private static final String CONFIG_FILENAME = "config.yml";
+    private static final String TEST_CONFIG_PATH = "src/test/resources/config.yml";
+
+    private FileChangeNotifier notifierSpy;
+    private WatchService mockWatchService;
+    private Properties testProperties;
+
+    @Before
+    public void setUp() throws Exception {
+        mockWatchService = Mockito.mock(WatchService.class);
+        notifierSpy = Mockito.spy(new FileChangeNotifier());
+        notifierSpy.setConfigFile(Paths.get(TEST_CONFIG_PATH));
+        notifierSpy.setWatchService(mockWatchService);
+
+        testProperties = new Properties();
+        testProperties.put(FileChangeNotifier.CONFIG_FILE_PATH_KEY, TEST_CONFIG_PATH);
+        testProperties.put(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY, FileChangeNotifier.DEFAULT_POLLING_PERIOD_INTERVAL);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        notifierSpy.close();
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testInitialize_invalidFile() throws Exception {
+        testProperties.put(FileChangeNotifier.CONFIG_FILE_PATH_KEY, "/land/of/make/believe");
+        notifierSpy.initialize(testProperties);
+    }
+
+    @Test
+    public void testInitialize_validFile() throws Exception {
+        notifierSpy.initialize(testProperties);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testInitialize_invalidPollingPeriod() throws Exception {
+        testProperties.put(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY, "abc");
+        notifierSpy.initialize(testProperties);
+    }
+
+    @Test
+    public void testInitialize_useDefaultPolling() throws Exception {
+        testProperties.remove(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY);
+        notifierSpy.initialize(testProperties);
+    }
+
+
+    @Test
+    public void testNotifyListeners() throws Exception {
+        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+        boolean wasRegistered = notifierSpy.registerListener(testListener);
+
+        Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
+        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
+
+        notifierSpy.notifyListeners();
+
+        verify(testListener, Mockito.atMost(1)).handleChange(Mockito.any(InputStream.class));
+    }
+
+    @Test
+    public void testRegisterListener() throws Exception {
+        final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class);
+        boolean wasRegistered = notifierSpy.registerListener(firstListener);
+
+        Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
+        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
+
+        final ConfigurationChangeListener secondListener = Mockito.mock(ConfigurationChangeListener.class);
+        wasRegistered = notifierSpy.registerListener(secondListener);
+        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 2);
+
+    }
+
+    @Test
+    public void testRegisterDuplicateListener() throws Exception {
+        final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class);
+        boolean wasRegistered = notifierSpy.registerListener(firstListener);
+
+        Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
+        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
+
+        wasRegistered = notifierSpy.registerListener(firstListener);
+
+        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
+        Assert.assertFalse("Registration did not correspond to newly added listener", wasRegistered);
+    }
+
+    /* Verify handleChange events */
+    @Test
+    public void testTargetChangedNoModification() throws Exception {
+        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+
+        // In this case the WatchKey is null because there were no events found
+        establishMockEnvironmentForChangeTests(testListener, null);
+
+        verify(testListener, Mockito.never()).handleChange(Mockito.any(InputStream.class));
+    }
+
+    @Test
+    public void testTargetChangedWithModificationEvent_nonConfigFile() throws Exception {
+        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+
+        // In this case, the monitored directory reports a modification, but for a file other than the monitored config file
+        final WatchKey mockWatchKey = createMockWatchKeyForPath("footage_not_found.yml");
+
+        establishMockEnvironmentForChangeTests(testListener, mockWatchKey);
+
+        notifierSpy.targetChanged();
+
+        verify(testListener, Mockito.never()).handleChange(Mockito.any(InputStream.class));
+    }
+
+    @Test
+    public void testTargetChangedWithModificationEvent() throws Exception {
+        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+
+        final WatchKey mockWatchKey = createMockWatchKeyForPath(CONFIG_FILENAME);
+        // Provided as a spy to allow injection of mock objects for some tests when dealing with the final FileSystems class
+        establishMockEnvironmentForChangeTests(testListener, mockWatchKey);
+
+        // Invoke the method of interest
+        notifierSpy.run();
+
+        verify(mockWatchService, Mockito.atLeastOnce()).poll();
+        verify(testListener, Mockito.atLeastOnce()).handleChange(Mockito.any(InputStream.class));
+    }
+
+    /* Helper methods to establish mock environment */
+    private WatchKey createMockWatchKeyForPath(String configFilePath) {
+        final WatchKey mockWatchKey = Mockito.mock(WatchKey.class);
+        final List<WatchEvent<?>> mockWatchEvents = (List<WatchEvent<?>>) Mockito.mock(List.class);
+        when(mockWatchKey.pollEvents()).thenReturn(mockWatchEvents);
+        when(mockWatchKey.reset()).thenReturn(true);
+
+        final Iterator mockIterator = Mockito.mock(Iterator.class);
+        when(mockWatchEvents.iterator()).thenReturn(mockIterator);
+
+        final WatchEvent mockWatchEvent = Mockito.mock(WatchEvent.class);
+        when(mockIterator.hasNext()).thenReturn(true, false);
+        when(mockIterator.next()).thenReturn(mockWatchEvent);
+
+        // The mocked event reports a modification (ENTRY_MODIFY) of the path supplied by the caller
+        when(mockWatchEvent.context()).thenReturn(Paths.get(configFilePath));
+        when(mockWatchEvent.kind()).thenReturn(ENTRY_MODIFY);
+
+        return mockWatchKey;
+    }
+
+    private void establishMockEnvironmentForChangeTests(ConfigurationChangeListener listener, final WatchKey watchKey) throws Exception {
+        final boolean wasRegistered = notifierSpy.registerListener(listener);
+
+        // Establish the file mock and its parent directory
+        final Path mockConfigFilePath = Mockito.mock(Path.class);
+        final Path mockConfigFileParentPath = Mockito.mock(Path.class);
+
+        // When getting the parent of the file, get the directory
+        when(mockConfigFilePath.getParent()).thenReturn(mockConfigFileParentPath);
+
+        Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
+        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
+
+        when(mockWatchService.poll()).thenReturn(watchKey);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifier.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifier.java
new file mode 100644
index 0000000..1cd37fd
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifier.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers;
+
+
+import com.squareup.okhttp.OkHttpClient;
+import org.apache.nifi.minifi.bootstrap.configuration.notifiers.util.TestRestChangeNotifierCommon;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.net.MalformedURLException;
+import java.util.Properties;
+
+
+public class TestRestChangeNotifier extends TestRestChangeNotifierCommon {
+    // Starts a notifier initialized with empty properties and records its URL for the inherited test cases
+    @BeforeClass
+    public static void setUp() throws InterruptedException, MalformedURLException {
+        final RestChangeNotifier notifier = new RestChangeNotifier();
+        restChangeNotifier = notifier;
+        notifier.initialize(new Properties());
+        notifier.registerListener(mockChangeListener);
+        notifier.start();
+        client = new OkHttpClient();
+        url = notifier.getURI().toURL().toString();
+        // Pause for one second before any test in the class runs
+        Thread.sleep(1000);
+    }
+
+    // Closes the notifier and drops the shared client after the last test of the class
+    @AfterClass
+    public static void stop() throws Exception {
+        restChangeNotifier.close();
+        client = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifierSSL.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifierSSL.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifierSSL.java
new file mode 100644
index 0000000..6073a6f
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifierSSL.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers;
+
+
+import com.squareup.okhttp.OkHttpClient;
+import org.apache.nifi.minifi.bootstrap.configuration.notifiers.util.TestRestChangeNotifierCommon;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.Properties;
+
+
+public class TestRestChangeNotifierSSL extends TestRestChangeNotifierCommon {
+
+    // Starts a notifier configured with the test trust store and key store, then builds an SSL-enabled client
+    @BeforeClass
+    public static void setUpHttps() throws CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, UnrecoverableKeyException, KeyManagementException, InterruptedException {
+        Properties properties = new Properties();
+        properties.setProperty(RestChangeNotifier.TRUSTSTORE_LOCATION_KEY, "./src/test/resources/localhost-ts.jks");
+        properties.setProperty(RestChangeNotifier.TRUSTSTORE_PASSWORD_KEY, "localtest");
+        properties.setProperty(RestChangeNotifier.TRUSTSTORE_TYPE_KEY, "JKS");
+        properties.setProperty(RestChangeNotifier.KEYSTORE_LOCATION_KEY, "./src/test/resources/localhost-ks.jks");
+        properties.setProperty(RestChangeNotifier.KEYSTORE_PASSWORD_KEY, "localtest");
+        properties.setProperty(RestChangeNotifier.KEYSTORE_TYPE_KEY, "JKS");
+        properties.setProperty(RestChangeNotifier.NEED_CLIENT_AUTH_KEY, "true");
+        restChangeNotifier = new RestChangeNotifier();
+        restChangeNotifier.initialize(properties);
+        restChangeNotifier.registerListener(mockChangeListener);
+        restChangeNotifier.start();
+
+        client = new OkHttpClient();
+        // The client's SSL context is built from the same trust store and key store files configured above
+        SSLContext sslContext = SSLContext.getInstance("TLS");
+        TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+        trustManagerFactory.init(readKeyStore("./src/test/resources/localhost-ts.jks"));
+
+        KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+        keyManagerFactory.init(readKeyStore("./src/test/resources/localhost-ks.jks"), "localtest".toCharArray());
+
+        sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), new SecureRandom());
+        client.setSslSocketFactory(sslContext.getSocketFactory());
+
+        url = restChangeNotifier.getURI().toURL().toString();
+        Thread.sleep(1000);
+    }
+
+    @AfterClass
+    public static void stop() throws Exception {
+        restChangeNotifier.close();
+        client = null;
+    }
+
+    /**
+     * Loads the key store file at {@code path} using the hard-coded test password.
+     *
+     * @param path location of the key store file to read
+     * @return a key store of the JVM default type populated from the file
+     */
+    private static KeyStore readKeyStore(String path) throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException {
+        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
+        char[] password = "localtest".toCharArray();
+
+        // try-with-resources closes the stream on every path and keeps a load() failure as the primary exception
+        try (java.io.FileInputStream fis = new java.io.FileInputStream(path)) {
+            ks.load(fis, password);
+        }
+        return ks;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/MockChangeListener.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/MockChangeListener.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/MockChangeListener.java
new file mode 100644
index 0000000..eae5872
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/MockChangeListener.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers.util;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class MockChangeListener implements ConfigurationChangeListener {
+    // Text captured by the last handleChange call, or the value last passed to setConfFile
+    String confFile;
+
+    @Override
+    public void handleChange(InputStream inputStream) {
+        try {
+            this.confFile = IOUtils.toString(inputStream, "UTF-8");
+        } catch (IOException readFailure) {
+            throw new RuntimeException(readFailure);
+        }
+    }
+
+    @Override
+    public String getDescriptor() {
+        return "MockChangeListener";
+    }
+
+    public String getConfFile() {
+        return this.confFile;
+    }
+
+    public void setConfFile(String confFile) {
+        this.confFile = confFile;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/TestRestChangeNotifierCommon.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/TestRestChangeNotifierCommon.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/TestRestChangeNotifierCommon.java
new file mode 100644
index 0000000..78f6cd5
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/TestRestChangeNotifierCommon.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers.util;
+
+import com.squareup.okhttp.Headers;
+import com.squareup.okhttp.MediaType;
+import com.squareup.okhttp.OkHttpClient;
+import com.squareup.okhttp.Request;
+import com.squareup.okhttp.RequestBody;
+import com.squareup.okhttp.Response;
+import org.apache.nifi.minifi.bootstrap.configuration.notifiers.RestChangeNotifier;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public abstract class TestRestChangeNotifierCommon {
+    // Subclasses assign client, restChangeNotifier and url in their @BeforeClass methods
+    public static OkHttpClient client;
+    public static RestChangeNotifier restChangeNotifier;
+    public static final MediaType MEDIA_TYPE_MARKDOWN  = MediaType.parse("text/x-markdown; charset=utf-8");
+    public static String url;
+    public static MockChangeListener mockChangeListener = new MockChangeListener();
+
+    @Test
+    public void testGet() throws Exception {
+        assertEquals(1, restChangeNotifier.getChangeListeners().size());
+
+        Request request = new Request.Builder()
+                .url(url)
+                .build();
+
+        Response response = client.newCall(request).execute();
+        if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
+        // Print every response header as "name: value"
+        Headers responseHeaders = response.headers();
+        for (int i = 0; i < responseHeaders.size(); i++) {
+            System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
+        }
+
+        assertEquals(RestChangeNotifier.GET_TEXT, response.body().string());
+    }
+
+    @Test
+    public void testFileUpload() throws Exception {
+        assertEquals(1, restChangeNotifier.getChangeListeners().size());
+
+        File file = new File("src/test/resources/testUploadFile.txt");
+        assertTrue(file.exists());
+        assertTrue(file.canRead());
+        // POST the file contents as the request body
+        Request request = new Request.Builder()
+                .url(url)
+                .post(RequestBody.create(MEDIA_TYPE_MARKDOWN, file))
+                .addHeader("charset","UTF-8")
+                .build();
+
+        Response response = client.newCall(request).execute();
+        if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
+        // Print every response header as "name: value"
+        Headers responseHeaders = response.headers();
+        for (int i = 0; i < responseHeaders.size(); i++) {
+            System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
+        }
+
+        assertEquals("The result of notifying listeners:\nMockChangeListener successfully handled the configuration change\n", response.body().string());
+        // Decode the expected text as UTF-8, matching MockChangeListener, instead of the platform default charset
+        assertEquals(new String(Files.readAllBytes(file.toPath()), java.nio.charset.StandardCharsets.UTF_8), mockChangeListener.getConfFile());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/MockChangeListener.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/MockChangeListener.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/MockChangeListener.java
deleted file mode 100644
index 6843889..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/MockChangeListener.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration.util;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-public class MockChangeListener implements ConfigurationChangeListener {
-    String confFile;
-
-    @Override
-    public void handleChange(InputStream inputStream) {
-        try {
-            confFile = IOUtils.toString(inputStream, "UTF-8");
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public String getConfFile() {
-        return confFile;
-    }
-
-    public void setConfFile(String confFile) {
-        this.confFile = confFile;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/TestRestChangeNotifierCommon.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/TestRestChangeNotifierCommon.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/TestRestChangeNotifierCommon.java
deleted file mode 100644
index b3c4f54..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/TestRestChangeNotifierCommon.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration.util;
-
-import com.squareup.okhttp.Headers;
-import com.squareup.okhttp.MediaType;
-import com.squareup.okhttp.OkHttpClient;
-import com.squareup.okhttp.Request;
-import com.squareup.okhttp.RequestBody;
-import com.squareup.okhttp.Response;
-import org.apache.nifi.minifi.bootstrap.configuration.RestChangeNotifier;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public abstract class TestRestChangeNotifierCommon {
-
-    public static OkHttpClient client;
-    public static RestChangeNotifier restChangeNotifier;
-    public static final MediaType MEDIA_TYPE_MARKDOWN  = MediaType.parse("text/x-markdown; charset=utf-8");
-    public static String url;
-    public static MockChangeListener mockChangeListener = new MockChangeListener();
-
-    @Test
-    public void testGet() throws Exception {
-        assertEquals(1, restChangeNotifier.getChangeListeners().size());
-
-        Request request = new Request.Builder()
-                .url(url)
-                .build();
-
-        Response response = client.newCall(request).execute();
-        if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
-
-        Headers responseHeaders = response.headers();
-        for (int i = 0; i < responseHeaders.size(); i++) {
-            System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
-        }
-
-        assertEquals(RestChangeNotifier.GET_TEXT, response.body().string());
-    }
-
-    @Test
-    public void testFileUpload() throws Exception {
-        assertEquals(1, restChangeNotifier.getChangeListeners().size());
-
-        File file = new File("src/test/resources/testUploadFile.txt");
-        assertTrue(file.exists());
-        assertTrue(file.canRead());
-
-        Request request = new Request.Builder()
-                .url(url)
-                .post(RequestBody.create(MEDIA_TYPE_MARKDOWN, file))
-                .addHeader("charset","UTF-8")
-                .build();
-
-        Response response = client.newCall(request).execute();
-        if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
-
-        Headers responseHeaders = response.headers();
-        for (int i = 0; i < responseHeaders.size(); i++) {
-            System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
-        }
-
-        assertEquals(RestChangeNotifier.POST_TEXT, response.body().string());
-
-        assertEquals(new String(Files.readAllBytes(file.toPath())), mockChangeListener.getConfFile());
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
index 1a7f261..d0a7d71 100644
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.FileInputStream;
 
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -63,6 +64,24 @@ public class TestConfigTransformer {
         flowXml.deleteOnExit();
     }
 
+    @Test
+    public void doesTransformOnDefaultFile() throws Exception {
+        // Transform the default configuration; the second argument is the directory the checks below look in
+        ConfigTransformer.transformConfigFile("./src/test/resources/default.yml", "./target/");
+        File nifiPropertiesFile = new File("./target/nifi.properties");
+        // A readable nifi.properties must exist after the transform
+        assertTrue(nifiPropertiesFile.exists());
+        assertTrue(nifiPropertiesFile.canRead());
+        // Removal is deferred until the JVM exits
+        nifiPropertiesFile.deleteOnExit();
+        // Same existence and readability checks for flow.xml.gz
+        File flowXml = new File("./target/flow.xml.gz");
+        assertTrue(flowXml.exists());
+        assertTrue(flowXml.canRead());
+
+        flowXml.deleteOnExit();
+    }
+
     @Test(expected = IllegalArgumentException.class)
     public void handleTransformInvalidFile() throws Exception {
 
@@ -70,4 +89,12 @@ public class TestConfigTransformer {
 
         Assert.fail("Invalid configuration file was not detected.");
     }
+
+    @Test(expected = ConfigurationChangeException.class)
+    public void handleTransformEmptyFile() throws Exception {
+        // The test passes only if this call throws the exception named in the expected attribute
+        ConfigTransformer.transformConfigFile("./src/test/resources/config-empty.yml", "./target/");
+        // Reaching this line means nothing was thrown, so fail explicitly
+        Assert.fail("Invalid configuration file was not detected.");
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/resources/config-empty.yml
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/resources/config-empty.yml b/minifi-bootstrap/src/test/resources/config-empty.yml
new file mode 100644
index 0000000..fbbbeb9
--- /dev/null
+++ b/minifi-bootstrap/src/test/resources/config-empty.yml
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Flow Controller:
+    name: MiNiFi Flow
+    comment:
\ No newline at end of file