You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by br...@apache.org on 2016/11/11 17:17:55 UTC

[1/3] nifi-minifi git commit: MINIFI-36 Refactoring config change notifiers and adding Pull config change notifier

Repository: nifi-minifi
Updated Branches:
  refs/heads/master 7954d36e9 -> 6f3c56780


http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/TestRestChangeIngestorCommon.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/TestRestChangeIngestorCommon.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/TestRestChangeIngestorCommon.java
new file mode 100644
index 0000000..72e768a
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/TestRestChangeIngestorCommon.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.ingestors.common;
+
+import okhttp3.Headers;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
+import org.apache.nifi.minifi.bootstrap.configuration.ingestors.RestChangeIngestor;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Shared REST change-ingestor test cases; the static fixtures are presumably assigned by concrete subclasses. */
+public abstract class TestRestChangeIngestorCommon {
+
+    private static final String TEST_STRING = "This is a test string.";
+
+    public static OkHttpClient client;
+    public static RestChangeIngestor restChangeIngestor;
+    public static final MediaType MEDIA_TYPE_MARKDOWN  = MediaType.parse("text/x-markdown; charset=utf-8");
+    public static String url;
+    public static ConfigurationChangeNotifier testNotifier;
+    public static Differentiator<InputStream> mockDifferentiator = Mockito.mock(Differentiator.class);
+
+    /** Resets the notifier mock and stubs it to return one result for a listener described as "MockChangeListener". */
+    @Before
+    public void before() {
+        Mockito.reset(testNotifier);
+        ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+        when(testListener.getDescriptor()).thenReturn("MockChangeListener");
+        Mockito.when(testNotifier.notifyListeners(Mockito.any())).thenReturn(Collections.singleton(new ListenerHandleResult(testListener)));
+    }
+
+    /** A GET must return {@code RestChangeIngestor.GET_TEXT} without notifying any listener. */
+    @Test
+    public void testGet() throws Exception {
+        Response response = client.newCall(new Request.Builder().url(url).build()).execute();
+        if (!response.isSuccessful()) {
+            throw new IOException("Unexpected code " + response);
+        }
+        printHeaders(response);
+
+        assertEquals(RestChangeIngestor.GET_TEXT, response.body().string());
+        verify(testNotifier, Mockito.never()).notifyListeners(Mockito.any(ByteBuffer.class));
+    }
+
+    /** A POST whose payload the differentiator reports as new must reach the notifier exactly once. */
+    @Test
+    public void testFileUploadNewConfig() throws Exception {
+        when(mockDifferentiator.isNew(Mockito.any(InputStream.class))).thenReturn(true);
+
+        Response response = postTestString();
+        if (!response.isSuccessful()) {
+            throw new IOException("Unexpected code " + response);
+        }
+        printHeaders(response);
+
+        assertEquals("The result of notifying listeners:\nMockChangeListener successfully handled the configuration change\n", response.body().string());
+        // MEDIA_TYPE_MARKDOWN declares charset=utf-8, so encode the expectation as UTF-8, not the platform default.
+        byte[] expected = TEST_STRING.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        verify(testNotifier, Mockito.times(1)).notifyListeners(Mockito.eq(ByteBuffer.wrap(expected)));
+    }
+
+    /** A POST whose payload the differentiator reports as not new must fail and never reach the notifier. */
+    @Test
+    public void testFileUploadSameConfig() throws Exception {
+        when(mockDifferentiator.isNew(Mockito.any(InputStream.class))).thenReturn(false);
+
+        Response response = postTestString();
+        if (response.isSuccessful()) {
+            throw new IOException("Unexpected code " + response);
+        }
+        printHeaders(response);
+
+        assertEquals("Request received but instance is already running this config.", response.body().string());
+        verify(testNotifier, Mockito.never()).notifyListeners(Mockito.any());
+    }
+
+    /** POSTs {@code TEST_STRING} to {@code url} as markdown with an extra "charset: UTF-8" header. */
+    private static Response postTestString() throws IOException {
+        Request request = new Request.Builder()
+                .url(url)
+                .post(RequestBody.create(MEDIA_TYPE_MARKDOWN, TEST_STRING))
+                .addHeader("charset","UTF-8")
+                .build();
+        return client.newCall(request).execute();
+    }
+
+    /** Prints every response header as "name: value" to stdout. */
+    private static void printHeaders(Response response) {
+        Headers responseHeaders = response.headers();
+        for (int i = 0; i < responseHeaders.size(); i++) {
+            System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestFileChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestFileChangeNotifier.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestFileChangeNotifier.java
deleted file mode 100644
index 145c2fe..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestFileChangeNotifier.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.minifi.bootstrap.configuration.notifiers;
-
-import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.InputStream;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.WatchEvent;
-import java.nio.file.WatchKey;
-import java.nio.file.WatchService;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
-import org.apache.nifi.minifi.bootstrap.configuration.notifiers.FileChangeNotifier;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-public class TestFileChangeNotifier {
-
-    private static final String CONFIG_FILENAME = "config.yml";
-    private static final String TEST_CONFIG_PATH = "src/test/resources/config.yml";
-
-    private FileChangeNotifier notifierSpy;
-    private WatchService mockWatchService;
-    private Properties testProperties;
-
-    @Before
-    public void setUp() throws Exception {
-        mockWatchService = Mockito.mock(WatchService.class);
-        notifierSpy = Mockito.spy(new FileChangeNotifier());
-        notifierSpy.setConfigFile(Paths.get(TEST_CONFIG_PATH));
-        notifierSpy.setWatchService(mockWatchService);
-
-        testProperties = new Properties();
-        testProperties.put(FileChangeNotifier.CONFIG_FILE_PATH_KEY, TEST_CONFIG_PATH);
-        testProperties.put(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY, FileChangeNotifier.DEFAULT_POLLING_PERIOD_INTERVAL);
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        notifierSpy.close();
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testInitialize_invalidFile() throws Exception {
-        testProperties.put(FileChangeNotifier.CONFIG_FILE_PATH_KEY, "/land/of/make/believe");
-        notifierSpy.initialize(testProperties);
-    }
-
-    @Test
-    public void testInitialize_validFile() throws Exception {
-        notifierSpy.initialize(testProperties);
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testInitialize_invalidPollingPeriod() throws Exception {
-        testProperties.put(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY, "abc");
-        notifierSpy.initialize(testProperties);
-    }
-
-    @Test
-    public void testInitialize_useDefaultPolling() throws Exception {
-        testProperties.remove(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY);
-        notifierSpy.initialize(testProperties);
-    }
-
-
-    @Test
-    public void testNotifyListeners() throws Exception {
-        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
-        boolean wasRegistered = notifierSpy.registerListener(testListener);
-
-        Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
-        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
-
-        notifierSpy.notifyListeners();
-
-        verify(testListener, Mockito.atMost(1)).handleChange(Mockito.any(InputStream.class));
-    }
-
-    @Test
-    public void testRegisterListener() throws Exception {
-        final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class);
-        boolean wasRegistered = notifierSpy.registerListener(firstListener);
-
-        Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
-        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
-
-        final ConfigurationChangeListener secondListener = Mockito.mock(ConfigurationChangeListener.class);
-        wasRegistered = notifierSpy.registerListener(secondListener);
-        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 2);
-
-    }
-
-    @Test
-    public void testRegisterDuplicateListener() throws Exception {
-        final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class);
-        boolean wasRegistered = notifierSpy.registerListener(firstListener);
-
-        Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
-        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
-
-        wasRegistered = notifierSpy.registerListener(firstListener);
-
-        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
-        Assert.assertFalse("Registration did not correspond to newly added listener", wasRegistered);
-    }
-
-    /* Verify handleChange events */
-    @Test
-    public void testTargetChangedNoModification() throws Exception {
-        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
-
-        // In this case the WatchKey is null because there were no events found
-        establishMockEnvironmentForChangeTests(testListener, null);
-
-        verify(testListener, Mockito.never()).handleChange(Mockito.any(InputStream.class));
-    }
-
-    @Test
-    public void testTargetChangedWithModificationEvent_nonConfigFile() throws Exception {
-        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
-
-        // In this case, we receive a trigger event for the directory monitored, but it was another file not being monitored
-        final WatchKey mockWatchKey = createMockWatchKeyForPath("footage_not_found.yml");
-
-        establishMockEnvironmentForChangeTests(testListener, mockWatchKey);
-
-        notifierSpy.targetChanged();
-
-        verify(testListener, Mockito.never()).handleChange(Mockito.any(InputStream.class));
-    }
-
-    @Test
-    public void testTargetChangedWithModificationEvent() throws Exception {
-        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
-
-        final WatchKey mockWatchKey = createMockWatchKeyForPath(CONFIG_FILENAME);
-        // Provided as a spy to allow injection of mock objects for some tests when dealing with the finalized FileSystems class
-        establishMockEnvironmentForChangeTests(testListener, mockWatchKey);
-
-        // Invoke the method of interest
-        notifierSpy.run();
-
-        verify(mockWatchService, Mockito.atLeastOnce()).poll();
-        verify(testListener, Mockito.atLeastOnce()).handleChange(Mockito.any(InputStream.class));
-    }
-
-    /* Helper methods to establish mock environment */
-    private WatchKey createMockWatchKeyForPath(String configFilePath) {
-        final WatchKey mockWatchKey = Mockito.mock(WatchKey.class);
-        final List<WatchEvent<?>> mockWatchEvents = (List<WatchEvent<?>>) Mockito.mock(List.class);
-        when(mockWatchKey.pollEvents()).thenReturn(mockWatchEvents);
-        when(mockWatchKey.reset()).thenReturn(true);
-
-        final Iterator mockIterator = Mockito.mock(Iterator.class);
-        when(mockWatchEvents.iterator()).thenReturn(mockIterator);
-
-        final WatchEvent mockWatchEvent = Mockito.mock(WatchEvent.class);
-        when(mockIterator.hasNext()).thenReturn(true, false);
-        when(mockIterator.next()).thenReturn(mockWatchEvent);
-
-        // In this case, we receive a trigger event for the directory monitored, and it was the file monitored
-        when(mockWatchEvent.context()).thenReturn(Paths.get(configFilePath));
-        when(mockWatchEvent.kind()).thenReturn(ENTRY_MODIFY);
-
-        return mockWatchKey;
-    }
-
-    private void establishMockEnvironmentForChangeTests(ConfigurationChangeListener listener, final WatchKey watchKey) throws Exception {
-        final boolean wasRegistered = notifierSpy.registerListener(listener);
-
-        // Establish the file mock and its parent directory
-        final Path mockConfigFilePath = Mockito.mock(Path.class);
-        final Path mockConfigFileParentPath = Mockito.mock(Path.class);
-
-        // When getting the parent of the file, get the directory
-        when(mockConfigFilePath.getParent()).thenReturn(mockConfigFileParentPath);
-
-        Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
-        Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
-
-        when(mockWatchService.poll()).thenReturn(watchKey);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifier.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifier.java
deleted file mode 100644
index 1cd37fd..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifier.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration.notifiers;
-
-
-import com.squareup.okhttp.OkHttpClient;
-import org.apache.nifi.minifi.bootstrap.configuration.notifiers.util.TestRestChangeNotifierCommon;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import java.net.MalformedURLException;
-import java.util.Properties;
-
-
-public class TestRestChangeNotifier extends TestRestChangeNotifierCommon {
-
-    @BeforeClass
-    public static void setUp() throws InterruptedException, MalformedURLException {
-        Properties properties = new Properties();
-        restChangeNotifier = new RestChangeNotifier();
-        restChangeNotifier.initialize(properties);
-        restChangeNotifier.registerListener(mockChangeListener);
-        restChangeNotifier.start();
-
-        client = new OkHttpClient();
-
-        url = restChangeNotifier.getURI().toURL().toString();
-        Thread.sleep(1000);
-    }
-
-    @AfterClass
-    public static void stop() throws Exception {
-        restChangeNotifier.close();
-        client = null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifierSSL.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifierSSL.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifierSSL.java
deleted file mode 100644
index 6073a6f..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifierSSL.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration.notifiers;
-
-
-import com.squareup.okhttp.OkHttpClient;
-import org.apache.nifi.minifi.bootstrap.configuration.notifiers.util.TestRestChangeNotifierCommon;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManagerFactory;
-import java.io.IOException;
-import java.security.KeyManagementException;
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-import java.util.Properties;
-
-
-public class TestRestChangeNotifierSSL extends TestRestChangeNotifierCommon {
-
-
-    @BeforeClass
-    public static void setUpHttps() throws CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, UnrecoverableKeyException, KeyManagementException, InterruptedException {
-        Properties properties = new Properties();
-        properties.setProperty(RestChangeNotifier.TRUSTSTORE_LOCATION_KEY, "./src/test/resources/localhost-ts.jks");
-        properties.setProperty(RestChangeNotifier.TRUSTSTORE_PASSWORD_KEY, "localtest");
-        properties.setProperty(RestChangeNotifier.TRUSTSTORE_TYPE_KEY, "JKS");
-        properties.setProperty(RestChangeNotifier.KEYSTORE_LOCATION_KEY, "./src/test/resources/localhost-ks.jks");
-        properties.setProperty(RestChangeNotifier.KEYSTORE_PASSWORD_KEY, "localtest");
-        properties.setProperty(RestChangeNotifier.KEYSTORE_TYPE_KEY, "JKS");
-        properties.setProperty(RestChangeNotifier.NEED_CLIENT_AUTH_KEY, "true");
-        restChangeNotifier = new RestChangeNotifier();
-        restChangeNotifier.initialize(properties);
-        restChangeNotifier.registerListener(mockChangeListener);
-        restChangeNotifier.start();
-
-        client = new OkHttpClient();
-
-        SSLContext sslContext = SSLContext.getInstance("TLS");
-        TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
-        trustManagerFactory.init(readKeyStore("./src/test/resources/localhost-ts.jks"));
-
-        KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
-        keyManagerFactory.init(readKeyStore("./src/test/resources/localhost-ks.jks"), "localtest".toCharArray());
-
-        sslContext.init(keyManagerFactory.getKeyManagers(),trustManagerFactory.getTrustManagers(), new SecureRandom());
-        client.setSslSocketFactory(sslContext.getSocketFactory());
-
-        url = restChangeNotifier.getURI().toURL().toString();
-        Thread.sleep(1000);
-    }
-
-    @AfterClass
-    public static void stop() throws Exception {
-        restChangeNotifier.close();
-        client = null;
-    }
-
-    private static KeyStore readKeyStore(String path) throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException {
-        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
-
-        char[] password = "localtest".toCharArray();
-
-        java.io.FileInputStream fis = null;
-        try {
-            fis = new java.io.FileInputStream(path);
-            ks.load(fis, password);
-        } finally {
-            if (fis != null) {
-                fis.close();
-            }
-        }
-        return ks;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/MockChangeListener.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/MockChangeListener.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/MockChangeListener.java
deleted file mode 100644
index eae5872..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/MockChangeListener.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration.notifiers.util;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-public class MockChangeListener implements ConfigurationChangeListener {
-    String confFile;
-
-    @Override
-    public void handleChange(InputStream inputStream) {
-        try {
-            confFile = IOUtils.toString(inputStream, "UTF-8");
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public String getDescriptor() {
-        return "MockChangeListener";
-    }
-
-    public String getConfFile() {
-        return confFile;
-    }
-
-    public void setConfFile(String confFile) {
-        this.confFile = confFile;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/TestRestChangeNotifierCommon.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/TestRestChangeNotifierCommon.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/TestRestChangeNotifierCommon.java
deleted file mode 100644
index 1d9f54c..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/TestRestChangeNotifierCommon.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration.notifiers.util;
-
-import com.squareup.okhttp.Headers;
-import com.squareup.okhttp.MediaType;
-import com.squareup.okhttp.OkHttpClient;
-import com.squareup.okhttp.Request;
-import com.squareup.okhttp.RequestBody;
-import com.squareup.okhttp.Response;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.minifi.bootstrap.configuration.notifiers.RestChangeNotifier;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-
-public abstract class TestRestChangeNotifierCommon {
-
-    private static String testString = "This is a test string.";
-
-    public static OkHttpClient client;
-    public static RestChangeNotifier restChangeNotifier;
-    public static final MediaType MEDIA_TYPE_MARKDOWN  = MediaType.parse("text/x-markdown; charset=utf-8");
-    public static String url;
-    public static MockChangeListener mockChangeListener = new MockChangeListener();
-
-    @Test
-    public void testGet() throws Exception {
-        assertEquals(1, restChangeNotifier.getChangeListeners().size());
-
-        Request request = new Request.Builder()
-                .url(url)
-                .build();
-
-        Response response = client.newCall(request).execute();
-        if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
-
-        Headers responseHeaders = response.headers();
-        for (int i = 0; i < responseHeaders.size(); i++) {
-            System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
-        }
-
-        assertEquals(RestChangeNotifier.GET_TEXT, response.body().string());
-    }
-
-    @Test
-    public void testFileUpload() throws Exception {
-        assertEquals(1, restChangeNotifier.getChangeListeners().size());
-        Request request = new Request.Builder()
-                .url(url)
-                .post(RequestBody.create(MEDIA_TYPE_MARKDOWN, testString))
-                .addHeader("charset","UTF-8")
-                .build();
-
-        Response response = client.newCall(request).execute();
-        if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
-
-        Headers responseHeaders = response.headers();
-        for (int i = 0; i < responseHeaders.size(); i++) {
-            System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
-        }
-
-        assertEquals("The result of notifying listeners:\nMockChangeListener successfully handled the configuration change\n", response.body().string());
-
-        assertEquals(testString, StringUtils.trim(mockChangeListener.getConfFile()));
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
index 05f2abf..6c2ed78 100644
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
@@ -28,7 +28,6 @@ import org.apache.nifi.minifi.commons.schema.RemoteInputPortSchema;
 import org.apache.nifi.minifi.commons.schema.RemoteProcessingGroupSchema;
 import org.apache.nifi.minifi.commons.schema.common.StringUtil;
 import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader;
-import org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer;
 import org.junit.Before;
 import org.junit.Test;
 import org.w3c.dom.Document;
@@ -96,12 +95,12 @@ public class ConfigTransformerTest {
     @Test
     public void testQueuePrioritizerWritten() throws ConfigurationChangeException, XPathExpressionException {
         Map<String, Object> map = new HashMap<>();
-        map.put(ConnectionSchema.QUEUE_PRIORITIZER_CLASS_KEY, FirstInFirstOutPrioritizer.class.getCanonicalName());
+        map.put(ConnectionSchema.QUEUE_PRIORITIZER_CLASS_KEY, "org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer");
 
         ConfigTransformer.addConnection(config, new ConnectionSchema(map), new ParentGroupIdResolver(new ProcessGroupSchema(Collections.emptyMap(), ConfigSchema.TOP_LEVEL_NAME)));
         XPath xpath = xPathFactory.newXPath();
         String expression = "connection/queuePrioritizerClass/text()";
-        assertEquals(FirstInFirstOutPrioritizer.class.getCanonicalName(), xpath.evaluate(expression, config, XPathConstants.STRING));
+        assertEquals("org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer", xpath.evaluate(expression, config, XPathConstants.STRING));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-docs/src/main/markdown/System_Admin_Guide.md
----------------------------------------------------------------------
diff --git a/minifi-docs/src/main/markdown/System_Admin_Guide.md b/minifi-docs/src/main/markdown/System_Admin_Guide.md
index f672074..eaa8697 100644
--- a/minifi-docs/src/main/markdown/System_Admin_Guide.md
+++ b/minifi-docs/src/main/markdown/System_Admin_Guide.md
@@ -21,6 +21,103 @@
 
 [MiNiFi Homepage](https://nifi.apache.org/minifi/index.html)
 
+# Automatic Warm-Redeploy
+
+When many MiNiFi agents are running on the edge, it may not be possible to manually stop, edit the config.yml and then restart every one every time their configuration needs to change. The Config Change Coordinator and its Ingestors were designed to automatically redeploy in response to a configuration update.
+
+The Config Change Ingestors are the means by which the agent is notified of a potential new configuration. Currently there are three:
+
+ - FileChangeIngestor
+ - RestChangeIngestor
+ - PullHttpChangeIngestor
+
+After a new configuration has been pulled/received the Ingestors use a Differentiator in order to determine if the currently running config is different from the new config. Which Differentiator is used is configurable for each Ingestor. Currently there is only one Differentiator:
+
+ - WholeConfigDifferentiator: Compares the entire new config with the currently running one, byte for byte.
+
+After a new config is determined to be new, the MiNiFi agent will attempt to restart. The bootstrap first saves the old config into a swap file. The bootstrap monitors the agent as it restarts and if it fails it will roll back to the old config. If it succeeds then the swap file will be deleted and the agent will start processing using the new config.
+
+Note: Data left in connections when the agent attempts to restart will either be mapped to a connection with the same ID in the new config, or orphaned and deleted.
+
+The configuration for Warm-Redeploy is done in the bootstrap.conf and primarily revolves around the Config Change Ingestors. The configuration in the bootstrap.conf is done using the "nifi.minifi.notifier.ingestors" key followed by the fully qualified class name of the desired Ingestor implementation to run. Use a comma separated list to define more than one Ingestor implementation. For example:
+
+```
+nifi.minifi.notifier.ingestors=org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor
+```
+
+Ingestor specific configuration is also necessary and done in the bootstrap.conf as well. Specifics for each are detailed below.
+
+## FileChangeIngestor
+
+class name: org.apache.nifi.minifi.bootstrap.configuration.ingestors.FileChangeIngestor
+
+This Config Change Ingestor watches a file and when the file is updated, the file is ingested as a new config.
+
+Note: The config file path configured here and in "nifi.minifi.config" cannot be the same. This is due to the swapping mechanism and other implementation limitations.
+
+Below are the configuration options. The file config path is the only required property.
+
+Option | Description
+------ | -----------
+nifi.minifi.notifier.ingestors.file.config.path | Path of the file to monitor for changes.  When these occur, the FileChangeIngestor, if configured, will begin the configuration reloading process
+nifi.minifi.notifier.ingestors.file.polling.period.seconds | How frequently the file specified by 'nifi.minifi.notifier.ingestors.file.config.path' should be evaluated for changes. If not set then a default polling period of 15 seconds will be used.
+nifi.minifi.notifier.ingestors.file.differentiator | Which differentiator to use. If not set then it uses the WholeConfigDifferentiator as a default.
+
+## RestChangeIngestor
+
+class name: org.apache.nifi.minifi.bootstrap.configuration.ingestors.RestChangeIngestor
+
+This Config Change Ingestor sets up a light-weight Jetty HTTP(S) REST service in order to listen to HTTP(S) requests. A potential new configuration is sent via a POST request with the BODY being the potential new config.
+
+NOTE: The encoding is expected to be Unicode and the exact version specified by the BOM mark ('UTF-8','UTF-16BE' or 'UTF-16LE'). If there is no BOM mark, then UTF-8 is used.
+
+Here is an example POST request using 'curl' hitting the local machine on port 8338. It is executed with the config file "config.yml" located in the directory the command is run from:
+
+```
+curl --request POST --data-binary "@config.yml" http://localhost:8338/
+```
+
+Below are the configuration options. There are no required options. If no properties are set then the server will bind to hostname "localhost" on a random open port, will only connect via HTTP and will use the WholeConfigDifferentiator.
+
+Option | Description
+------ | -----------
+nifi.minifi.notifier.ingestors.receive.http.host | Hostname on which the Jetty server will bind to. If not specified then it will bind to localhost.
+nifi.minifi.notifier.ingestors.receive.http.port | Port on which the Jetty server will bind to. If not specified then it will bind to a random open port.
+nifi.minifi.notifier.ingestors.receive.http.truststore.location | If using HTTPS, this specifies the location of the truststore.
+nifi.minifi.notifier.ingestors.receive.http.truststore.password | If using HTTPS, this specifies the password of the truststore.
+nifi.minifi.notifier.ingestors.receive.http.truststore.type | If using HTTPS, this specifies the type of the truststore.
+nifi.minifi.notifier.ingestors.receive.http.keystore.location | If using HTTPS, this specifies the location of the keystore.
+nifi.minifi.notifier.ingestors.receive.http.keystore.password | If using HTTPS, this specifies the password of the keystore.
+nifi.minifi.notifier.ingestors.receive.http.keystore.type | If using HTTPS, this specifies the type of the keystore.
+nifi.minifi.notifier.ingestors.receive.http.need.client.auth | If using HTTPS, this specifies whether or not to require client authentication.
+nifi.minifi.notifier.ingestors.receive.http.differentiator | Which differentiator to use. If not set then it uses the WholeConfigDifferentiator as a default.
+
+## PullHttpChangeIngestor
+
+class name: org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor
+
+This Config Change Ingestor periodically sends a GET request to a REST endpoint using HTTP(S) in order to pull the potential new config.
+
+Below are the configuration options. The hostname and port are the only required properties.
+
+Option | Description
+------ | -----------
+nifi.minifi.notifier.ingestors.pull.http.hostname | Hostname on which to pull configurations from
+nifi.minifi.notifier.ingestors.pull.http.port | Port on which to pull configurations from
+nifi.minifi.notifier.ingestors.pull.http.path | Path on which to pull configurations from
+nifi.minifi.notifier.ingestors.pull.http.period.ms | Period on which to pull configurations from, defaults to 5 minutes if not set.
+nifi.minifi.notifier.ingestors.pull.http.use.etag | If the destination server is set up with cache control ability and utilizes an "ETag" header, then this should be set to true to utilize it. Very simply, the Ingestor remembers the "ETag" of the last successful pull (returned 200) then uses that "ETag" in a "If-None-Match" header on the next request.
+nifi.minifi.notifier.ingestors.pull.http.connect.timeout.ms | Sets the connect timeout for new connections. A value of 0 means no timeout, otherwise values must be a positive whole number in milliseconds.
+nifi.minifi.notifier.ingestors.pull.http.read.timeout.ms | Sets the read timeout for new connections. A value of 0 means no timeout, otherwise values must be a positive whole number in milliseconds.
+nifi.minifi.notifier.ingestors.pull.http.truststore.location | If using HTTPS, this specifies the location of the truststore.
+nifi.minifi.notifier.ingestors.pull.http.truststore.password | If using HTTPS, this specifies the password of the truststore.
+nifi.minifi.notifier.ingestors.pull.http.truststore.type | If using HTTPS, this specifies the type of the truststore.
+nifi.minifi.notifier.ingestors.pull.http.keystore.location | If using HTTPS, this specifies the location of the keystore.
+nifi.minifi.notifier.ingestors.pull.http.keystore.password | If using HTTPS, this specifies the password of the keystore.
+nifi.minifi.notifier.ingestors.pull.http.keystore.type | If using HTTPS, this specifies the type of the keystore.
+nifi.minifi.notifier.ingestors.pull.http.differentiator | Which differentiator to use. If not set then it uses the WholeConfigDifferentiator as a default.
+
+
 # Status Reporting and Querying
 
 In NiFi there is a lot of information, such as stats and bulletins, that is only available to view through the UI. MiNiFi provides access to this information through a query mechanism. You can query FlowStatus either using the MiNiFi.sh script or by configuring one of the Periodic Status Reporters. The API for the query is the same for the reporters and the "flowStatus" script option. The API is outlined in the "FlowStatus Query Options" section below.

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
index 7691859..2f43f5c 100644
--- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
@@ -32,20 +32,30 @@ graceful.shutdown.seconds=20
 nifi.minifi.config=./conf/config.yml
 
 # Notifiers to use for the associated agent, comma separated list of class names
-#nifi.minifi.notifier.components=org.apache.nifi.minifi.bootstrap.configuration.notifiers.FileChangeNotifier
-#nifi.minifi.notifier.components=org.apache.nifi.minifi.bootstrap.configuration.notifiers.RestChangeNotifier
+#nifi.minifi.notifier.ingestors=org.apache.nifi.minifi.bootstrap.configuration.ingestors.FileChangeIngestor
+#nifi.minifi.notifier.ingestors=org.apache.nifi.minifi.bootstrap.configuration.ingestors.RestChangeIngestor
+#nifi.minifi.notifier.ingestors=org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor
 
 # File change notifier configuration
 
 # Path of the file to monitor for changes.  When these occur, the FileChangeNotifier, if configured, will begin the configuration reloading process
-#nifi.minifi.notifier.file.config.path=
+#nifi.minifi.notifier.ingestors.file.config.path=
 # How frequently the file specified by 'nifi.minifi.notifier.file.config.path' should be evaluated for changes.
-#nifi.minifi.notifier.file.polling.period.seconds=5
+#nifi.minifi.notifier.ingestors.file.polling.period.seconds=5
 
 # Rest change notifier configuration
 
 # Port on which the Jetty server will bind to, keep commented for a random open port
-#nifi.minifi.notifier.http.port=8338
+#nifi.minifi.notifier.ingestors.receive.http.port=8338
+
+# Pull HTTP change notifier configuration
+
+# Hostname on which to pull configurations from
+#nifi.minifi.notifier.ingestors.pull.http.hostname=localhost
+# Port on which to pull configurations from
+#nifi.minifi.notifier.ingestors.pull.http.port=4567
+# Period on which to pull configurations from, defaults to 5 minutes if commented out
+#nifi.minifi.notifier.ingestors.pull.http.period.ms=300000
 
 # Periodic Status Reporters to use for the associated agent, comma separated list of class names
 #nifi.minifi.status.reporter.components=org.apache.nifi.minifi.bootstrap.status.reporters.StatusLogger

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f5cdae0..f44d6aa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -498,6 +498,11 @@ limitations under the License.
                 <artifactId>nifi-persistent-provenance-repository</artifactId>
                 <version>${org.apache.nifi.version}</version>
             </dependency>
+            <dependency>
+                <groupId>commons-io</groupId>
+                <artifactId>commons-io</artifactId>
+                <version>2.5</version>
+            </dependency>
 
             <!-- Test Dependencies -->
             <dependency>
@@ -517,15 +522,9 @@ limitations under the License.
                 <version>${org.slf4j.version}</version>
             </dependency>
             <dependency>
-                <groupId>com.squareup.okhttp</groupId>
+                <groupId>com.squareup.okhttp3</groupId>
                 <artifactId>okhttp</artifactId>
-                <version>2.7.1</version>
-            </dependency>
-            <dependency>
-                <groupId>commons-io</groupId>
-                <artifactId>commons-io</artifactId>
-                <version>2.4</version>
-                <scope>test</scope>
+                <version>3.4.1</version>
             </dependency>
         </dependencies>
     </dependencyManagement>


[2/3] nifi-minifi git commit: MINIFI-36 Refactoring config change notifiers and adding Pull config change notifier

Posted by br...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestor.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestor.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestor.java
new file mode 100644
index 0000000..d6b61d4
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestor.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
+
+import org.apache.commons.io.input.TeeInputStream;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
+import org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces.ChangeIngestor;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator.NOTIFIER_INGESTORS_KEY;
+import static org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator.WHOLE_CONFIG_KEY;
+
+
+/**
+ * {@link ChangeIngestor} that embeds a Jetty server and receives potential new configurations over HTTP(S).
+ *
+ * <p>A POST request is treated as a candidate configuration: its body is checked by the configured
+ * {@link Differentiator} and, when judged new, handed to the {@link ConfigurationChangeNotifier}.
+ * A GET request is answered with {@link #GET_TEXT}; any other method is answered with {@link #OTHER_TEXT}.
+ *
+ * <p>All settings are read from the {@link Properties} passed to
+ * {@link #initialize(Properties, ConfigurationFileHolder, ConfigurationChangeNotifier)} using the
+ * {@code *_KEY} constants declared below.
+ */
+public class RestChangeIngestor implements ChangeIngestor {
+
+    /** Differentiator factories keyed by the name accepted for {@link #DIFFERENTIATOR_KEY}. */
+    private static final Map<String, Supplier<Differentiator<InputStream>>> DIFFERENTIATOR_CONSTRUCTOR_MAP;
+
+    static {
+        HashMap<String, Supplier<Differentiator<InputStream>>> tempMap = new HashMap<>();
+        tempMap.put(WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getInputStreamDifferentiator);
+
+        DIFFERENTIATOR_CONSTRUCTOR_MAP = Collections.unmodifiableMap(tempMap);
+    }
+
+
+    public static final String GET_TEXT = "This is a config change listener for an Apache NiFi - MiNiFi instance.\n" +
+            "Use this rest server to upload a conf.yml to configure the MiNiFi instance.\n" +
+            "Send a POST http request to '/' to upload the file.";
+    public static final String OTHER_TEXT = "This is not a support HTTP operation. Please use GET to get more information or POST to upload a new config.yml file.\n";
+    public static final String POST = "POST";
+    public static final String GET = "GET";
+    private final static Logger logger = LoggerFactory.getLogger(RestChangeIngestor.class);
+    private static final String RECEIVE_HTTP_BASE_KEY = NOTIFIER_INGESTORS_KEY + ".receive.http";
+    public static final String PORT_KEY = RECEIVE_HTTP_BASE_KEY + ".port";
+    public static final String HOST_KEY = RECEIVE_HTTP_BASE_KEY + ".host";
+    public static final String TRUSTSTORE_LOCATION_KEY = RECEIVE_HTTP_BASE_KEY + ".truststore.location";
+    public static final String TRUSTSTORE_PASSWORD_KEY = RECEIVE_HTTP_BASE_KEY + ".truststore.password";
+    public static final String TRUSTSTORE_TYPE_KEY = RECEIVE_HTTP_BASE_KEY + ".truststore.type";
+    public static final String KEYSTORE_LOCATION_KEY = RECEIVE_HTTP_BASE_KEY + ".keystore.location";
+    public static final String KEYSTORE_PASSWORD_KEY = RECEIVE_HTTP_BASE_KEY + ".keystore.password";
+    public static final String KEYSTORE_TYPE_KEY = RECEIVE_HTTP_BASE_KEY + ".keystore.type";
+    public static final String NEED_CLIENT_AUTH_KEY = RECEIVE_HTTP_BASE_KEY + ".need.client.auth";
+    public static final String DIFFERENTIATOR_KEY = RECEIVE_HTTP_BASE_KEY + ".differentiator";
+
+    /** Idle timeout applied to every connector; severely taxed or distant environments may have significant delays. */
+    private static final long IDLE_TIMEOUT_MILLIS = 30000L;
+
+    /** Size of the scratch buffer used to drain the remainder of a request body. */
+    private static final int DRAIN_BUFFER_SIZE = 4096;
+
+    private final Server jetty;
+
+    private volatile Differentiator<InputStream> differentiator;
+    private volatile ConfigurationChangeNotifier configurationChangeNotifier;
+
+    /**
+     * Creates the (not yet started) Jetty server backed by a daemon thread pool.
+     */
+    public RestChangeIngestor() {
+        QueuedThreadPool queuedThreadPool = new QueuedThreadPool();
+        queuedThreadPool.setDaemon(true);
+        jetty = new Server(queuedThreadPool);
+    }
+
+    /**
+     * Selects the differentiator, creates the HTTP or HTTPS connector and installs the request handler.
+     *
+     * @param properties                  source of the {@code *_KEY} settings
+     * @param configurationFileHolder     passed through to the differentiator's {@code initialize}
+     * @param configurationChangeNotifier notified when a POSTed configuration is judged new
+     * @throws IllegalArgumentException if {@link #DIFFERENTIATOR_KEY} names an unknown differentiator
+     */
+    @Override
+    public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
+        logger.info("Initializing");
+
+        final String differentiatorName = properties.getProperty(DIFFERENTIATOR_KEY);
+
+        if (differentiatorName != null && !differentiatorName.isEmpty()) {
+            Supplier<Differentiator<InputStream>> differentiatorSupplier = DIFFERENTIATOR_CONSTRUCTOR_MAP.get(differentiatorName);
+            if (differentiatorSupplier == null) {
+                throw new IllegalArgumentException("Property, " + DIFFERENTIATOR_KEY + ", has value " + differentiatorName + " which does not " +
+                        "correspond to any in the RestChangeIngestor Map:" + DIFFERENTIATOR_CONSTRUCTOR_MAP.keySet());
+            }
+            differentiator = differentiatorSupplier.get();
+        } else {
+            // No differentiator configured: fall back to the whole-config comparison.
+            differentiator = WholeConfigDifferentiator.getInputStreamDifferentiator();
+        }
+        differentiator.initialize(properties, configurationFileHolder);
+
+        // create the secure connector if keystore location is specified
+        if (properties.getProperty(KEYSTORE_LOCATION_KEY) != null) {
+            createSecureConnector(properties);
+        } else {
+            // create the unsecure connector otherwise
+            createConnector(properties);
+        }
+
+        this.configurationChangeNotifier = configurationChangeNotifier;
+
+        HandlerCollection handlerCollection = new HandlerCollection(true);
+        handlerCollection.addHandler(new JettyHandler());
+        jetty.setHandler(handlerCollection);
+    }
+
+    /**
+     * Starts the Jetty server.
+     *
+     * @throws IllegalStateException wrapping any exception raised while starting
+     */
+    @Override
+    public void start() {
+        try {
+            jetty.start();
+            logger.info("RestChangeIngester has started and is listening on port {}.", getPort());
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+
+    /**
+     * Stops and destroys the Jetty server.
+     *
+     * @throws IOException wrapping any exception raised while shutting down
+     */
+    @Override
+    public void close() throws IOException {
+        logger.warn("Shutting down the jetty server");
+        try {
+            jetty.stop();
+            jetty.destroy();
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+        logger.warn("Done shutting down the jetty server");
+    }
+
+    /**
+     * @return the URI reported by the Jetty server
+     */
+    public URI getURI() {
+        return jetty.getURI();
+    }
+
+    /**
+     * @return the local port of the server's first connector
+     * @throws IllegalStateException if the server has not been started
+     */
+    public int getPort() {
+        if (!jetty.isStarted()) {
+            throw new IllegalStateException("Jetty server not started");
+        }
+        return ((ServerConnector) jetty.getConnectors()[0]).getLocalPort();
+    }
+
+    /** Creates and registers the plain HTTP connector. */
+    private void createConnector(Properties properties) {
+        final ServerConnector http = new ServerConnector(jetty);
+        bindAndRegister(http, properties);
+
+        logger.info("Added an http connector on the host '{}' and port '{}'", http.getHost(), http.getPort());
+    }
+
+    /** Creates and registers the HTTPS connector from the keystore/truststore properties. */
+    private void createSecureConnector(Properties properties) {
+        SslContextFactory ssl = new SslContextFactory();
+
+        if (properties.getProperty(KEYSTORE_LOCATION_KEY) != null) {
+            ssl.setKeyStorePath(properties.getProperty(KEYSTORE_LOCATION_KEY));
+            ssl.setKeyStorePassword(properties.getProperty(KEYSTORE_PASSWORD_KEY));
+            ssl.setKeyStoreType(properties.getProperty(KEYSTORE_TYPE_KEY));
+        }
+
+        // Client authentication is only configured when a truststore is supplied.
+        if (properties.getProperty(TRUSTSTORE_LOCATION_KEY) != null) {
+            ssl.setTrustStorePath(properties.getProperty(TRUSTSTORE_LOCATION_KEY));
+            ssl.setTrustStorePassword(properties.getProperty(TRUSTSTORE_PASSWORD_KEY));
+            ssl.setTrustStoreType(properties.getProperty(TRUSTSTORE_TYPE_KEY));
+            ssl.setNeedClientAuth(Boolean.parseBoolean(properties.getProperty(NEED_CLIENT_AUTH_KEY, "true")));
+        }
+
+        final ServerConnector https = new ServerConnector(jetty, ssl);
+        bindAndRegister(https, properties);
+
+        logger.info("Added an https connector on the host '{}' and port '{}'", https.getHost(), https.getPort());
+    }
+
+    /**
+     * Applies the host, port and idle timeout shared by both connector kinds and adds the connector to the server.
+     * The port defaults to "0" and the host to "localhost" when the properties are absent.
+     */
+    private void bindAndRegister(ServerConnector connector, Properties properties) {
+        connector.setPort(Integer.parseInt(properties.getProperty(PORT_KEY, "0")));
+        connector.setHost(properties.getProperty(HOST_KEY, "localhost"));
+        connector.setIdleTimeout(IDLE_TIMEOUT_MILLIS);
+        jetty.addConnector(connector);
+    }
+
+    /** Replaces the differentiator used to judge incoming configurations. */
+    protected void setDifferentiator(Differentiator<InputStream> differentiator) {
+        this.differentiator = differentiator;
+    }
+
+    /** Jetty handler implementing the GET/POST behavior of the ingestor. */
+    private class JettyHandler extends AbstractHandler {
+
+        /**
+         * Handles one request: POST submits a candidate configuration, GET returns the help text,
+         * anything else gets {@link #OTHER_TEXT} with status 404.
+         *
+         * <p>For POST the response status is 200 when every listener succeeded, 500 when any listener
+         * failed, and 409 when the differentiator reports the configuration is not new.
+         */
+        @Override
+        public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
+                throws IOException, ServletException {
+
+            logRequest(request);
+
+            baseRequest.setHandled(true);
+
+            if (POST.equals(request.getMethod())) {
+                int statusCode;
+                String responseText;
+                // Everything read through the tee is copied into receivedBytes, so the body is available
+                // again after the differentiator has consumed (part of) the stream.
+                try (ByteArrayOutputStream receivedBytes = new ByteArrayOutputStream();
+                     TeeInputStream teeInputStream = new TeeInputStream(request.getInputStream(), receivedBytes)) {
+
+                    if (differentiator.isNew(teeInputStream)) {
+                        // Drain whatever the differentiator left unread. Read until end of stream rather than
+                        // polling available(): available() == 0 only means nothing is buffered right now,
+                        // and stopping there could truncate a body that arrives in several chunks.
+                        final byte[] drainBuffer = new byte[DRAIN_BUFFER_SIZE];
+                        while (teeInputStream.read(drainBuffer) != -1) {
+                            // bytes are captured by the tee; nothing else to do
+                        }
+
+                        ByteBuffer newConfig = ByteBuffer.wrap(receivedBytes.toByteArray());
+                        ByteBuffer readOnlyNewConfig = newConfig.asReadOnlyBuffer();
+
+                        Collection<ListenerHandleResult> listenerHandleResults = configurationChangeNotifier.notifyListeners(readOnlyNewConfig);
+
+                        final boolean allSucceeded = listenerHandleResults.stream().allMatch(ListenerHandleResult::succeeded);
+                        statusCode = allSucceeded ? 200 : 500;
+                        responseText = getPostText(listenerHandleResults);
+                    } else {
+                        statusCode = 409;
+                        responseText = "Request received but instance is already running this config.";
+                    }
+
+                    writeOutput(response, responseText, statusCode);
+                }
+            } else if (GET.equals(request.getMethod())) {
+                writeOutput(response, GET_TEXT, 200);
+            } else {
+                writeOutput(response, OTHER_TEXT, 404);
+            }
+        }
+
+        /** Builds the POST response body: a header line followed by one line per listener result. */
+        private String getPostText(Collection<ListenerHandleResult> listenerHandleResults) {
+            StringBuilder postResult = new StringBuilder("The result of notifying listeners:\n");
+
+            for (ListenerHandleResult result : listenerHandleResults) {
+                postResult.append(result.toString());
+                postResult.append("\n");
+            }
+
+            return postResult.toString();
+        }
+
+        /** Writes a plain-text response with the given status code. */
+        private void writeOutput(HttpServletResponse response, String responseText, int responseCode) throws IOException {
+            response.setStatus(responseCode);
+            response.setContentType("text/plain");
+            // NOTE(review): the length is a character count; this equals the byte count only while the
+            // response writer uses a single-byte encoding - confirm if the charset is ever changed.
+            response.setContentLength(responseText.length());
+            try (PrintWriter writer = response.getWriter()) {
+                writer.print(responseText);
+                writer.flush();
+            }
+        }
+
+        /** Logs the method, URL, context path and content type of the incoming request. */
+        private void logRequest(HttpServletRequest request) {
+            logger.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
+            logger.info("request method = {}", request.getMethod());
+            logger.info("request url = {}", request.getRequestURL());
+            logger.info("context path = {}", request.getContextPath());
+            logger.info("request content type = {}", request.getContentType());
+            logger.info("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/interfaces/ChangeIngestor.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/interfaces/ChangeIngestor.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/interfaces/ChangeIngestor.java
new file mode 100644
index 0000000..dcd39cd
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/interfaces/ChangeIngestor.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces;
+
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Lifecycle contract for a configuration change ingestor: it is initialized from properties, started, and closed.
+ *
+ * <p>NOTE(review): judging by {@code RestChangeIngestor}, an implementation obtains candidate configurations
+ * and hands the new ones to the supplied {@link ConfigurationChangeNotifier} - confirm for other implementations.
+ */
+public interface ChangeIngestor {
+    /**
+     * Prepares the ingestor for use.
+     *
+     * @param properties                  settings for the ingestor
+     * @param configurationFileHolder     holder for the current configuration file (presumably used to compare
+     *                                    against incoming configurations - verify against implementations)
+     * @param configurationChangeNotifier notifier the ingestor can use to announce a configuration change
+     */
+    void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier);
+
+    /** Starts the ingestor. */
+    void start();
+
+    /**
+     * Shuts the ingestor down.
+     *
+     * @throws IOException if shutting down fails
+     */
+    void close() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/FileChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/FileChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/FileChangeNotifier.java
deleted file mode 100644
index faba2f0..0000000
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/FileChangeNotifier.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.minifi.bootstrap.configuration.notifiers;
-
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
-import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
-import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.NOTIFIER_PROPERTY_PREFIX;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.file.FileSystems;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.WatchEvent;
-import java.nio.file.WatchKey;
-import java.nio.file.WatchService;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-/**
- * FileChangeNotifier provides a simple FileSystem monitor for detecting changes for a specified file as generated from its corresponding {@link Path}.  Upon modifications to the associated file,
- * associated listeners receive notification of a change allowing configuration logic to be reanalyzed.  The backing implementation is associated with a {@link ScheduledExecutorService} that
- * ensures continuity of monitoring.
- */
-public class FileChangeNotifier implements Runnable, ConfigurationChangeNotifier {
-
-    private Path configFile;
-    private WatchService watchService;
-    private long pollingSeconds;
-
-    private final static Logger logger = LoggerFactory.getLogger(FileChangeNotifier.class);
-    private ScheduledExecutorService executorService;
-    private final Set<ConfigurationChangeListener> configurationChangeListeners = new HashSet<>();
-
-    protected static final String CONFIG_FILE_PATH_KEY = NOTIFIER_PROPERTY_PREFIX + ".file.config.path";
-    protected static final String POLLING_PERIOD_INTERVAL_KEY = NOTIFIER_PROPERTY_PREFIX + ".file.polling.period.seconds";
-
-    protected static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
-    protected static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = TimeUnit.SECONDS;
-
-    @Override
-    public Set<ConfigurationChangeListener> getChangeListeners() {
-        return Collections.unmodifiableSet(configurationChangeListeners);
-    }
-
-    @Override
-    public Collection<ListenerHandleResult> notifyListeners() {
-        logger.info("Notifying Listeners of a change");
-        final File fileToRead = configFile.toFile();
-
-        Collection<ListenerHandleResult> listenerHandleResults = new ArrayList<>(configurationChangeListeners.size());
-        for (final ConfigurationChangeListener listener : getChangeListeners()) {
-            ListenerHandleResult result;
-            try (final FileInputStream fis = new FileInputStream(fileToRead);) {
-                listener.handleChange(fis);
-                result = new ListenerHandleResult(listener);
-            } catch (IOException | ConfigurationChangeException ex) {
-                result =  new ListenerHandleResult(listener, ex);
-            }
-            listenerHandleResults.add(result);
-            logger.info("Listener notification result:" + result.toString());
-        }
-        return listenerHandleResults;
-    }
-
-    @Override
-    public boolean registerListener(ConfigurationChangeListener listener) {
-        return this.configurationChangeListeners.add(listener);
-    }
-
-    protected boolean targetChanged() {
-        boolean targetChanged = false;
-
-        final WatchKey watchKey = this.watchService.poll();
-
-        if (watchKey == null) {
-            return targetChanged;
-        }
-
-        for (WatchEvent<?> watchEvt : watchKey.pollEvents()) {
-            final WatchEvent.Kind<?> evtKind = watchEvt.kind();
-
-            final WatchEvent<Path> pathEvent = (WatchEvent<Path>) watchEvt;
-            final Path changedFile = pathEvent.context();
-
-            // determine target change by verifying if the changed file corresponds to the config file monitored for this path
-            targetChanged = (evtKind == ENTRY_MODIFY && changedFile.equals(configFile.getName(configFile.getNameCount() - 1)));
-        }
-
-        // After completing inspection, reset for detection of subsequent change events
-        boolean valid = watchKey.reset();
-        if (!valid) {
-            throw new IllegalStateException("Unable to reinitialize file system watcher.");
-        }
-
-        return targetChanged;
-    }
-
-    protected static WatchService initializeWatcher(Path filePath) {
-        try {
-            final WatchService fsWatcher = FileSystems.getDefault().newWatchService();
-            final Path watchDirectory = filePath.getParent();
-            watchDirectory.register(fsWatcher, ENTRY_MODIFY);
-
-            return fsWatcher;
-        } catch (IOException ioe) {
-            throw new IllegalStateException("Unable to initialize a file system watcher for the path " + filePath, ioe);
-        }
-    }
-
-    @Override
-    public void run() {
-        logger.debug("Checking for a change");
-        if (targetChanged()) {
-            notifyListeners();
-        }
-    }
-
-    @Override
-    public void initialize(Properties properties) {
-        final String rawPath = properties.getProperty(CONFIG_FILE_PATH_KEY);
-        final String rawPollingDuration = properties.getProperty(POLLING_PERIOD_INTERVAL_KEY, Long.toString(DEFAULT_POLLING_PERIOD_INTERVAL));
-
-        if (rawPath == null || rawPath.isEmpty()) {
-            throw new IllegalArgumentException("Property, " + CONFIG_FILE_PATH_KEY + ", for the path of the config file must be specified.");
-        }
-
-        try {
-            setConfigFile(Paths.get(rawPath));
-            setPollingPeriod(Long.parseLong(rawPollingDuration), DEFAULT_POLLING_PERIOD_UNIT);
-            setWatchService(initializeWatcher(configFile));
-        } catch (Exception e) {
-            throw new IllegalStateException("Could not successfully initialize file change notifier.", e);
-        }
-    }
-
-    protected void setConfigFile(Path configFile) {
-        this.configFile = configFile;
-    }
-
-    protected void setWatchService(WatchService watchService) {
-        this.watchService = watchService;
-    }
-
-    protected void setPollingPeriod(long duration, TimeUnit unit) {
-        if (duration < 0) {
-            throw new IllegalArgumentException("Cannot specify a polling period with duration <=0");
-        }
-        this.pollingSeconds = TimeUnit.SECONDS.convert(duration, unit);
-    }
-
-    @Override
-    public void start() {
-        executorService = Executors.newScheduledThreadPool(1, new ThreadFactory() {
-            @Override
-            public Thread newThread(final Runnable r) {
-                final Thread t = Executors.defaultThreadFactory().newThread(r);
-                t.setName("File Change Notifier Thread");
-                t.setDaemon(true);
-                return t;
-            }
-        });
-        this.executorService.scheduleWithFixedDelay(this, 0, pollingSeconds, DEFAULT_POLLING_PERIOD_UNIT);
-    }
-
-    @Override
-    public void close() {
-        if (this.executorService != null) {
-            this.executorService.shutdownNow();
-        }
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/RestChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/RestChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/RestChangeNotifier.java
deleted file mode 100644
index 777214f..0000000
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/RestChangeNotifier.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration.notifiers;
-
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
-import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult;
-import org.eclipse.jetty.server.Request;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ServerConnector;
-import org.eclipse.jetty.server.handler.AbstractHandler;
-import org.eclipse.jetty.server.handler.HandlerCollection;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
-
-import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.NOTIFIER_PROPERTY_PREFIX;
-
-
-public class RestChangeNotifier implements ConfigurationChangeNotifier {
-
-    private final Set<ConfigurationChangeListener> configurationChangeListeners = new HashSet<>();
-    private final static Logger logger = LoggerFactory.getLogger(RestChangeNotifier.class);
-    private String configFile = null;
-    private final Server jetty;
-    public static final String GET_TEXT = "This is a config change listener for an Apache NiFi - MiNiFi instance.\n" +
-            "Use this rest server to upload a conf.yml to configure the MiNiFi instance.\n" +
-            "Send a POST http request to '/' to upload the file.";
-    public static final String OTHER_TEXT ="This is not a support HTTP operation. Please use GET to get more information or POST to upload a new config.yml file.\n";
-
-
-    public static final String POST = "POST";
-    public static final String GET = "GET";
-
-    public static final String PORT_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.port";
-    public static final String HOST_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.host";
-    public static final String TRUSTSTORE_LOCATION_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.location";
-    public static final String TRUSTSTORE_PASSWORD_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.password";
-    public static final String TRUSTSTORE_TYPE_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.type";
-    public static final String KEYSTORE_LOCATION_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.location";
-    public static final String KEYSTORE_PASSWORD_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.password";
-    public static final String KEYSTORE_TYPE_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.type";
-    public static final String NEED_CLIENT_AUTH_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.need.client.auth";
-
-    public RestChangeNotifier(){
-        QueuedThreadPool queuedThreadPool = new QueuedThreadPool();
-        queuedThreadPool.setDaemon(true);
-        jetty = new Server(queuedThreadPool);
-    }
-
-    @Override
-    public void initialize(Properties properties) {
-        logger.info("Initializing");
-
-        // create the secure connector if keystore location is specified
-        if (properties.getProperty(KEYSTORE_LOCATION_KEY) != null) {
-            createSecureConnector(properties);
-        } else {
-            // create the unsecure connector otherwise
-            createConnector(properties);
-        }
-
-        HandlerCollection handlerCollection = new HandlerCollection(true);
-        handlerCollection.addHandler(new JettyHandler());
-        jetty.setHandler(handlerCollection);
-    }
-
-    @Override
-    public Set<ConfigurationChangeListener> getChangeListeners() {
-        return configurationChangeListeners;
-    }
-
-    @Override
-    public boolean registerListener(ConfigurationChangeListener listener) {
-        return configurationChangeListeners.add(listener);
-    }
-
-    @Override
-    public Collection<ListenerHandleResult> notifyListeners() {
-        if (configFile == null){
-            throw new IllegalStateException("Attempting to notify listeners when there is no new config file.");
-        }
-
-        Collection<ListenerHandleResult> listenerHandleResults = new ArrayList<>(configurationChangeListeners.size());
-        for (final ConfigurationChangeListener listener : getChangeListeners()) {
-            ListenerHandleResult result;
-            try (final ByteArrayInputStream fis = new ByteArrayInputStream(configFile.getBytes())) {
-                listener.handleChange(fis);
-                result = new ListenerHandleResult(listener);
-            } catch (IOException | ConfigurationChangeException ex) {
-                result = new ListenerHandleResult(listener, ex);
-            }
-            listenerHandleResults.add(result);
-            logger.info("Listener notification result:" + result.toString());
-        }
-
-        configFile = null;
-        return listenerHandleResults;
-    }
-
-    @Override
-    public void start(){
-        try {
-            jetty.start();
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-
-    @Override
-    public void close() throws IOException {
-        logger.warn("Shutting down the jetty server");
-        try {
-            jetty.stop();
-            jetty.destroy();
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
-        logger.warn("Done shutting down the jetty server");
-    }
-
-    public URI getURI(){
-        return jetty.getURI();
-    }
-
-    public int getPort(){
-        if (!jetty.isStarted()) {
-            throw new IllegalStateException("Jetty server not started");
-        }
-        return ((ServerConnector) jetty.getConnectors()[0]).getLocalPort();
-    }
-
-    public String getConfigString(){
-        return configFile;
-    }
-
-    private void setConfigFile(String configFile){
-        this.configFile = configFile;
-    }
-
-    private void createConnector(Properties properties) {
-        final ServerConnector http = new ServerConnector(jetty);
-
-        http.setPort(Integer.parseInt(properties.getProperty(PORT_KEY, "0")));
-        http.setHost(properties.getProperty(HOST_KEY, "localhost"));
-
-        // Severely taxed or distant environments may have significant delays when executing.
-        http.setIdleTimeout(30000L);
-        jetty.addConnector(http);
-
-        logger.info("Added an http connector on the host '{}' and port '{}'", new Object[]{http.getHost(), http.getPort()});
-    }
-
-    private void createSecureConnector(Properties properties) {
-        SslContextFactory ssl = new SslContextFactory();
-
-        if (properties.getProperty(KEYSTORE_LOCATION_KEY) != null) {
-            ssl.setKeyStorePath(properties.getProperty(KEYSTORE_LOCATION_KEY));
-            ssl.setKeyStorePassword(properties.getProperty(KEYSTORE_PASSWORD_KEY));
-            ssl.setKeyStoreType(properties.getProperty(KEYSTORE_TYPE_KEY));
-        }
-
-        if (properties.getProperty(TRUSTSTORE_LOCATION_KEY) != null) {
-            ssl.setTrustStorePath(properties.getProperty(TRUSTSTORE_LOCATION_KEY));
-            ssl.setTrustStorePassword(properties.getProperty(TRUSTSTORE_PASSWORD_KEY));
-            ssl.setTrustStoreType(properties.getProperty(TRUSTSTORE_TYPE_KEY));
-            ssl.setNeedClientAuth(Boolean.parseBoolean(properties.getProperty(NEED_CLIENT_AUTH_KEY, "true")));
-        }
-
-        // build the connector
-        final ServerConnector https = new ServerConnector(jetty, ssl);
-
-        // set host and port
-        https.setPort(Integer.parseInt(properties.getProperty(PORT_KEY,"0")));
-        https.setHost(properties.getProperty(HOST_KEY, "localhost"));
-
-        // Severely taxed environments may have significant delays when executing.
-        https.setIdleTimeout(30000L);
-
-        // add the connector
-        jetty.addConnector(https);
-
-        logger.info("Added an https connector on the host '{}' and port '{}'", new Object[]{https.getHost(), https.getPort()});
-    }
-
-
-    public class JettyHandler extends AbstractHandler {
-
-        @Override
-        public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
-                throws IOException, ServletException {
-
-            logRequest(request);
-
-            baseRequest.setHandled(true);
-
-            if(POST.equals(request.getMethod())) {
-                final StringBuilder configBuilder = new StringBuilder();
-                BufferedReader reader = request.getReader();
-                if(reader != null && reader.ready()){
-                    String line;
-                    while ((line = reader.readLine()) != null) {
-                        configBuilder.append(line);
-                        configBuilder.append(System.getProperty("line.separator"));
-                    }
-                }
-                setConfigFile(configBuilder.substring(0,configBuilder.length()-1));
-                Collection<ListenerHandleResult> listenerHandleResults = notifyListeners();
-
-                int statusCode = 200;
-                for (ListenerHandleResult result: listenerHandleResults){
-                    if(!result.succeeded()){
-                        statusCode = 500;
-                        break;
-                    }
-                }
-
-                writeOutput(response, getPostText(listenerHandleResults), statusCode);
-            } else if(GET.equals(request.getMethod())) {
-                writeOutput(response, GET_TEXT, 200);
-            } else {
-                writeOutput(response, OTHER_TEXT, 404);
-            }
-        }
-
-        private String getPostText(Collection<ListenerHandleResult> listenerHandleResults){
-            StringBuilder postResult = new StringBuilder("The result of notifying listeners:\n");
-
-            for (ListenerHandleResult result : listenerHandleResults) {
-                postResult.append(result.toString());
-                postResult.append("\n");
-            }
-
-            return postResult.toString();
-        }
-
-        private void writeOutput(HttpServletResponse response, String responseText, int responseCode) throws IOException {
-            response.setStatus(responseCode);
-            response.setContentType("text/plain");
-            response.setContentLength(responseText.length());
-            try (PrintWriter writer = response.getWriter()) {
-                writer.print(responseText);
-                writer.flush();
-            }
-        }
-
-        private void logRequest(HttpServletRequest request){
-            logger.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
-            logger.info("request method = " + request.getMethod());
-            logger.info("request url = " + request.getRequestURL());
-            logger.info("context path = " + request.getContextPath());
-            logger.info("request content type = " + request.getContentType());
-            logger.info("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ByteBufferInputStream.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ByteBufferInputStream.java
new file mode 100644
index 0000000..d20478e
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ByteBufferInputStream.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * An {@link InputStream} view over a {@link ByteBuffer}.
+ *
+ * <p>Bytes are taken with the buffer's relative {@code get} operations, so every read advances the
+ * position of the buffer that was passed to the constructor. End of stream is reported once that
+ * buffer has no bytes remaining.
+ */
+public class ByteBufferInputStream extends InputStream {
+    ByteBuffer buf;
+
+    /**
+     * Wraps the given buffer without copying it.
+     *
+     * @param buf the buffer to read from, starting at its current position
+     */
+    public ByteBufferInputStream(ByteBuffer buf) {
+        this.buf = buf;
+    }
+
+    /**
+     * Reads the next byte of the buffer.
+     *
+     * @return the byte as an unsigned value in the range 0-255, or -1 when the buffer is exhausted
+     */
+    @Override
+    public int read() throws IOException {
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+        // Mask so that bytes >= 0x80 are not sign-extended into negative ints (-1 means end of stream).
+        return buf.get() & 0xFF;
+    }
+
+    /**
+     * Copies up to {@code len} bytes from the buffer into {@code bytes}, starting at {@code off}.
+     *
+     * @param bytes destination array
+     * @param off index in {@code bytes} at which the first byte is stored
+     * @param len maximum number of bytes to copy
+     * @return the number of bytes copied, 0 when {@code len} is 0, or -1 when the buffer is exhausted
+     * @throws NullPointerException if {@code bytes} is null
+     * @throws IndexOutOfBoundsException if {@code off} or {@code len} is negative, or
+     *         {@code len} exceeds {@code bytes.length - off}
+     */
+    @Override
+    public int read(byte[] bytes, int off, int len) throws IOException {
+        if (bytes == null) {
+            throw new NullPointerException("bytes");
+        }
+        if (off < 0 || len < 0 || len > bytes.length - off) {
+            throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", array length=" + bytes.length);
+        }
+        // InputStream contract: a zero-length request reads nothing and returns 0, even at end of stream.
+        if (len == 0) {
+            return 0;
+        }
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+
+        final int count = Math.min(len, buf.remaining());
+        buf.get(bytes, off, count);
+        return count;
+    }
+
+    /**
+     * Reports how many bytes can still be read.
+     *
+     * @return the number of bytes remaining in the buffer
+     */
+    @Override
+    public int available() throws IOException {
+        return buf.remaining();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
index 09776de..da707b9 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.minifi.bootstrap.util;
 
 
+import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
 import org.apache.nifi.minifi.bootstrap.exception.InvalidConfigurationException;
 import org.apache.nifi.minifi.commons.schema.ComponentStatusRepositorySchema;
@@ -42,7 +43,6 @@ import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader;
 import org.apache.nifi.minifi.commons.schema.SecurityPropertiesSchema;
 import org.apache.nifi.minifi.commons.schema.SensitivePropsSchema;
 import org.apache.nifi.minifi.commons.schema.SwapSchema;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.w3c.dom.DOMException;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestConfigurationChangeCoordinator.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestConfigurationChangeCoordinator.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestConfigurationChangeCoordinator.java
new file mode 100644
index 0000000..a6882a5
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestConfigurationChangeCoordinator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration;
+
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+
+import static org.mockito.Mockito.verify;
+
+/**
+ * Unit tests for {@link ConfigurationChangeCoordinator}, exercised through a Mockito spy with mocked listeners and a mocked {@link ConfigurationFileHolder}.
+ */
+public class TestConfigurationChangeCoordinator {
+
+    private ConfigurationChangeCoordinator coordinatorSpy;
+    private Properties properties = new Properties();
+
+    @Before
+    public void setUp() throws Exception {
+        coordinatorSpy = Mockito.spy(new ConfigurationChangeCoordinator());
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        coordinatorSpy.close();
+    }
+
+    /** Smoke test: initializing with the REST ingestor configured must complete without throwing. */
+    @Test
+    public void testInit() throws Exception {
+        properties.put("nifi.minifi.notifier.ingestors", "org.apache.nifi.minifi.bootstrap.configuration.ingestors.RestChangeIngestor");
+        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+        coordinatorSpy.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), Collections.singleton(testListener));
+    }
+
+    /** A single registered listener must be reported, and a notification must reach it no more than once. */
+    @Test
+    public void testNotifyListeners() throws Exception {
+        final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+        coordinatorSpy.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), Collections.singleton(testListener));
+
+        Assert.assertEquals("Did not receive the correct number of registered listeners", 1, coordinatorSpy.getChangeListeners().size());
+
+        coordinatorSpy.notifyListeners(ByteBuffer.allocate(1));
+
+        // NOTE(review): atMost(1) is also satisfied when the listener is never called; consider times(1) if exactly one delivery is the intended contract.
+        verify(testListener, Mockito.atMost(1)).handleChange(Mockito.any(InputStream.class));
+    }
+
+    /** Re-initializing with the same listener must not create duplicates; a distinct listener must be added. */
+    @Test
+    public void testRegisterListener() throws Exception {
+        final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class);
+        coordinatorSpy.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), Collections.singleton(firstListener));
+
+        Assert.assertEquals("Did not receive the correct number of registered listeners", 1, coordinatorSpy.getChangeListeners().size());
+
+        coordinatorSpy.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), Arrays.asList(firstListener, firstListener));
+        Assert.assertEquals("Did not receive the correct number of registered listeners", 1, coordinatorSpy.getChangeListeners().size());
+
+        final ConfigurationChangeListener secondListener = Mockito.mock(ConfigurationChangeListener.class);
+        coordinatorSpy.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), Arrays.asList(firstListener, secondListener));
+        Assert.assertEquals("Did not receive the correct number of registered listeners", 2, coordinatorSpy.getChangeListeners().size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/TestWholeConfigDifferentiator.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/TestWholeConfigDifferentiator.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/TestWholeConfigDifferentiator.java
new file mode 100644
index 0000000..9dabea3
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/TestWholeConfigDifferentiator.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.differentiators;
+
+import okhttp3.Request;
+import org.apache.commons.io.FileUtils;
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the differentiators returned by {@link WholeConfigDifferentiator}.
+ *
+ * <p>The mocked {@link ConfigurationFileHolder} reports the contents of {@code default.yml} as the
+ * current configuration. Each test then checks whether a candidate configuration (either the same
+ * file or {@code config.yml}) is reported as new.
+ */
+public class TestWholeConfigDifferentiator {
+
+    public static final Path newConfigPath = Paths.get("./src/test/resources/config.yml");
+    public static final Path defaultConfigPath = Paths.get("./src/test/resources/default.yml");
+
+    public static ByteBuffer defaultConfigBuffer;
+    public static ByteBuffer newConfigBuffer;
+    public static Properties properties = new Properties();
+    public static ConfigurationFileHolder configurationFileHolder;
+
+    public static Request dummyRequest;
+
+    /**
+     * Loads both configuration files into memory and stubs the file holder so that it returns the
+     * default configuration buffer.
+     *
+     * @throws IOException if either test resource cannot be read
+     */
+    @BeforeClass
+    public static void beforeClass() throws IOException {
+        dummyRequest = new Request.Builder()
+                .get()
+                .url("https://nifi.apache.org/index.html")
+                .build();
+
+        defaultConfigBuffer = ByteBuffer.wrap(FileUtils.readFileToByteArray(defaultConfigPath.toFile()));
+        newConfigBuffer = ByteBuffer.wrap(FileUtils.readFileToByteArray(newConfigPath.toFile()));
+
+        configurationFileHolder = Mockito.mock(ConfigurationFileHolder.class);
+
+        when(configurationFileHolder.getConfigFileReference()).thenReturn(new AtomicReference<>(defaultConfigBuffer));
+    }
+
+    @Before
+    public void beforeEach() {
+    }
+
+    // InputStream differentiator methods
+
+    /** A stream over the file backing the current configuration must not be reported as new. */
+    @Test
+    public void TestSameInputStream() throws IOException {
+        Differentiator<InputStream> differentiator = WholeConfigDifferentiator.getInputStreamDifferentiator();
+        differentiator.initialize(properties, configurationFileHolder);
+
+        // try-with-resources releases the file handle even when the assertion fails
+        try (FileInputStream fileInputStream = new FileInputStream(defaultConfigPath.toFile())) {
+            assertFalse(differentiator.isNew(fileInputStream));
+        }
+    }
+
+    /** A stream over a different configuration file must be reported as new. */
+    @Test
+    public void TestNewInputStream() throws IOException {
+        Differentiator<InputStream> differentiator = WholeConfigDifferentiator.getInputStreamDifferentiator();
+        differentiator.initialize(properties, configurationFileHolder);
+
+        // try-with-resources releases the file handle even when the assertion fails
+        try (FileInputStream fileInputStream = new FileInputStream(newConfigPath.toFile())) {
+            assertTrue(differentiator.isNew(fileInputStream));
+        }
+    }
+
+    // Bytebuffer differentiator methods
+
+    /** The buffer that the file holder already returns must not be reported as new. */
+    @Test
+    public void TestSameByteBuffer() throws IOException {
+        Differentiator<ByteBuffer> differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
+        differentiator.initialize(properties, configurationFileHolder);
+
+        assertFalse(differentiator.isNew(defaultConfigBuffer));
+    }
+
+    /** A buffer holding the other configuration file must be reported as new. */
+    @Test
+    public void TestNewByteBuffer() throws IOException {
+        Differentiator<ByteBuffer> differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
+        differentiator.initialize(properties, configurationFileHolder);
+
+        assertTrue(differentiator.isNew(newConfigBuffer));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestFileChangeIngestor.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestFileChangeIngestor.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestFileChangeIngestor.java
new file mode 100644
index 0000000..9817e7e
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestFileChangeIngestor.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
+
+import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Unit tests for {@link FileChangeIngestor}. A spied ingestor is given a mocked
+ * {@link WatchService}, {@link Differentiator} and {@link ConfigurationChangeNotifier} in
+ * {@link #setUp()}, so the change-detection tests can control what the watch service reports.
+ */
+public class TestFileChangeIngestor {
+
+    private static final String CONFIG_FILENAME = "config.yml";
+    private static final String TEST_CONFIG_PATH = "src/test/resources/config.yml";
+
+    private FileChangeIngestor notifierSpy;
+    private WatchService mockWatchService;
+    private Properties testProperties;
+    private Differentiator<InputStream> mockDifferentiator;
+    private ConfigurationChangeNotifier testNotifier;
+
+    /** Creates the mocks, injects them into the spied ingestor and prepares default properties. */
+    @Before
+    public void setUp() throws Exception {
+        mockWatchService = Mockito.mock(WatchService.class);
+        notifierSpy = Mockito.spy(new FileChangeIngestor());
+        mockDifferentiator = Mockito.mock(Differentiator.class);
+        testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
+
+        notifierSpy.setConfigFilePath(Paths.get(TEST_CONFIG_PATH));
+        notifierSpy.setWatchService(mockWatchService);
+        notifierSpy.setDifferentiator(mockDifferentiator);
+        notifierSpy.setConfigurationChangeNotifier(testNotifier);
+
+        testProperties = new Properties();
+        testProperties.put(FileChangeIngestor.CONFIG_FILE_PATH_KEY, TEST_CONFIG_PATH);
+        testProperties.put(FileChangeIngestor.POLLING_PERIOD_INTERVAL_KEY, FileChangeIngestor.DEFAULT_POLLING_PERIOD_INTERVAL);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        notifierSpy.close();
+    }
+
+    /** Initialization with a path that does not exist is expected to fail. */
+    @Test(expected = IllegalStateException.class)
+    public void testInitialize_invalidFile() throws Exception {
+        testProperties.put(FileChangeIngestor.CONFIG_FILE_PATH_KEY, "/land/of/make/believe");
+        notifierSpy.initialize(testProperties, Mockito.mock(ConfigurationFileHolder.class), Mockito.mock(ConfigurationChangeNotifier.class));
+    }
+
+    /** Initialization with the test configuration file is expected to complete without throwing. */
+    @Test
+    public void testInitialize_validFile() throws Exception {
+        notifierSpy.initialize(testProperties, Mockito.mock(ConfigurationFileHolder.class), Mockito.mock(ConfigurationChangeNotifier.class));
+    }
+
+    /** A non-numeric polling period is expected to be rejected. */
+    @Test(expected = IllegalStateException.class)
+    public void testInitialize_invalidPollingPeriod() throws Exception {
+        testProperties.put(FileChangeIngestor.POLLING_PERIOD_INTERVAL_KEY, "abc");
+        notifierSpy.initialize(testProperties, Mockito.mock(ConfigurationFileHolder.class), Mockito.mock(ConfigurationChangeNotifier.class));
+    }
+
+    /** Initialization without a polling period property is expected to complete without throwing. */
+    @Test
+    public void testInitialize_useDefaultPolling() throws Exception {
+        testProperties.remove(FileChangeIngestor.POLLING_PERIOD_INTERVAL_KEY);
+        notifierSpy.initialize(testProperties, Mockito.mock(ConfigurationFileHolder.class), Mockito.mock(ConfigurationChangeNotifier.class));
+    }
+
+    /* Verify handleChange events */
+
+    /** When the watch service has no pending key, a polling cycle must not notify any listener. */
+    @Test
+    public void testTargetChangedNoModification() throws Exception {
+        when(mockDifferentiator.isNew(Mockito.any(InputStream.class))).thenReturn(false);
+
+        // In this case the WatchKey is null because there were no events found
+        establishMockEnvironmentForChangeTests(testNotifier, null);
+
+        // Run one polling cycle so that the verification below checks real behaviour. The notifier
+        // being verified is the one injected into the ingestor in setUp(); verifying a fresh,
+        // unwired mock could never fail.
+        notifierSpy.run();
+
+        verify(testNotifier, Mockito.never()).notifyListeners(Mockito.any(ByteBuffer.class));
+    }
+
+    /** A modification event for some other file in the directory must not notify any listener. */
+    @Test
+    public void testTargetChangedWithModificationEvent_nonConfigFile() throws Exception {
+        when(mockDifferentiator.isNew(Mockito.any(InputStream.class))).thenReturn(false);
+
+        // In this case, we receive a trigger event for the directory monitored, but it was another file not being monitored
+        final WatchKey mockWatchKey = createMockWatchKeyForPath("footage_not_found.yml");
+
+        establishMockEnvironmentForChangeTests(testNotifier, mockWatchKey);
+
+        notifierSpy.targetChanged();
+
+        // Verified against the notifier wired into the ingestor in setUp()
+        verify(testNotifier, Mockito.never()).notifyListeners(Mockito.any(ByteBuffer.class));
+    }
+
+    /** A modification event for the monitored file with new content must notify the listeners. */
+    @Test
+    public void testTargetChangedWithModificationEvent() throws Exception {
+        when(mockDifferentiator.isNew(Mockito.any(InputStream.class))).thenReturn(true);
+
+        final WatchKey mockWatchKey = createMockWatchKeyForPath(CONFIG_FILENAME);
+        // Provided as a spy to allow injection of mock objects for some tests when dealing with the finalized FileSystems class
+        establishMockEnvironmentForChangeTests(testNotifier, mockWatchKey);
+
+        // Invoke the method of interest
+        notifierSpy.run();
+
+        verify(mockWatchService, Mockito.atLeastOnce()).poll();
+        verify(testNotifier, Mockito.atLeastOnce()).notifyListeners(Mockito.any(ByteBuffer.class));
+    }
+
+    /* Helper methods to establish mock environment */
+
+    /**
+     * Builds a mocked {@link WatchKey} whose event list yields exactly one {@code ENTRY_MODIFY}
+     * event with the given path as its context.
+     *
+     * <p>Raw {@code Iterator} and {@code WatchEvent} types are kept deliberately: with wildcard-typed
+     * mocks the Mockito {@code when(...).thenReturn(...)} stubbings below do not type-check.
+     *
+     * @param configFilePath path reported as the context of the single mocked event
+     * @return the mocked watch key; its {@code reset()} is stubbed to return {@code true}
+     */
+    private WatchKey createMockWatchKeyForPath(String configFilePath) {
+        final WatchKey mockWatchKey = Mockito.mock(WatchKey.class);
+        final List<WatchEvent<?>> mockWatchEvents = (List<WatchEvent<?>>) Mockito.mock(List.class);
+        when(mockWatchKey.pollEvents()).thenReturn(mockWatchEvents);
+        when(mockWatchKey.reset()).thenReturn(true);
+
+        final Iterator mockIterator = Mockito.mock(Iterator.class);
+        when(mockWatchEvents.iterator()).thenReturn(mockIterator);
+
+        // hasNext() answers true once and false afterwards, so iteration sees a single event
+        final WatchEvent mockWatchEvent = Mockito.mock(WatchEvent.class);
+        when(mockIterator.hasNext()).thenReturn(true, false);
+        when(mockIterator.next()).thenReturn(mockWatchEvent);
+
+        // The event reports a modification of the given path
+        when(mockWatchEvent.context()).thenReturn(Paths.get(configFilePath));
+        when(mockWatchEvent.kind()).thenReturn(ENTRY_MODIFY);
+
+        return mockWatchKey;
+    }
+
+    /**
+     * Stubs the mocked watch service so that {@code poll()} returns the given key.
+     *
+     * <p>NOTE(review): the {@code configurationChangeNotifier} argument and the two mocked
+     * {@link Path} objects are not injected anywhere by this helper; only the {@code poll()}
+     * stubbing affects the ingestor under test.
+     *
+     * @param configurationChangeNotifier currently unused
+     * @param watchKey key to hand out from {@code poll()}, or {@code null} to simulate no events
+     */
+    private void establishMockEnvironmentForChangeTests(ConfigurationChangeNotifier configurationChangeNotifier, final WatchKey watchKey) throws Exception {
+        // Establish the file mock and its parent directory
+        final Path mockConfigFilePath = Mockito.mock(Path.class);
+        final Path mockConfigFileParentPath = Mockito.mock(Path.class);
+
+        // When getting the parent of the file, get the directory
+        when(mockConfigFilePath.getParent()).thenReturn(mockConfigFileParentPath);
+
+        when(mockWatchService.poll()).thenReturn(watchKey);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestPullHttpChangeIngestor.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestPullHttpChangeIngestor.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestPullHttpChangeIngestor.java
new file mode 100644
index 0000000..f5c6806
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestPullHttpChangeIngestor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
+
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ingestors.common.TestPullHttpChangeIngestorCommon;
+import org.eclipse.jetty.server.ServerConnector;
+import org.junit.BeforeClass;
+import org.mockito.Mockito;
+
+import java.util.Properties;
+
+/**
+ * Plain-HTTP variant of the pull ingestor tests: starts the shared Jetty server with an
+ * unsecured connector and points a {@link PullHttpChangeIngestor} at it.
+ */
+public class TestPullHttpChangeIngestor extends TestPullHttpChangeIngestorCommon {
+
+    /**
+     * Adds an HTTP connector to the shared Jetty server and starts it.
+     *
+     * @throws IllegalStateException if Jetty does not report itself as started
+     */
+    @BeforeClass
+    public static void setUp() throws Exception {
+        TestPullHttpChangeIngestorCommon.init();
+
+        // Port 0 is requested here; the port actually bound is read back in pullHttpChangeIngestorInit
+        final ServerConnector connector = new ServerConnector(jetty);
+        connector.setHost("localhost");
+        connector.setPort(0);
+        connector.setIdleTimeout(3000L);
+
+        jetty.addConnector(connector);
+        jetty.start();
+
+        Thread.sleep(1000);
+
+        final boolean started = jetty.isStarted();
+        if (!started) {
+            throw new IllegalStateException("Jetty server not started");
+        }
+    }
+
+    /**
+     * Fills in host, port and polling period, then creates and initializes the ingestor under test.
+     *
+     * @param properties properties to complete and hand to the ingestor
+     */
+    @Override
+    public void pullHttpChangeIngestorInit(Properties properties) {
+        final ServerConnector firstConnector = (ServerConnector) jetty.getConnectors()[0];
+        port = firstConnector.getLocalPort();
+
+        properties.put(PullHttpChangeIngestor.PORT_KEY, String.valueOf(port));
+        properties.put(PullHttpChangeIngestor.HOST_KEY, "localhost");
+        properties.put(PullHttpChangeIngestor.PULL_HTTP_POLLING_PERIOD_KEY, "30000");
+
+        pullHttpChangeIngestor = new PullHttpChangeIngestor();
+        pullHttpChangeIngestor.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), testNotifier);
+        pullHttpChangeIngestor.setDifferentiator(mockDifferentiator);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestPullHttpChangeIngestorSSL.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestPullHttpChangeIngestorSSL.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestPullHttpChangeIngestorSSL.java
new file mode 100644
index 0000000..340a131
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestPullHttpChangeIngestorSSL.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
+
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ingestors.common.TestPullHttpChangeIngestorCommon;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.junit.BeforeClass;
+import org.mockito.Mockito;
+
+import java.util.Properties;
+
+/**
+ * TLS variant of the pull ingestor tests: starts the shared Jetty server with an HTTPS connector
+ * that requires client authentication and configures the ingestor with matching stores.
+ */
+public class TestPullHttpChangeIngestorSSL extends TestPullHttpChangeIngestorCommon {
+
+    /**
+     * Adds an HTTPS connector backed by the test keystore and truststore to the shared Jetty
+     * server and starts it.
+     *
+     * @throws IllegalStateException if Jetty does not report itself as started
+     */
+    @BeforeClass
+    public static void setUp() throws Exception {
+        TestPullHttpChangeIngestorCommon.init();
+
+        final SslContextFactory sslContextFactory = new SslContextFactory();
+        sslContextFactory.setNeedClientAuth(true);
+
+        // server identity
+        sslContextFactory.setKeyStoreType("JKS");
+        sslContextFactory.setKeyStorePath("./src/test/resources/localhost-ks.jks");
+        sslContextFactory.setKeyStorePassword("localtest");
+
+        // certificates the server trusts
+        sslContextFactory.setTrustStoreType("JKS");
+        sslContextFactory.setTrustStorePath("./src/test/resources/localhost-ts.jks");
+        sslContextFactory.setTrustStorePassword("localtest");
+
+        // Port 0 is requested here; the port actually bound is read back in pullHttpChangeIngestorInit
+        final ServerConnector secureConnector = new ServerConnector(jetty, sslContextFactory);
+        secureConnector.setHost("localhost");
+        secureConnector.setPort(0);
+
+        // Severely taxed environments may have significant delays when executing.
+        secureConnector.setIdleTimeout(30000L);
+
+        jetty.addConnector(secureConnector);
+        jetty.start();
+
+        Thread.sleep(1000);
+
+        final boolean started = jetty.isStarted();
+        if (!started) {
+            throw new IllegalStateException("Jetty server not started");
+        }
+    }
+
+    /**
+     * Fills in the keystore, truststore, host and port settings, then creates and initializes the
+     * ingestor under test.
+     *
+     * @param properties properties to complete and hand to the ingestor
+     */
+    @Override
+    public void pullHttpChangeIngestorInit(Properties properties) {
+        properties.setProperty(PullHttpChangeIngestor.KEYSTORE_LOCATION_KEY, "./src/test/resources/localhost-ks.jks");
+        properties.setProperty(PullHttpChangeIngestor.KEYSTORE_PASSWORD_KEY, "localtest");
+        properties.setProperty(PullHttpChangeIngestor.KEYSTORE_TYPE_KEY, "JKS");
+        properties.setProperty(PullHttpChangeIngestor.TRUSTSTORE_LOCATION_KEY, "./src/test/resources/localhost-ts.jks");
+        properties.setProperty(PullHttpChangeIngestor.TRUSTSTORE_PASSWORD_KEY, "localtest");
+        properties.setProperty(PullHttpChangeIngestor.TRUSTSTORE_TYPE_KEY, "JKS");
+
+        final ServerConnector firstConnector = (ServerConnector) jetty.getConnectors()[0];
+        port = firstConnector.getLocalPort();
+        properties.put(PullHttpChangeIngestor.PORT_KEY, String.valueOf(port));
+        properties.put(PullHttpChangeIngestor.HOST_KEY, "localhost");
+
+        pullHttpChangeIngestor = new PullHttpChangeIngestor();
+        pullHttpChangeIngestor.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), testNotifier);
+        pullHttpChangeIngestor.setDifferentiator(mockDifferentiator);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestRestChangeIngestor.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestRestChangeIngestor.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestRestChangeIngestor.java
new file mode 100644
index 0000000..86a6b4e
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestRestChangeIngestor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
+
+
+import okhttp3.OkHttpClient;
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.ingestors.common.TestRestChangeIngestorCommon;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.mockito.Mockito;
+
+import java.net.MalformedURLException;
+import java.util.Properties;
+
+
+/**
+ * Plain-HTTP variant of the REST ingestor tests: starts a {@link RestChangeIngestor} with empty
+ * properties and prepares a default {@link OkHttpClient} and target URL for the inherited fixtures.
+ */
+public class TestRestChangeIngestor extends TestRestChangeIngestorCommon {
+
+    /** Creates, initializes and starts the ingestor, then records its URL and builds the client. */
+    @BeforeClass
+    public static void setUp() throws InterruptedException, MalformedURLException {
+        testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
+
+        restChangeIngestor = new RestChangeIngestor();
+        restChangeIngestor.initialize(new Properties(), Mockito.mock(ConfigurationFileHolder.class), testNotifier);
+        restChangeIngestor.setDifferentiator(mockDifferentiator);
+        restChangeIngestor.start();
+
+        client = new OkHttpClient();
+        url = restChangeIngestor.getURI().toURL().toString();
+
+        Thread.sleep(1000);
+    }
+
+    /** Shuts the ingestor down and drops the HTTP client reference. */
+    @AfterClass
+    public static void stop() throws Exception {
+        restChangeIngestor.close();
+        client = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestRestChangeIngestorSSL.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestRestChangeIngestorSSL.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestRestChangeIngestorSSL.java
new file mode 100644
index 0000000..debe772
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestRestChangeIngestorSSL.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
+
+
+import okhttp3.OkHttpClient;
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult;
+import org.apache.nifi.minifi.bootstrap.configuration.ingestors.common.TestRestChangeIngestorCommon;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.mockito.Mockito;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.Collections;
+import java.util.Properties;
+
+import static org.mockito.Mockito.when;
+
+
+/**
+ * TLS variant of the REST ingestor tests: starts a {@link RestChangeIngestor} configured with the
+ * test keystore and truststore and builds an {@link OkHttpClient} that uses the same stores.
+ */
+public class TestRestChangeIngestorSSL extends TestRestChangeIngestorCommon {
+
+    private static final String KEYSTORE_LOCATION = "./src/test/resources/localhost-ks.jks";
+    private static final String TRUSTSTORE_LOCATION = "./src/test/resources/localhost-ts.jks";
+    private static final String STORE_PASSWORD = "localtest";
+    private static final String STORE_TYPE = "JKS";
+
+    /**
+     * Starts the ingestor with TLS enabled (client authentication not required) and prepares the
+     * HTTPS client and target URL used by the inherited fixtures.
+     *
+     * @throws IllegalStateException if the truststore does not yield an {@link X509TrustManager}
+     */
+    @BeforeClass
+    public static void setUpHttps() throws CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, UnrecoverableKeyException, KeyManagementException, InterruptedException {
+        Properties properties = new Properties();
+        properties.setProperty(RestChangeIngestor.TRUSTSTORE_LOCATION_KEY, TRUSTSTORE_LOCATION);
+        properties.setProperty(RestChangeIngestor.TRUSTSTORE_PASSWORD_KEY, STORE_PASSWORD);
+        properties.setProperty(RestChangeIngestor.TRUSTSTORE_TYPE_KEY, STORE_TYPE);
+        properties.setProperty(RestChangeIngestor.KEYSTORE_LOCATION_KEY, KEYSTORE_LOCATION);
+        properties.setProperty(RestChangeIngestor.KEYSTORE_PASSWORD_KEY, STORE_PASSWORD);
+        properties.setProperty(RestChangeIngestor.KEYSTORE_TYPE_KEY, STORE_TYPE);
+        properties.setProperty(RestChangeIngestor.NEED_CLIENT_AUTH_KEY, "false");
+
+        restChangeIngestor = new RestChangeIngestor();
+
+        testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
+        ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+        when(testListener.getDescriptor()).thenReturn("MockChangeListener");
+        when(testNotifier.notifyListeners(Mockito.any())).thenReturn(Collections.singleton(new ListenerHandleResult(testListener)));
+
+        restChangeIngestor.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), testNotifier);
+        restChangeIngestor.setDifferentiator(mockDifferentiator);
+        restChangeIngestor.start();
+
+        OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder();
+
+        // Client identity: key managers backed by the test keystore
+        final KeyStore keyStore = readKeyStore(KEYSTORE_LOCATION);
+        final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+        keyManagerFactory.init(keyStore, STORE_PASSWORD.toCharArray());
+
+        // Server trust: trust managers backed by the test truststore
+        final KeyStore truststore = readKeyStore(TRUSTSTORE_LOCATION);
+        final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("X509");
+        trustManagerFactory.init(truststore);
+
+        // OkHttp needs the X509TrustManager itself in addition to the socket factory; check the
+        // array before indexing and casting so a misconfiguration fails with a clear message.
+        final TrustManager[] trustManagers = trustManagerFactory.getTrustManagers();
+        if (trustManagers == null || trustManagers.length == 0 || !(trustManagers[0] instanceof X509TrustManager)) {
+            throw new IllegalStateException("No X509TrustManager available from truststore " + TRUSTSTORE_LOCATION);
+        }
+        final X509TrustManager x509TrustManager = (X509TrustManager) trustManagers[0];
+
+        // No fallback to SSLContext.getDefault(): a default context cannot be initialized again
+        // with custom key and trust managers, so a missing "TLS" algorithm is simply propagated.
+        final SSLContext sslContext = SSLContext.getInstance("TLS");
+        sslContext.init(keyManagerFactory.getKeyManagers(), trustManagers, null);
+
+        final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
+        clientBuilder.sslSocketFactory(sslSocketFactory, x509TrustManager);
+
+        Thread.sleep(1000);
+        url = restChangeIngestor.getURI().toURL().toString();
+        client = clientBuilder.build();
+    }
+
+    /** Shuts the ingestor down and drops the HTTP client reference. */
+    @AfterClass
+    public static void stop() throws Exception {
+        restChangeIngestor.close();
+        client = null;
+    }
+
+    /**
+     * Loads a test key store from disk using the shared store type and password.
+     *
+     * @param path location of the store file
+     * @return the loaded key store
+     * @throws IOException if the file cannot be read or the password is wrong
+     */
+    private static KeyStore readKeyStore(String path) throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException {
+        final KeyStore ks = KeyStore.getInstance(STORE_TYPE);
+
+        // try-with-resources closes the stream whether or not loading succeeds
+        try (FileInputStream fis = new FileInputStream(path)) {
+            ks.load(fis, STORE_PASSWORD.toCharArray());
+        }
+        return ks;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/TestPullHttpChangeIngestorCommon.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/TestPullHttpChangeIngestorCommon.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/TestPullHttpChangeIngestorCommon.java
new file mode 100644
index 0000000..53e66d7
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/TestPullHttpChangeIngestorCommon.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.ingestors.common;
+
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
+import org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Properties;
+
+import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PATH_KEY;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases shared by the concrete {@link PullHttpChangeIngestor} test classes.
+ *
+ * <p>Subclasses supply {@link #pullHttpChangeIngestorInit(Properties)} to build the ingestor under
+ * test; the cases here drive it against the {@link JettyHandler} installed by {@link #init()} and
+ * verify which configuration, if any, is handed to the mocked notifier.
+ */
+public abstract class TestPullHttpChangeIngestorCommon {
+
+    public static volatile Server jetty;
+    // Not assigned anywhere in this class; presumably set by subclasses once the server is bound - TODO confirm.
+    public static volatile int port;
+    public static volatile PullHttpChangeIngestor pullHttpChangeIngestor;
+    public static ConfigurationChangeNotifier testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
+    public static Differentiator<ByteBuffer> mockDifferentiator = Mockito.mock(Differentiator.class);
+    public static final String RESPONSE_STRING = "test";
+    public static final String PATH_RESPONSE_STRING = "path";
+    // Encode with the same charset the handler declares on its responses instead of the platform
+    // default, so the expected buffers do not depend on the JVM's file.encoding.
+    public static ByteBuffer configBuffer = ByteBuffer.wrap(RESPONSE_STRING.getBytes(StandardCharsets.UTF_8));
+    public static ByteBuffer pathConfigBuffer = ByteBuffer.wrap(PATH_RESPONSE_STRING.getBytes(StandardCharsets.UTF_8));
+    public static final String ETAG = "testEtag";
+    public static final String QUOTED_ETAG = "\"testEtag\"";
+
+    /**
+     * Creates the shared Jetty server, backed by a daemon thread pool, and installs the
+     * {@link JettyHandler}. The server is neither given a connector nor started here.
+     */
+    public static void init() {
+        QueuedThreadPool queuedThreadPool = new QueuedThreadPool();
+        queuedThreadPool.setDaemon(true);
+        jetty = new Server(queuedThreadPool);
+
+        HandlerCollection handlerCollection = new HandlerCollection(true);
+        handlerCollection.addHandler(new JettyHandler(RESPONSE_STRING, PATH_RESPONSE_STRING));
+        jetty.setHandler(handlerCollection);
+    }
+
+    /**
+     * Builds the ingestor under test from the given properties. The test cases read the result
+     * from {@link #pullHttpChangeIngestor}, so implementations are expected to assign that field.
+     *
+     * @param properties ingestor properties prepared by the individual test case
+     */
+    public abstract void pullHttpChangeIngestorInit(Properties properties);
+
+    /**
+     * Resets the shared notifier mock and stubs it to report a single listener result, so that
+     * invocation counts from one test do not leak into the next.
+     */
+    @Before
+    public void before() {
+        Mockito.reset(testNotifier);
+        ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+        when(testListener.getDescriptor()).thenReturn("MockChangeListener");
+        when(testNotifier.notifyListeners(Mockito.any())).thenReturn(Collections.singleton(new ListenerHandleResult(testListener)));
+    }
+
+    /**
+     * Stops the shared Jetty server.
+     *
+     * @throws Exception if the server fails to stop
+     */
+    @AfterClass
+    public static void shutdown() throws Exception {
+        // init() may never have run (or may have failed); avoid a NullPointerException in teardown.
+        if (jetty != null) {
+            jetty.stop();
+        }
+    }
+
+    /** With ETags disabled and the differentiator reporting new content, listeners receive the default response once. */
+    @Test
+    public void testNewUpdate() throws IOException {
+        Properties properties = new Properties();
+        pullHttpChangeIngestorInit(properties);
+        pullHttpChangeIngestor.setUseEtag(false);
+        when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true);
+
+        pullHttpChangeIngestor.run();
+
+        verify(testNotifier, Mockito.times(1)).notifyListeners(Mockito.eq(configBuffer.asReadOnlyBuffer()));
+    }
+
+
+    /** With ETags disabled and the differentiator reporting no change, listeners are never notified. */
+    @Test
+    public void testNoUpdate() throws IOException {
+        Properties properties = new Properties();
+        pullHttpChangeIngestorInit(properties);
+        pullHttpChangeIngestor.setUseEtag(false);
+        when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(false);
+
+        pullHttpChangeIngestor.run();
+
+        verify(testNotifier, Mockito.never()).notifyListeners(Mockito.any());
+    }
+
+    /**
+     * With ETags enabled, the first pull notifies listeners and a second pull does not notify again.
+     * The handler answers 304 when the request carries the quoted ETag; presumably the ingestor
+     * sends the ETag it stored from the first response - confirm against PullHttpChangeIngestor.
+     */
+    @Test
+    public void testUseEtag() throws IOException {
+        Properties properties = new Properties();
+        pullHttpChangeIngestorInit(properties);
+        pullHttpChangeIngestor.setLastEtag("");
+
+        pullHttpChangeIngestor.setUseEtag(true);
+
+        when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true);
+
+        pullHttpChangeIngestor.run();
+
+        // Match against a read-only view, as the sibling tests do, so the shared static buffer is
+        // never the object handed to Mockito.
+        verify(testNotifier, Mockito.times(1)).notifyListeners(Mockito.eq(configBuffer.asReadOnlyBuffer()));
+
+        pullHttpChangeIngestor.run();
+
+        verify(testNotifier, Mockito.times(1)).notifyListeners(Mockito.any());
+
+    }
+
+    /** Same as {@link #testNewUpdate()} but with {@code PATH_KEY} set, expecting the path-specific response. */
+    @Test
+    public void testNewUpdateWithPath() throws IOException {
+        Properties properties = new Properties();
+        properties.put(PATH_KEY, "/config.yml");
+        pullHttpChangeIngestorInit(properties);
+        pullHttpChangeIngestor.setUseEtag(false);
+        when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true);
+
+        pullHttpChangeIngestor.run();
+
+        verify(testNotifier, Mockito.times(1)).notifyListeners(Mockito.eq(pathConfigBuffer.asReadOnlyBuffer()));
+    }
+
+    /** Same as {@link #testNoUpdate()} but with {@code PATH_KEY} set. */
+    @Test
+    public void testNoUpdateWithPath() throws IOException {
+        Properties properties = new Properties();
+        properties.put(PATH_KEY, "/config.yml");
+        pullHttpChangeIngestorInit(properties);
+        pullHttpChangeIngestor.setUseEtag(false);
+        when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(false);
+
+        pullHttpChangeIngestor.run();
+
+        verify(testNotifier, Mockito.never()).notifyListeners(Mockito.any());
+    }
+
+    /** Same as {@link #testUseEtag()} but with {@code PATH_KEY} set, expecting the path-specific response. */
+    @Test
+    public void testUseEtagWithPath() throws IOException {
+        Properties properties = new Properties();
+        properties.put(PATH_KEY, "/config.yml");
+        pullHttpChangeIngestorInit(properties);
+        pullHttpChangeIngestor.setLastEtag("");
+
+        pullHttpChangeIngestor.setUseEtag(true);
+
+        when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true);
+
+        pullHttpChangeIngestor.run();
+
+        verify(testNotifier, Mockito.times(1)).notifyListeners(Mockito.eq(pathConfigBuffer.asReadOnlyBuffer()));
+
+        pullHttpChangeIngestor.run();
+
+        verify(testNotifier, Mockito.times(1)).notifyListeners(Mockito.any());
+
+    }
+
+    /**
+     * Minimal stand-in for a configuration server. GET requests receive one of two canned bodies
+     * depending on the request path, or an empty 304 when {@code If-None-Match} equals
+     * {@link #QUOTED_ETAG}; any other method receives a 404.
+     */
+    static class JettyHandler extends AbstractHandler {
+        volatile String configResponse;
+        volatile String pathResponse;
+
+        /**
+         * @param configResponse body served for every GET path other than {@code /config.yml}
+         * @param pathResponse body served for GET {@code /config.yml}
+         */
+        public JettyHandler(String configResponse, String pathResponse){
+            this.configResponse = configResponse;
+            this.pathResponse = pathResponse;
+        }
+
+
+        @Override
+        public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
+                throws IOException, ServletException {
+
+            // Mark the request handled up front; every branch below writes a response.
+            baseRequest.setHandled(true);
+
+            if ("GET".equals(request.getMethod())) {
+
+                if (QUOTED_ETAG.equals(baseRequest.getHeader("If-None-Match"))){
+                    // The client already holds the current version: reply 304 without a body.
+                    writeOutput(response, null, 304);
+                } else {
+
+                    if ("/config.yml".equals(baseRequest.getPathInfo())) {
+                        writeOutput(response, pathResponse, 200);
+                    } else {
+                        writeOutput(response, configResponse, 200);
+                    }
+                }
+
+
+            } else {
+                writeOutput(response, "not a GET request", 404);
+            }
+        }
+
+        /**
+         * Writes the status, the (unquoted) {@link #ETAG} header and, when present, a UTF-8 plain
+         * text body.
+         *
+         * @param response servlet response to populate
+         * @param responseBuffer body text, or {@code null} to send no body
+         * @param responseCode HTTP status code to set
+         * @throws IOException if the body cannot be written
+         */
+        private void writeOutput(HttpServletResponse response, String responseBuffer, int responseCode) throws IOException {
+            response.setStatus(responseCode);
+            response.setHeader("ETag", ETAG);
+            if (responseBuffer != null) {
+                response.setContentType("text/plain");
+                // Content-Length counts bytes, not chars: measure the UTF-8 encoding actually sent.
+                response.setContentLength(responseBuffer.getBytes(StandardCharsets.UTF_8).length);
+                // name() is the canonical charset name; displayName() may be localized.
+                response.setCharacterEncoding(StandardCharsets.UTF_8.name());
+                try (PrintWriter writer = response.getWriter()) {
+                    writer.print(responseBuffer);
+                    writer.flush();
+                }
+            }
+        }
+
+    }
+}


[3/3] nifi-minifi git commit: MINIFI-36 Refactoring config change notifiers and adding Pull config change notifier

Posted by br...@apache.org.
MINIFI-36 Refactoring config change notifiers and adding Pull config change notifier

This closes #51

Signed-off-by: Bryan Rosander <br...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/6f3c5678
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/6f3c5678
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/6f3c5678

Branch: refs/heads/master
Commit: 6f3c567806363f58e85a2f00ef83e3b1f48f3f7f
Parents: 7954d36
Author: Joseph Percivall <JP...@apache.org>
Authored: Thu Oct 6 16:27:19 2016 -0400
Committer: Bryan Rosander <br...@apache.org>
Committed: Fri Nov 11 12:14:57 2016 -0500

----------------------------------------------------------------------
 .../src/main/assembly/dependencies.xml          |   2 +
 minifi-bootstrap/pom.xml                        |  10 +-
 .../nifi/minifi/bootstrap/BootstrapCodec.java   |   2 +-
 .../bootstrap/ConfigurationFileHolder.java      |  26 ++
 .../apache/nifi/minifi/bootstrap/RunMiNiFi.java | 170 ++++++----
 .../nifi/minifi/bootstrap/ShutdownHook.java     |  15 +-
 .../ConfigurationChangeCoordinator.java         | 114 +++++++
 .../ConfigurationChangeListener.java            |   4 +-
 .../ConfigurationChangeNotifier.java            |  44 +--
 .../configuration/ListenerHandleResult.java     |  14 +-
 .../WholeConfigDifferentiator.java              |  90 +++++
 .../interfaces/Differentiator.java              |  29 ++
 .../ingestors/AbstractPullChangeIngestor.java   |  60 ++++
 .../ingestors/FileChangeIngestor.java           | 234 +++++++++++++
 .../ingestors/PullHttpChangeIngestor.java       | 326 +++++++++++++++++++
 .../ingestors/RestChangeIngestor.java           | 294 +++++++++++++++++
 .../ingestors/interfaces/ChangeIngestor.java    |  32 ++
 .../notifiers/FileChangeNotifier.java           | 202 ------------
 .../notifiers/RestChangeNotifier.java           | 289 ----------------
 .../bootstrap/util/ByteBufferInputStream.java   |  48 +++
 .../bootstrap/util/ConfigTransformer.java       |   2 +-
 .../TestConfigurationChangeCoordinator.java     |  84 +++++
 .../TestWholeConfigDifferentiator.java          | 110 +++++++
 .../ingestors/TestFileChangeIngestor.java       | 171 ++++++++++
 .../ingestors/TestPullHttpChangeIngestor.java   |  65 ++++
 .../TestPullHttpChangeIngestorSSL.java          |  84 +++++
 .../ingestors/TestRestChangeIngestor.java       |  57 ++++
 .../ingestors/TestRestChangeIngestorSSL.java    | 150 +++++++++
 .../TestPullHttpChangeIngestorCommon.java       | 231 +++++++++++++
 .../common/TestRestChangeIngestorCommon.java    | 127 ++++++++
 .../notifiers/TestFileChangeNotifier.java       | 208 ------------
 .../notifiers/TestRestChangeNotifier.java       |  51 ---
 .../notifiers/TestRestChangeNotifierSSL.java    |  96 ------
 .../notifiers/util/MockChangeListener.java      |  51 ---
 .../util/TestRestChangeNotifierCommon.java      |  84 -----
 .../bootstrap/util/ConfigTransformerTest.java   |   5 +-
 .../src/main/markdown/System_Admin_Guide.md     |  97 ++++++
 .../src/main/resources/conf/bootstrap.conf      |  20 +-
 pom.xml                                         |  15 +-
 39 files changed, 2580 insertions(+), 1133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-assembly/src/main/assembly/dependencies.xml
----------------------------------------------------------------------
diff --git a/minifi-assembly/src/main/assembly/dependencies.xml b/minifi-assembly/src/main/assembly/dependencies.xml
index 551c8af..a774e49 100644
--- a/minifi-assembly/src/main/assembly/dependencies.xml
+++ b/minifi-assembly/src/main/assembly/dependencies.xml
@@ -73,6 +73,8 @@
                 <include>jetty-http</include>
                 <include>jetty-io</include>
                 <include>javax.servlet-api</include>
+                <include>commons-io</include>
+                <include>okhttp</include>
             </includes>
         </dependencySet>
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/pom.xml b/minifi-bootstrap/pom.xml
index 433c352..71e9b78 100644
--- a/minifi-bootstrap/pom.xml
+++ b/minifi-bootstrap/pom.xml
@@ -73,21 +73,13 @@ limitations under the License.
             <version>${jetty.version}</version>
             <scope>compile</scope>
         </dependency>
-
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-standard-prioritizers</artifactId>
-            <scope>test</scope>
-        </dependency>
         <dependency>
-            <groupId>com.squareup.okhttp</groupId>
+            <groupId>com.squareup.okhttp3</groupId>
             <artifactId>okhttp</artifactId>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
-            <scope>test</scope>
         </dependency>
     </dependencies>
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapCodec.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapCodec.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapCodec.java
index 95e6f87..2e8a537 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapCodec.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapCodec.java
@@ -110,7 +110,7 @@ public class BootstrapCodec {
             break;
             case "SHUTDOWN": {
                 logger.debug("Received 'SHUTDOWN' command from MINIFI");
-                runner.shutdownChangeNotifiers();
+                runner.shutdownChangeNotifier();
                 runner.shutdownPeriodicStatusReporters();
                 writer.write("OK");
                 writer.newLine();

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ConfigurationFileHolder.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ConfigurationFileHolder.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ConfigurationFileHolder.java
new file mode 100644
index 0000000..d5113e3
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ConfigurationFileHolder.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Implemented by components that hold the configuration file contents currently in effect and
+ * expose them through a shared reference.
+ */
+public interface ConfigurationFileHolder {
+
+    /**
+     * Returns the reference through which the held configuration bytes are read and replaced.
+     *
+     * @return the {@link AtomicReference} wrapping the configuration as a {@link ByteBuffer}.
+     *         NOTE(review): the referenced buffer is presumably {@code null} until a configuration
+     *         has been loaded - confirm against implementers.
+     */
+    AtomicReference<ByteBuffer> getConfigFileReference();
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
index ad54c61..52a803c 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
@@ -35,6 +35,7 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -57,14 +58,16 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.commons.io.input.TeeInputStream;
 import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
 import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
 import org.apache.nifi.minifi.bootstrap.status.PeriodicStatusReporter;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator;
 import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
 import org.apache.nifi.minifi.commons.status.FlowStatusReport;
 import org.apache.nifi.stream.io.ByteArrayInputStream;
@@ -90,7 +93,7 @@ import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
  * <p>
  * If the {@code bootstrap.conf} file cannot be found, throws a {@code FileNotFoundException}.
  */
-public class RunMiNiFi implements QueryableStatusAggregator {
+public class RunMiNiFi implements QueryableStatusAggregator, ConfigurationFileHolder {
 
     public static final String DEFAULT_CONFIG_FILE = "./conf/bootstrap.conf";
     public static final String DEFAULT_NIFI_PROPS_FILE = "./conf/nifi.properties";
@@ -143,11 +146,18 @@ public class RunMiNiFi implements QueryableStatusAggregator {
     private volatile Set<Future<?>> loggingFutures = new HashSet<>(2);
     private volatile int gracefulShutdownSeconds;
 
-    private Set<ConfigurationChangeNotifier> changeNotifiers;
     private Set<PeriodicStatusReporter> periodicStatusReporters;
 
+    private ConfigurationChangeCoordinator changeCoordinator;
     private MiNiFiConfigurationChangeListener changeListener;
 
+    private final AtomicReference<ByteBuffer> currentConfigFileReference = new AtomicReference<>();
+
+    @Override
+    public AtomicReference<ByteBuffer> getConfigFileReference() {
+        return currentConfigFileReference;
+    }
+
     // Is set to true after the MiNiFi instance shuts down in preparation to be reloaded. Will be set to false after MiNiFi is successfully started again.
     private AtomicBoolean reloading = new AtomicBoolean(false);
 
@@ -1098,8 +1108,9 @@ public class RunMiNiFi implements QueryableStatusAggregator {
 
         final String confDir = getBootstrapProperties().getProperty(CONF_DIR_KEY);
         final File configFile = new File(getBootstrapProperties().getProperty(MINIFI_CONFIG_FILE_KEY));
-        try {
-            performTransformation(new FileInputStream(configFile), confDir);
+        try (InputStream inputStream = new FileInputStream(configFile)) {
+            ByteBuffer tempConfigFile = performTransformation(inputStream, confDir);
+            currentConfigFileReference.set(tempConfigFile.asReadOnlyBuffer());
         } catch (ConfigurationChangeException e) {
             defaultLogger.error("The config file is malformed, unable to start.", e);
             return;
@@ -1111,11 +1122,11 @@ public class RunMiNiFi implements QueryableStatusAggregator {
             return;
         }
 
-        // Instantiate configuration listener and configured notifiers
+        // Instantiate configuration listener and configured ingestors
         this.changeListener = new MiNiFiConfigurationChangeListener(this, defaultLogger);
-        this.changeNotifiers = initializeNotifiers(this.changeListener);
         this.periodicStatusReporters = initializePeriodicNotifiers();
         startPeriodicNotifiers();
+        this.changeCoordinator = initializeNotifier(this.changeListener);
 
         ProcessBuilder builder = tuple.getKey();
         Process process = tuple.getValue();
@@ -1136,7 +1147,7 @@ public class RunMiNiFi implements QueryableStatusAggregator {
                                 if (swapConfigFile.delete()) {
                                     defaultLogger.info("Swap file was successfully deleted.");
                                 } else {
-                                    defaultLogger.info("Swap file was not deleted.");
+                                    defaultLogger.error("Swap file was not deleted. It should be deleted manually.");
                                 }
                             }
 
@@ -1180,7 +1191,8 @@ public class RunMiNiFi implements QueryableStatusAggregator {
                                 defaultLogger.info("Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration.");
 
                                 try {
-                                    performTransformation(new FileInputStream(swapConfigFile), confDir);
+                                    ByteBuffer tempConfigFile = performTransformation(new FileInputStream(swapConfigFile), confDir);
+                                    currentConfigFileReference.set(tempConfigFile.asReadOnlyBuffer());
                                 } catch (ConfigurationChangeException e) {
                                     defaultLogger.error("The swap file is malformed, unable to restart from prior state. Will not attempt to restart MiNiFi. Swap File should be cleaned up manually.");
                                     return;
@@ -1228,7 +1240,7 @@ public class RunMiNiFi implements QueryableStatusAggregator {
                 }
             }
         } finally {
-            shutdownChangeNotifiers();
+            shutdownChangeNotifier();
             shutdownPeriodicStatusReporters();
         }
     }
@@ -1424,41 +1436,26 @@ public class RunMiNiFi implements QueryableStatusAggregator {
         }
     }
 
-    public void shutdownChangeNotifiers() {
-        for (ConfigurationChangeNotifier notifier : getChangeNotifiers()) {
-            try {
-                notifier.close();
-            } catch (IOException e) {
-                defaultLogger.warn("Could not successfully stop notifier {}", notifier.getClass(), e);
-            }
+    public void shutdownChangeNotifier() {
+        try {
+            getChangeCoordinator().close();
+        } catch (IOException e) {
+            defaultLogger.warn("Could not successfully stop notifier ", e);
         }
     }
 
-    public Set<ConfigurationChangeNotifier> getChangeNotifiers() {
-        return Collections.unmodifiableSet(changeNotifiers);
+    public ConfigurationChangeCoordinator getChangeCoordinator() {
+        return changeCoordinator;
     }
 
-    private Set<ConfigurationChangeNotifier> initializeNotifiers(ConfigurationChangeListener configChangeListener) throws IOException {
-        final Set<ConfigurationChangeNotifier> changeNotifiers = new HashSet<>();
-
+    private ConfigurationChangeCoordinator initializeNotifier(ConfigurationChangeListener configChangeListener) throws IOException {
         final Properties bootstrapProperties = getBootstrapProperties();
 
-        final String notifiersCsv = bootstrapProperties.getProperty(NOTIFIER_COMPONENTS_KEY);
-        if (notifiersCsv != null && !notifiersCsv.isEmpty()) {
-            for (String notifierClassname : Arrays.asList(notifiersCsv.split(","))) {
-                try {
-                    Class<?> notifierClass = Class.forName(notifierClassname);
-                    ConfigurationChangeNotifier notifier = (ConfigurationChangeNotifier) notifierClass.newInstance();
-                    notifier.initialize(bootstrapProperties);
-                    changeNotifiers.add(notifier);
-                    notifier.registerListener(configChangeListener);
-                    notifier.start();
-                } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
-                    throw new RuntimeException("Issue instantiating notifier " + notifierClassname, e);
-                }
-            }
-        }
-        return changeNotifiers;
+        ConfigurationChangeCoordinator notifier = new ConfigurationChangeCoordinator();
+        notifier.initialize(bootstrapProperties, this, Collections.singleton(configChangeListener));
+        notifier.start();
+
+        return notifier;
     }
 
     public Set<PeriodicStatusReporter> getPeriodicStatusReporters() {
@@ -1506,6 +1503,7 @@ public class RunMiNiFi implements QueryableStatusAggregator {
 
         private final RunMiNiFi runner;
         private final Logger logger;
+        private static final ReentrantLock handlingLock = new ReentrantLock();
 
         public MiNiFiConfigurationChangeListener(RunMiNiFi runner, Logger logger) {
             this.runner = runner;
@@ -1515,7 +1513,12 @@ public class RunMiNiFi implements QueryableStatusAggregator {
         @Override
         public void handleChange(InputStream configInputStream) throws ConfigurationChangeException {
             logger.info("Received notification of a change");
+
+            if (!handlingLock.tryLock()) {
+                throw new ConfigurationChangeException("Instance is already handling another change");
+            }
             try {
+
                 final Properties bootstrapProperties = runner.getBootstrapProperties();
                 final File configFile = new File(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY));
 
@@ -1528,38 +1531,50 @@ public class RunMiNiFi implements QueryableStatusAggregator {
                 }
 
                 // Create an input stream to use for writing a config file as well as feeding to the config transformer
-                final ByteArrayInputStream newConfigBais = new ByteArrayInputStream(bufferedConfigOs.toByteArray());
-                newConfigBais.mark(-1);
+                try (final ByteArrayInputStream newConfigBais = new ByteArrayInputStream(bufferedConfigOs.toByteArray())) {
+                    newConfigBais.mark(-1);
 
-                final File swapConfigFile = runner.getSwapFile(logger);
-                logger.info("Persisting old configuration to {}", swapConfigFile.getAbsolutePath());
-                Files.copy(new FileInputStream(configFile), swapConfigFile.toPath(), REPLACE_EXISTING);
+                    final File swapConfigFile = runner.getSwapFile(logger);
+                    logger.info("Persisting old configuration to {}", swapConfigFile.getAbsolutePath());
 
-                try {
-                    logger.info("Persisting changes to {}", configFile.getAbsolutePath());
-                    saveFile(newConfigBais, configFile);
-
-                     try {
-                         // Reset the input stream to provide to the transformer
-                         newConfigBais.reset();
-
-                         final String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
-                         logger.info("Performing transformation for input and saving outputs to {}", confDir);
-                         performTransformation(newConfigBais, confDir);
-
-                         logger.info("Reloading instance with new configuration");
-                         restartInstance();
-                     } catch (Exception e){
-                         logger.debug("Transformation of new config file failed after replacing original with the swap file, reverting.");
-                         Files.copy(new FileInputStream(swapConfigFile), configFile.toPath(), REPLACE_EXISTING);
-                         throw e;
-                     }
-                } catch (Exception e){
-                    logger.debug("Transformation of new config file failed after swap file was created, deleting it.");
-                    if(!swapConfigFile.delete()){
-                        logger.warn("The swap file failed to delete after a failed handling of a change. It should be cleaned up manually.");
+                    try (FileInputStream configFileInputStream = new FileInputStream(configFile)) {
+                        Files.copy(configFileInputStream, swapConfigFile.toPath(), REPLACE_EXISTING);
+                    }
+
+                    try {
+                        logger.info("Persisting changes to {}", configFile.getAbsolutePath());
+                        saveFile(newConfigBais, configFile);
+                        final String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
+
+                        try {
+                            // Reset the input stream to provide to the transformer
+                            newConfigBais.reset();
+
+                            logger.info("Performing transformation for input and saving outputs to {}", confDir);
+                            ByteBuffer tempConfigFile = performTransformation(newConfigBais, confDir);
+                            runner.currentConfigFileReference.set(tempConfigFile.asReadOnlyBuffer());
+
+                            try {
+                                logger.info("Reloading instance with new configuration");
+                                restartInstance();
+                            } catch (Exception e) {
+                                logger.debug("Transformation of new config file failed after transformation into Flow.xml and nifi.properties, reverting.");
+                                ByteBuffer resetConfigFile = performTransformation(new FileInputStream(swapConfigFile), confDir);
+                                runner.currentConfigFileReference.set(resetConfigFile.asReadOnlyBuffer());
+                                throw e;
+                            }
+                        } catch (Exception e) {
+                            logger.debug("Transformation of new config file failed after replacing original with the swap file, reverting.");
+                            Files.copy(new FileInputStream(swapConfigFile), configFile.toPath(), REPLACE_EXISTING);
+                            throw e;
+                        }
+                    } catch (Exception e) {
+                        logger.debug("Transformation of new config file failed after swap file was created, deleting it.");
+                        if (!swapConfigFile.delete()) {
+                            logger.warn("The swap file failed to delete after a failed handling of a change. It should be cleaned up manually.");
+                        }
+                        throw e;
                     }
-                    throw e;
                 }
             } catch (ConfigurationChangeException e){
                 logger.error("Unable to carry out reloading of configuration on receipt of notification event", e);
@@ -1567,6 +1582,15 @@ public class RunMiNiFi implements QueryableStatusAggregator {
             } catch (IOException ioe) {
                 logger.error("Unable to carry out reloading of configuration on receipt of notification event", ioe);
                 throw new ConfigurationChangeException("Unable to perform reload of received configuration change", ioe);
+            } finally {
+                try {
+                    if (configInputStream != null) {
+                        configInputStream.close() ;
+                    }
+                } catch (IOException e) {
+                    // Quietly close
+                }
+                handlingLock.unlock();
             }
         }
 
@@ -1589,8 +1613,6 @@ public class RunMiNiFi implements QueryableStatusAggregator {
             }
         }
 
-
-
         private void restartInstance() throws IOException {
             try {
                 runner.reload();
@@ -1600,9 +1622,13 @@ public class RunMiNiFi implements QueryableStatusAggregator {
         }
     }
 
-    private static void performTransformation(InputStream configIs, String configDestinationPath) throws ConfigurationChangeException, IOException {
-        try {
-            ConfigTransformer.transformConfigFile(configIs, configDestinationPath);
+    private static ByteBuffer performTransformation(InputStream configIs, String configDestinationPath) throws ConfigurationChangeException, IOException {
+        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+                TeeInputStream teeInputStream = new TeeInputStream(configIs, byteArrayOutputStream)) {
+
+            ConfigTransformer.transformConfigFile(teeInputStream, configDestinationPath);
+
+            return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
         } catch (ConfigurationChangeException e){
             throw e;
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
index bec39e6..236a52d 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
@@ -25,8 +25,8 @@ import java.nio.charset.StandardCharsets;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
 import org.apache.nifi.minifi.bootstrap.status.PeriodicStatusReporter;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator;
 
 public class ShutdownHook extends Thread {
 
@@ -53,13 +53,12 @@ public class ShutdownHook extends Thread {
     public void run() {
         executor.shutdown();
 
-        System.out.println("Initiating shutdown of bootstrap change notifiers...");
-        for (ConfigurationChangeNotifier notifier : runner.getChangeNotifiers()) {
-            try {
-                notifier.close();
-            } catch (IOException ioe) {
-                System.out.println("Could not successfully stop notifier " + notifier.getClass() + " due to " + ioe);
-            }
+        System.out.println("Initiating shutdown of bootstrap change ingestors...");
+        ConfigurationChangeCoordinator notifier = runner.getChangeCoordinator();
+        try {
+            notifier.close();
+        } catch (IOException ioe) {
+            System.out.println("Could not successfully stop notifier due to " + ioe);
         }
 
         System.out.println("Initiating shutdown of bootstrap periodic status reporters...");

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java
new file mode 100644
index 0000000..3fa5b8f
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.configuration;
+
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces.ChangeIngestor;
+import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+/** Loads the configured {@link ChangeIngestor}s and relays the configuration changes they report to the registered listeners. */
+public class ConfigurationChangeCoordinator implements Closeable, ConfigurationChangeNotifier {
+    public static final String NOTIFIER_PROPERTY_PREFIX = "nifi.minifi.notifier";
+    public static final String NOTIFIER_INGESTORS_KEY = NOTIFIER_PROPERTY_PREFIX + ".ingestors";
+    private final static Logger logger = LoggerFactory.getLogger(ConfigurationChangeCoordinator.class);
+    private final Set<ConfigurationChangeListener> configurationChangeListeners = new HashSet<>();
+    private final Set<ChangeIngestor> changeIngestors = new HashSet<>();
+
+    /**
+     * Creates and initializes every ingestor named in the comma-separated {@link #NOTIFIER_INGESTORS_KEY} property, then replaces the registered listeners.
+     * @param properties from the bootstrap configuration
+     * @param configurationFileHolder handed to each ingestor together with this coordinator
+     * @param changeListenerSet listeners that replace any previously registered ones
+     */
+    public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, Collection<ConfigurationChangeListener> changeListenerSet) {
+        final String ingestorsCsv = properties.getProperty(NOTIFIER_INGESTORS_KEY);
+        if (ingestorsCsv != null && !ingestorsCsv.isEmpty()) {
+            for (String ingestorClassname : Arrays.asList(ingestorsCsv.split(","))) {
+                ingestorClassname = ingestorClassname.trim();
+                try {
+                    Class<?> ingestorClass = Class.forName(ingestorClassname);
+                    ChangeIngestor changeIngestor = (ChangeIngestor) ingestorClass.newInstance();
+                    changeIngestor.initialize(properties, configurationFileHolder, this);
+                    changeIngestors.add(changeIngestor);
+                    logger.info("Initialized {}", ingestorClassname);
+                } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+                    throw new RuntimeException("Issue instantiating ingestor " + ingestorClassname, e);
+                }
+            }
+        }
+        configurationChangeListeners.clear();
+        configurationChangeListeners.addAll(changeListenerSet);
+    }
+
+    /**
+     * Starts every ingestor that was created by {@code initialize}; nothing is started before this method is invoked.
+     */
+    public void start() {
+        changeIngestors.forEach(ChangeIngestor::start);
+    }
+
+    /**
+     * Provides an unmodifiable view of the listeners registered with this coordinator
+     *
+     * @return a collection of those listeners registered for notifications
+     */
+    @Override
+    public Set<ConfigurationChangeListener> getChangeListeners() {
+        return Collections.unmodifiableSet(configurationChangeListeners);
+    }
+
+    /**
+     * Hands each listener its own stream over a duplicate of {@code newConfig} and returns one result per listener, recording any ConfigurationChangeException.
+     */
+    @Override
+    public Collection<ListenerHandleResult> notifyListeners(ByteBuffer newConfig) {
+        logger.info("Notifying Listeners of a change");
+        Collection<ListenerHandleResult> listenerHandleResults = new ArrayList<>(configurationChangeListeners.size());
+        for (final ConfigurationChangeListener listener : getChangeListeners()) {
+            ListenerHandleResult result;
+            try {
+                listener.handleChange(new ByteBufferInputStream(newConfig.duplicate()));
+                result = new ListenerHandleResult(listener);
+            } catch (ConfigurationChangeException ex) {
+                result = new ListenerHandleResult(listener, ex);
+            }
+            listenerHandleResults.add(result);
+            logger.info("Listener notification result:{}", result);
+        }
+        return listenerHandleResults;
+    }
+
+    @Override
+    public void close() throws IOException {
+        for (ChangeIngestor changeIngestor : changeIngestors) {
+            changeIngestor.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
index 756b051..642ed4b 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
@@ -19,8 +19,8 @@ package org.apache.nifi.minifi.bootstrap.configuration;
 import java.io.InputStream;
 
 /**
- * Interface for handling events detected and driven by an associated {@link ConfigurationChangeNotifier} to which the listener
- * has registered via {@link ConfigurationChangeNotifier#registerListener(ConfigurationChangeListener)}.
+ * Interface for handling events detected and driven by an associated {@link ConfigurationChangeCoordinator}, which receives
+ * its listeners through {@link ConfigurationChangeCoordinator#initialize} and calls them from {@link ConfigurationChangeCoordinator#notifyListeners}.
  */
 public interface ConfigurationChangeListener {
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
index 745ce6c..2ebced5 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
@@ -1,57 +1,29 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.nifi.minifi.bootstrap.configuration;
 
-import java.io.Closeable;
+import java.nio.ByteBuffer;
 import java.util.Collection;
-import java.util.Properties;
 import java.util.Set;
 
-public interface ConfigurationChangeNotifier extends Closeable {
-
-    /**
-     * Provides an opportunity for the implementation to perform configuration and initialization based on properties received from the bootstrapping configuration
-     *
-     * @param properties from the bootstrap configuration
-     */
-    void initialize(Properties properties);
+public interface ConfigurationChangeNotifier {
 
-    /**
-     * Begins the associated notification service provided by the given implementation.  In most implementations, no action will occur until this method is invoked.
-     */
-    void start();
-
-    /**
-     * Provides an immutable collection of listeners for the notifier instance
-     *
-     * @return a collection of those listeners registered for notifications
-     */
     Set<ConfigurationChangeListener> getChangeListeners();
 
-    /**
-     * Adds a listener to be notified of configuration changes
-     *
-     * @param listener to be added to the collection
-     * @return true if the listener was added; false if already registered
-     */
-    boolean registerListener(ConfigurationChangeListener listener);
-
-    /**
-     * Provide the mechanism by which listeners are notified
-     */
-    Collection<ListenerHandleResult> notifyListeners();
+    Collection<ListenerHandleResult> notifyListeners(ByteBuffer is);
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java
index 8ac4cea..c0a7e74 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java
@@ -22,34 +22,34 @@ public class ListenerHandleResult {
     private final ConfigurationChangeListener configurationChangeListener;
     private final Exception failureCause;
 
-    public ListenerHandleResult(ConfigurationChangeListener configurationChangeListener){
+    public ListenerHandleResult(ConfigurationChangeListener configurationChangeListener) {
         this.configurationChangeListener = configurationChangeListener;
         failureCause = null;
     }
 
-    public ListenerHandleResult(ConfigurationChangeListener configurationChangeListener, Exception failureCause){
+    public ListenerHandleResult(ConfigurationChangeListener configurationChangeListener, Exception failureCause) {
         this.configurationChangeListener = configurationChangeListener;
         this.failureCause = failureCause;
     }
 
-    public boolean succeeded(){
+    public boolean succeeded() {
         return failureCause == null;
     }
 
-    public String getDescriptor(){
+    public String getDescriptor() {
         return configurationChangeListener.getDescriptor();
     }
 
-    public Exception getFailureCause(){
+    public Exception getFailureCause() {
         return failureCause;
     }
 
     @Override
     public String toString() {
-        if(failureCause == null){
+        if (failureCause == null) {
             return getDescriptor() + " successfully handled the configuration change";
         } else {
-            return getDescriptor() + " FAILED to handle the configuration change due to: '"  + failureCause.getMessage() + "'";
+            return getDescriptor() + " FAILED to handle the configuration change due to: '" + failureCause.getMessage() + "'";
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiator.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiator.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiator.java
new file mode 100644
index 0000000..565a8f4
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.differentiators;
+
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+/** Decides whether an incoming configuration differs, byte for byte, from the one referenced by the {@link ConfigurationFileHolder}. */
+public abstract class WholeConfigDifferentiator {
+
+    private final static Logger logger = LoggerFactory.getLogger(WholeConfigDifferentiator.class);
+
+    public static final String WHOLE_CONFIG_KEY = "Whole Config";
+
+    volatile ConfigurationFileHolder configurationFileHolder;
+
+    /** Returns true when the stream's bytes differ from the currently held config; the stream is consumed but not closed here. */
+    boolean compareInputStreamToConfigFile(InputStream inputStream) throws IOException {
+        logger.debug("Checking if change is different");
+        AtomicReference<ByteBuffer> currentConfigFileReference = configurationFileHolder.getConfigFileReference();
+        ByteBuffer currentConfigFile = currentConfigFileReference.get();
+        ByteBuffer byteBuffer = ByteBuffer.allocate(currentConfigFile.limit());
+        DataInputStream dataInputStream = new DataInputStream(inputStream);
+        try {
+            dataInputStream.readFully(byteBuffer.array());
+        } catch (EOFException e) {
+            logger.debug("New config is shorter than the current. Must be different.");
+            return true;
+        }
+        logger.debug("Read the input");
+        // available() may report 0 while bytes are still pending, so detect a longer input with a real read instead.
+        if (dataInputStream.read() != -1) {
+            return true;
+        }
+        return byteBuffer.compareTo(currentConfigFile) != 0;
+    }
+
+    public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder) {
+        this.configurationFileHolder = configurationFileHolder;
+    }
+
+    public static class InputStreamInput extends WholeConfigDifferentiator implements Differentiator<InputStream> {
+        @Override
+        public boolean isNew(InputStream inputStream) throws IOException {
+            return compareInputStreamToConfigFile(inputStream);
+        }
+    }
+
+    public static class ByteBufferInput extends WholeConfigDifferentiator implements Differentiator<ByteBuffer> {
+        @Override
+        public boolean isNew(ByteBuffer inputBuffer) {
+            AtomicReference<ByteBuffer> currentConfigFileReference = configurationFileHolder.getConfigFileReference();
+            ByteBuffer currentConfigFile = currentConfigFileReference.get();
+            return inputBuffer.compareTo(currentConfigFile) != 0;
+        }
+    }
+
+    public static Differentiator<InputStream> getInputStreamDifferentiator() {
+        return new InputStreamInput();
+    }
+
+    public static Differentiator<ByteBuffer> getByteBufferDifferentiator() {
+        return new ByteBufferInput();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/interfaces/Differentiator.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/interfaces/Differentiator.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/interfaces/Differentiator.java
new file mode 100644
index 0000000..5beb78b
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/interfaces/Differentiator.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces;
+
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public interface Differentiator <T> {
+    void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder);
+    /** Reports whether {@code input} should be treated as a new (changed) configuration; may fail with IOException while reading it. */
+    boolean isNew(T input) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java
new file mode 100644
index 0000000..1678f20
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
+
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces.ChangeIngestor;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+
+/** Base for ingestors that schedule themselves at a fixed rate on a single-thread executor; subclasses implement the work in {@link #run()}. */
+public abstract class AbstractPullChangeIngestor implements Runnable, ChangeIngestor {
+    // Default polling period: "300000" (5 minutes if read as milliseconds)
+    protected static final String DEFAULT_POLLING_PERIOD = "300000";
+    protected static Logger logger;
+
+    protected final AtomicInteger pollingPeriodMS = new AtomicInteger();
+    private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
+    protected volatile ConfigurationChangeNotifier configurationChangeNotifier;
+
+    @Override
+    public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
+        this.configurationChangeNotifier = configurationChangeNotifier;
+    }
+
+    @Override
+    public void start() {
+        // NOTE(review): pollingPeriodMS is never assigned in this class; presumably a subclass sets it before start() -- confirm.
+        scheduledThreadPoolExecutor.scheduleAtFixedRate(this, pollingPeriodMS.get(), pollingPeriodMS.get(), TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void close() throws IOException {
+        scheduledThreadPoolExecutor.shutdownNow();
+    }
+
+    public abstract void run();
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java
new file mode 100644
index 0000000..39b272d
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
+
+import org.apache.commons.io.input.TeeInputStream;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
+import org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces.ChangeIngestor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
+import static org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator.NOTIFIER_INGESTORS_KEY;
+import static org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator.WHOLE_CONFIG_KEY;
+
+/**
+ * FileChangeIngestor provides a simple FileSystem monitor for detecting changes for a specified file as generated from its corresponding {@link Path}.  Upon modifications to the associated file,
+ * associated listeners receive notification of a change allowing configuration logic to be reanalyzed.  The backing implementation is associated with a {@link ScheduledExecutorService} that
+ * ensures continuity of monitoring.
+ */
+public class FileChangeIngestor implements Runnable, ChangeIngestor {
+
+    private static final Map<String, Supplier<Differentiator<InputStream>>> DIFFERENTIATOR_CONSTRUCTOR_MAP;
+
+    static {
+        HashMap<String, Supplier<Differentiator<InputStream>>> tempMap = new HashMap<>();
+        tempMap.put(WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getInputStreamDifferentiator);
+
+        DIFFERENTIATOR_CONSTRUCTOR_MAP = Collections.unmodifiableMap(tempMap);
+    }
+
+
+    protected static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
+    protected static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = TimeUnit.SECONDS;
+
+    private final static Logger logger = LoggerFactory.getLogger(FileChangeIngestor.class);
+    private static final String CONFIG_FILE_BASE_KEY = NOTIFIER_INGESTORS_KEY + ".file";
+
+    protected static final String CONFIG_FILE_PATH_KEY = CONFIG_FILE_BASE_KEY + ".config.path";
+    protected static final String POLLING_PERIOD_INTERVAL_KEY = CONFIG_FILE_BASE_KEY + ".polling.period.seconds";
+    public static final String DIFFERENTIATOR_KEY = CONFIG_FILE_BASE_KEY + ".differentiator";
+
+    private Path configFilePath;
+    private WatchService watchService;
+    private long pollingSeconds;
+    private volatile Differentiator<InputStream> differentiator;
+
+    private volatile ConfigurationChangeNotifier configurationChangeNotifier;
+    private ScheduledExecutorService executorService;
+
+    protected static WatchService initializeWatcher(Path filePath) {
+        try {
+            final WatchService fsWatcher = FileSystems.getDefault().newWatchService();
+            final Path watchDirectory = filePath.getParent();
+            watchDirectory.register(fsWatcher, ENTRY_MODIFY);
+
+            return fsWatcher;
+        } catch (IOException ioe) {
+            throw new IllegalStateException("Unable to initialize a file system watcher for the path " + filePath, ioe);
+        }
+    }
+
+    protected boolean targetChanged() {
+        boolean targetChanged = false;
+
+        final WatchKey watchKey = this.watchService.poll();
+
+        if (watchKey == null) {
+            return targetChanged;
+        }
+
+        for (WatchEvent<?> watchEvt : watchKey.pollEvents()) {
+            final WatchEvent.Kind<?> evtKind = watchEvt.kind();
+
+            final WatchEvent<Path> pathEvent = (WatchEvent<Path>) watchEvt;
+            final Path changedFile = pathEvent.context();
+
+            // determine target change by verifying if the changed file corresponds to the config file monitored for this path
+            targetChanged = (evtKind == ENTRY_MODIFY && changedFile.equals(configFilePath.getName(configFilePath.getNameCount() - 1)));
+        }
+
+        // After completing inspection, reset for detection of subsequent change events
+        boolean valid = watchKey.reset();
+        if (!valid) {
+            throw new IllegalStateException("Unable to reinitialize file system watcher.");
+        }
+
+        return targetChanged;
+    }
+
+    @Override
+    public void run() {
+        logger.debug("Checking for a change");
+        if (targetChanged()) {
+            logger.debug("Target changed, checking if it's different than current flow.");
+            try (FileInputStream configFile = new FileInputStream(configFilePath.toFile());
+                ByteArrayOutputStream pipedOutputStream = new ByteArrayOutputStream();
+                TeeInputStream teeInputStream = new TeeInputStream(configFile, pipedOutputStream)) {
+
+                if (differentiator.isNew(teeInputStream)) {
+                    logger.debug("New change, notifying listener");
+                    // Fill the byteArrayOutputStream with the rest of the request data
+                    while (teeInputStream.available() != 0) {
+                        teeInputStream.read();
+                    }
+
+                    ByteBuffer newConfig = ByteBuffer.wrap(pipedOutputStream.toByteArray());
+                    ByteBuffer readOnlyNewConfig = newConfig.asReadOnlyBuffer();
+
+                    configurationChangeNotifier.notifyListeners(readOnlyNewConfig);
+                    logger.debug("Listeners notified");
+                }
+            } catch (Exception e) {
+                logger.error("Could not successfully notify listeners.", e);
+            }
+        }
+    }
+
+    @Override
+    public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
+        final String rawPath = properties.getProperty(CONFIG_FILE_PATH_KEY);
+        final String rawPollingDuration = properties.getProperty(POLLING_PERIOD_INTERVAL_KEY, Long.toString(DEFAULT_POLLING_PERIOD_INTERVAL));
+
+        if (rawPath == null || rawPath.isEmpty()) {
+            throw new IllegalArgumentException("Property, " + CONFIG_FILE_PATH_KEY + ", for the path of the config file must be specified.");
+        }
+
+        try {
+            setConfigFilePath(Paths.get(rawPath));
+            setPollingPeriod(Long.parseLong(rawPollingDuration), DEFAULT_POLLING_PERIOD_UNIT);
+            setWatchService(initializeWatcher(configFilePath));
+        } catch (Exception e) {
+            throw new IllegalStateException("Could not successfully initialize file change notifier.", e);
+        }
+
+        this.configurationChangeNotifier = configurationChangeNotifier;
+
+        final String differentiatorName = properties.getProperty(DIFFERENTIATOR_KEY);
+
+        if (differentiatorName != null && !differentiatorName.isEmpty()) {
+            Supplier<Differentiator<InputStream>> differentiatorSupplier = DIFFERENTIATOR_CONSTRUCTOR_MAP.get(differentiatorName);
+            if (differentiatorSupplier == null) {
+                throw new IllegalArgumentException("Property, " + DIFFERENTIATOR_KEY + ", has value " + differentiatorName + " which does not " +
+                        "correspond to any in the PullHttpChangeIngestor Map:" + DIFFERENTIATOR_CONSTRUCTOR_MAP.keySet());
+            }
+            differentiator = differentiatorSupplier.get();
+        } else {
+            differentiator = WholeConfigDifferentiator.getInputStreamDifferentiator();
+        }
+        differentiator.initialize(properties, configurationFileHolder);
+    }
+
+    protected void setConfigFilePath(Path configFilePath) {
+        this.configFilePath = configFilePath;
+    }
+
+    protected void setWatchService(WatchService watchService) {
+        this.watchService = watchService;
+    }
+
+    protected void setConfigurationChangeNotifier(ConfigurationChangeNotifier configurationChangeNotifier) {
+        this.configurationChangeNotifier = configurationChangeNotifier;
+    }
+
+    protected void setDifferentiator(Differentiator<InputStream> differentiator) {
+        this.differentiator = differentiator;
+    }
+
+    protected void setPollingPeriod(long duration, TimeUnit unit) {
+        if (duration < 0) {
+            throw new IllegalArgumentException("Cannot specify a polling period with duration <=0");
+        }
+        this.pollingSeconds = TimeUnit.SECONDS.convert(duration, unit);
+    }
+
+    @Override
+    public void start() {
+        executorService = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+            @Override
+            public Thread newThread(final Runnable r) {
+                final Thread t = Executors.defaultThreadFactory().newThread(r);
+                t.setName("File Change Notifier Thread");
+                t.setDaemon(true);
+                return t;
+            }
+        });
+        this.executorService.scheduleWithFixedDelay(this, 0, pollingSeconds, DEFAULT_POLLING_PERIOD_UNIT);
+    }
+
+    @Override
+    public void close() {
+        if (this.executorService != null) {
+            this.executorService.shutdownNow();
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java
new file mode 100644
index 0000000..a8e7105
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
+
+import okhttp3.Call;
+import okhttp3.HttpUrl;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+import java.io.FileInputStream;
+import java.nio.ByteBuffer;
+import java.security.KeyStore;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import static org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator.NOTIFIER_INGESTORS_KEY;
+import static org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator.WHOLE_CONFIG_KEY;
+
+
+public class PullHttpChangeIngestor extends AbstractPullChangeIngestor {
+
+    private static final int NOT_MODIFIED_STATUS_CODE = 304;
+    private static final Map<String, Supplier<Differentiator<ByteBuffer>>> DIFFERENTIATOR_CONSTRUCTOR_MAP;
+
+    static {
+        HashMap<String, Supplier<Differentiator<ByteBuffer>>> tempMap = new HashMap<>();
+        tempMap.put(WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getByteBufferDifferentiator);
+
+        DIFFERENTIATOR_CONSTRUCTOR_MAP = Collections.unmodifiableMap(tempMap);
+    }
+
+    private static final String DEFAULT_CONNECT_TIMEOUT_MS = "5000";
+    private static final String DEFAULT_READ_TIMEOUT_MS = "15000";
+
+    private static final String PULL_HTTP_BASE_KEY = NOTIFIER_INGESTORS_KEY + ".pull.http";
+    public static final String PULL_HTTP_POLLING_PERIOD_KEY = PULL_HTTP_BASE_KEY + ".period.ms";
+    public static final String PORT_KEY = PULL_HTTP_BASE_KEY + ".port";
+    public static final String HOST_KEY = PULL_HTTP_BASE_KEY + ".hostname";
+    public static final String PATH_KEY = PULL_HTTP_BASE_KEY + ".path";
+    public static final String TRUSTSTORE_LOCATION_KEY = PULL_HTTP_BASE_KEY + ".truststore.location";
+    public static final String TRUSTSTORE_PASSWORD_KEY = PULL_HTTP_BASE_KEY + ".truststore.password";
+    public static final String TRUSTSTORE_TYPE_KEY = PULL_HTTP_BASE_KEY + ".truststore.type";
+    public static final String KEYSTORE_LOCATION_KEY = PULL_HTTP_BASE_KEY + ".keystore.location";
+    public static final String KEYSTORE_PASSWORD_KEY = PULL_HTTP_BASE_KEY + ".keystore.password";
+    public static final String KEYSTORE_TYPE_KEY = PULL_HTTP_BASE_KEY + ".keystore.type";
+    public static final String CONNECT_TIMEOUT_KEY = PULL_HTTP_BASE_KEY + ".connect.timeout.ms";
+    public static final String READ_TIMEOUT_KEY = PULL_HTTP_BASE_KEY + ".read.timeout.ms";
+    public static final String DIFFERENTIATOR_KEY = PULL_HTTP_BASE_KEY + ".differentiator";
+    public static final String USE_ETAG_KEY = PULL_HTTP_BASE_KEY + ".use.etag";
+
+    private final AtomicReference<OkHttpClient> httpClientReference = new AtomicReference<>();
+    private final AtomicReference<Integer> portReference = new AtomicReference<>();
+    private final AtomicReference<String> hostReference = new AtomicReference<>();
+    private final AtomicReference<String> pathReference = new AtomicReference<>();
+    private volatile Differentiator<ByteBuffer> differentiator;
+    private volatile String connectionScheme;
+    private volatile String lastEtag = "";
+    private volatile boolean useEtag = false;
+
+    public PullHttpChangeIngestor() {
+        logger = LoggerFactory.getLogger(PullHttpChangeIngestor.class);
+    }
+
+    @Override
+    public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
+        super.initialize(properties, configurationFileHolder, configurationChangeNotifier);
+
+        pollingPeriodMS.set(Integer.parseInt(properties.getProperty(PULL_HTTP_POLLING_PERIOD_KEY, DEFAULT_POLLING_PERIOD)));
+        if (pollingPeriodMS.get() < 1) {
+            throw new IllegalArgumentException("Property, " + PULL_HTTP_POLLING_PERIOD_KEY + ", for the polling period ms must be set with a positive integer.");
+        }
+
+        final String host = properties.getProperty(HOST_KEY);
+        if (host == null || host.isEmpty()) {
+            throw new IllegalArgumentException("Property, " + HOST_KEY + ", for the hostname to pull configurations from must be specified.");
+        }
+
+        final String path = properties.getProperty(PATH_KEY, "/");
+
+        final String portString = (String) properties.get(PORT_KEY);
+        final Integer port;
+        if (portString == null) {
+            throw new IllegalArgumentException("Property, " + PORT_KEY + ", for the hostname to pull configurations from must be specified.");
+        } else {
+            port = Integer.parseInt(portString);
+        }
+
+        portReference.set(port);
+        hostReference.set(host);
+        pathReference.set(path);
+
+        final String useEtagString = (String) properties.getOrDefault(USE_ETAG_KEY, "false");
+        if ("true".equalsIgnoreCase(useEtagString) || "false".equalsIgnoreCase(useEtagString)){
+            useEtag = Boolean.parseBoolean(useEtagString);
+        } else {
+            throw new IllegalArgumentException("Property, " + USE_ETAG_KEY + ", to specify whether to use the ETag header, must either be a value boolean value (\"true\" or \"false\") or left to " +
+                    "the default value of \"false\". It is set to \"" + useEtagString + "\".");
+        }
+
+        httpClientReference.set(null);
+
+        final OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder();
+
+        // Set timeouts
+        okHttpClientBuilder.connectTimeout(Long.parseLong(properties.getProperty(CONNECT_TIMEOUT_KEY, DEFAULT_CONNECT_TIMEOUT_MS)), TimeUnit.MILLISECONDS);
+        okHttpClientBuilder.readTimeout(Long.parseLong(properties.getProperty(READ_TIMEOUT_KEY, DEFAULT_READ_TIMEOUT_MS)), TimeUnit.MILLISECONDS);
+
+        // Set whether to follow redirects
+        okHttpClientBuilder.followRedirects(true);
+
+        // check if the ssl path is set and add the factory if so
+        if (properties.containsKey(KEYSTORE_LOCATION_KEY)) {
+            try {
+                setSslSocketFactory(okHttpClientBuilder, properties);
+                connectionScheme = "https";
+            } catch (Exception e) {
+                throw new IllegalStateException(e);
+            }
+        } else {
+            connectionScheme = "http";
+        }
+
+        httpClientReference.set(okHttpClientBuilder.build());
+        final String differentiatorName = properties.getProperty(DIFFERENTIATOR_KEY);
+
+        if (differentiatorName != null && !differentiatorName.isEmpty()) {
+            Supplier<Differentiator<ByteBuffer>> differentiatorSupplier = DIFFERENTIATOR_CONSTRUCTOR_MAP.get(differentiatorName);
+            if (differentiatorSupplier == null) {
+                throw new IllegalArgumentException("Property, " + DIFFERENTIATOR_KEY + ", has value " + differentiatorName + " which does not " +
+                        "correspond to any in the PullHttpChangeIngestor Map:" + DIFFERENTIATOR_CONSTRUCTOR_MAP.keySet());
+            }
+            differentiator = differentiatorSupplier.get();
+        } else {
+            differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
+        }
+        differentiator.initialize(properties, configurationFileHolder);
+    }
+
+
+    @Override
+    public void run() {
+        try {
+            logger.debug("Attempting to pull new config");
+            final HttpUrl url = new HttpUrl.Builder()
+                    .host(hostReference.get())
+                    .port(portReference.get())
+                    .encodedPath(pathReference.get())
+                    .scheme(connectionScheme)
+                    .build();
+
+
+            final Request.Builder requestBuilder = new Request.Builder()
+                    .get()
+                    .url(url);
+
+            if (useEtag) {
+                requestBuilder.addHeader("If-None-Match", lastEtag);
+            }
+
+            final Request request = requestBuilder.build();
+
+            final OkHttpClient httpClient = httpClientReference.get();
+
+            final Call call = httpClient.newCall(request);
+            final Response response = call.execute();
+
+            logger.debug("Response received: {}", response.toString());
+
+            if (response.code() == NOT_MODIFIED_STATUS_CODE) {
+                return;
+            }
+
+            ResponseBody body = response.body();
+            if (body == null) {
+                logger.warn("No body returned when pulling a new configuration");
+                return;
+            }
+
+            ByteBuffer bodyByteBuffer = ByteBuffer.wrap(body.bytes());
+
+            if (differentiator.isNew(bodyByteBuffer)) {
+                logger.debug("New change, notifying listener");
+
+                ByteBuffer readOnlyNewConfig = bodyByteBuffer.asReadOnlyBuffer();
+
+                configurationChangeNotifier.notifyListeners(readOnlyNewConfig);
+                logger.debug("Listeners notified");
+            } else {
+                logger.debug("Pulled config same as currently running.");
+            }
+
+            if (useEtag) {
+                lastEtag = (new StringBuilder("\""))
+                        .append(response.header("ETag").trim())
+                        .append("\"").toString();
+            }
+        } catch (Exception e) {
+            logger.warn("Hit an exception while trying to pull", e);
+        }
+    }
+
+    private void setSslSocketFactory(OkHttpClient.Builder okHttpClientBuilder, Properties properties) throws Exception {
+        final String keystoreLocation = properties.getProperty(KEYSTORE_LOCATION_KEY);
+        final String keystorePass = properties.getProperty(KEYSTORE_PASSWORD_KEY);
+        final String keystoreType = properties.getProperty(KEYSTORE_TYPE_KEY);
+
+        assertKeystorePropertiesSet(keystoreLocation, keystorePass, keystoreType);
+
+        // prepare the keystore
+        final KeyStore keyStore = KeyStore.getInstance(keystoreType);
+
+        try (FileInputStream keyStoreStream = new FileInputStream(keystoreLocation)) {
+            keyStore.load(keyStoreStream, keystorePass.toCharArray());
+        }
+
+        final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+        keyManagerFactory.init(keyStore, keystorePass.toCharArray());
+
+        // load truststore
+        final String truststoreLocation = properties.getProperty(TRUSTSTORE_LOCATION_KEY);
+        final String truststorePass = properties.getProperty(TRUSTSTORE_PASSWORD_KEY);
+        final String truststoreType = properties.getProperty(TRUSTSTORE_TYPE_KEY);
+        assertTruststorePropertiesSet(truststoreLocation, truststorePass, truststoreType);
+
+        KeyStore truststore = KeyStore.getInstance(truststoreType);
+        final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("X509");
+        truststore.load(new FileInputStream(truststoreLocation), truststorePass.toCharArray());
+        trustManagerFactory.init(truststore);
+
+        final X509TrustManager x509TrustManager;
+        TrustManager[] trustManagers = trustManagerFactory.getTrustManagers();
+        if (trustManagers[0] != null) {
+            x509TrustManager = (X509TrustManager) trustManagers[0];
+        } else {
+            throw new IllegalStateException("List of trust managers is null");
+        }
+
+        SSLContext tempSslContext;
+        try {
+            tempSslContext = SSLContext.getInstance("TLS");
+        } catch (NoSuchAlgorithmException e) {
+            logger.warn("Unable to use 'TLS' for the PullHttpChangeIngestor due to NoSuchAlgorithmException. Will attempt to use the default algorithm.", e);
+            tempSslContext = SSLContext.getDefault();
+        }
+
+        final SSLContext sslContext = tempSslContext;
+        sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), null);
+
+        final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
+        okHttpClientBuilder.sslSocketFactory(sslSocketFactory, x509TrustManager);
+    }
+
+    private void assertKeystorePropertiesSet(String location, String password, String type) {
+        if (location == null || location.isEmpty()) {
+            throw new IllegalArgumentException(KEYSTORE_LOCATION_KEY + " is null or is empty");
+        }
+
+        if (password == null || password.isEmpty()) {
+            throw new IllegalArgumentException(KEYSTORE_LOCATION_KEY + " is set but " + KEYSTORE_PASSWORD_KEY + " is not (or is empty). If the location is set, the password must also be.");
+        }
+
+        if (type == null || type.isEmpty()) {
+            throw new IllegalArgumentException(KEYSTORE_LOCATION_KEY + " is set but " + KEYSTORE_TYPE_KEY + " is not (or is empty). If the location is set, the type must also be.");
+        }
+    }
+
+    private void assertTruststorePropertiesSet(String location, String password, String type) {
+        if (location == null || location.isEmpty()) {
+            throw new IllegalArgumentException(TRUSTSTORE_LOCATION_KEY + " is not set or is empty");
+        }
+
+        if (password == null || password.isEmpty()) {
+            throw new IllegalArgumentException(TRUSTSTORE_LOCATION_KEY + " is set but " + TRUSTSTORE_PASSWORD_KEY + " is not (or is empty). If the location is set, the password must also be.");
+        }
+
+        if (type == null || type.isEmpty()) {
+            throw new IllegalArgumentException(TRUSTSTORE_LOCATION_KEY + " is set but " + TRUSTSTORE_TYPE_KEY + " is not (or is empty). If the location is set, the type must also be.");
+        }
+    }
+
+    protected void setDifferentiator(Differentiator<ByteBuffer> differentiator) {
+        this.differentiator = differentiator;
+    }
+
+    public void setLastEtag(String lastEtag) {
+        this.lastEtag = lastEtag;
+    }
+
+    public void setUseEtag(boolean useEtag) {
+        this.useEtag = useEtag;
+    }
+}