You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by br...@apache.org on 2016/11/11 17:17:57 UTC

[3/3] nifi-minifi git commit: MINIFI-36 Refactoring config change notifiers and adding Pull config change notifier

MINIFI-36 Refactoring config change notifiers and adding Pull config change notifier

This closes #51

Signed-off-by: Bryan Rosander <br...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/6f3c5678
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/6f3c5678
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/6f3c5678

Branch: refs/heads/master
Commit: 6f3c567806363f58e85a2f00ef83e3b1f48f3f7f
Parents: 7954d36
Author: Joseph Percivall <JP...@apache.org>
Authored: Thu Oct 6 16:27:19 2016 -0400
Committer: Bryan Rosander <br...@apache.org>
Committed: Fri Nov 11 12:14:57 2016 -0500

----------------------------------------------------------------------
 .../src/main/assembly/dependencies.xml          |   2 +
 minifi-bootstrap/pom.xml                        |  10 +-
 .../nifi/minifi/bootstrap/BootstrapCodec.java   |   2 +-
 .../bootstrap/ConfigurationFileHolder.java      |  26 ++
 .../apache/nifi/minifi/bootstrap/RunMiNiFi.java | 170 ++++++----
 .../nifi/minifi/bootstrap/ShutdownHook.java     |  15 +-
 .../ConfigurationChangeCoordinator.java         | 114 +++++++
 .../ConfigurationChangeListener.java            |   4 +-
 .../ConfigurationChangeNotifier.java            |  44 +--
 .../configuration/ListenerHandleResult.java     |  14 +-
 .../WholeConfigDifferentiator.java              |  90 +++++
 .../interfaces/Differentiator.java              |  29 ++
 .../ingestors/AbstractPullChangeIngestor.java   |  60 ++++
 .../ingestors/FileChangeIngestor.java           | 234 +++++++++++++
 .../ingestors/PullHttpChangeIngestor.java       | 326 +++++++++++++++++++
 .../ingestors/RestChangeIngestor.java           | 294 +++++++++++++++++
 .../ingestors/interfaces/ChangeIngestor.java    |  32 ++
 .../notifiers/FileChangeNotifier.java           | 202 ------------
 .../notifiers/RestChangeNotifier.java           | 289 ----------------
 .../bootstrap/util/ByteBufferInputStream.java   |  48 +++
 .../bootstrap/util/ConfigTransformer.java       |   2 +-
 .../TestConfigurationChangeCoordinator.java     |  84 +++++
 .../TestWholeConfigDifferentiator.java          | 110 +++++++
 .../ingestors/TestFileChangeIngestor.java       | 171 ++++++++++
 .../ingestors/TestPullHttpChangeIngestor.java   |  65 ++++
 .../TestPullHttpChangeIngestorSSL.java          |  84 +++++
 .../ingestors/TestRestChangeIngestor.java       |  57 ++++
 .../ingestors/TestRestChangeIngestorSSL.java    | 150 +++++++++
 .../TestPullHttpChangeIngestorCommon.java       | 231 +++++++++++++
 .../common/TestRestChangeIngestorCommon.java    | 127 ++++++++
 .../notifiers/TestFileChangeNotifier.java       | 208 ------------
 .../notifiers/TestRestChangeNotifier.java       |  51 ---
 .../notifiers/TestRestChangeNotifierSSL.java    |  96 ------
 .../notifiers/util/MockChangeListener.java      |  51 ---
 .../util/TestRestChangeNotifierCommon.java      |  84 -----
 .../bootstrap/util/ConfigTransformerTest.java   |   5 +-
 .../src/main/markdown/System_Admin_Guide.md     |  97 ++++++
 .../src/main/resources/conf/bootstrap.conf      |  20 +-
 pom.xml                                         |  15 +-
 39 files changed, 2580 insertions(+), 1133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-assembly/src/main/assembly/dependencies.xml
----------------------------------------------------------------------
diff --git a/minifi-assembly/src/main/assembly/dependencies.xml b/minifi-assembly/src/main/assembly/dependencies.xml
index 551c8af..a774e49 100644
--- a/minifi-assembly/src/main/assembly/dependencies.xml
+++ b/minifi-assembly/src/main/assembly/dependencies.xml
@@ -73,6 +73,8 @@
                 <include>jetty-http</include>
                 <include>jetty-io</include>
                 <include>javax.servlet-api</include>
+                <include>commons-io</include>
+                <include>okhttp</include>
             </includes>
         </dependencySet>
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/pom.xml b/minifi-bootstrap/pom.xml
index 433c352..71e9b78 100644
--- a/minifi-bootstrap/pom.xml
+++ b/minifi-bootstrap/pom.xml
@@ -73,21 +73,13 @@ limitations under the License.
             <version>${jetty.version}</version>
             <scope>compile</scope>
         </dependency>
-
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-standard-prioritizers</artifactId>
-            <scope>test</scope>
-        </dependency>
         <dependency>
-            <groupId>com.squareup.okhttp</groupId>
+            <groupId>com.squareup.okhttp3</groupId>
             <artifactId>okhttp</artifactId>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
-            <scope>test</scope>
         </dependency>
     </dependencies>
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapCodec.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapCodec.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapCodec.java
index 95e6f87..2e8a537 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapCodec.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapCodec.java
@@ -110,7 +110,7 @@ public class BootstrapCodec {
             break;
             case "SHUTDOWN": {
                 logger.debug("Received 'SHUTDOWN' command from MINIFI");
-                runner.shutdownChangeNotifiers();
+                runner.shutdownChangeNotifier();
                 runner.shutdownPeriodicStatusReporters();
                 writer.write("OK");
                 writer.newLine();

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ConfigurationFileHolder.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ConfigurationFileHolder.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ConfigurationFileHolder.java
new file mode 100644
index 0000000..d5113e3
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ConfigurationFileHolder.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicReference;
+
+public interface ConfigurationFileHolder {
+
+    AtomicReference<ByteBuffer> getConfigFileReference();
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
index ad54c61..52a803c 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
@@ -35,6 +35,7 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -57,14 +58,16 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.commons.io.input.TeeInputStream;
 import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
 import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
 import org.apache.nifi.minifi.bootstrap.status.PeriodicStatusReporter;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator;
 import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
 import org.apache.nifi.minifi.commons.status.FlowStatusReport;
 import org.apache.nifi.stream.io.ByteArrayInputStream;
@@ -90,7 +93,7 @@ import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
  * <p>
  * If the {@code bootstrap.conf} file cannot be found, throws a {@code FileNotFoundException}.
  */
-public class RunMiNiFi implements QueryableStatusAggregator {
+public class RunMiNiFi implements QueryableStatusAggregator, ConfigurationFileHolder {
 
     public static final String DEFAULT_CONFIG_FILE = "./conf/bootstrap.conf";
     public static final String DEFAULT_NIFI_PROPS_FILE = "./conf/nifi.properties";
@@ -143,11 +146,18 @@ public class RunMiNiFi implements QueryableStatusAggregator {
     private volatile Set<Future<?>> loggingFutures = new HashSet<>(2);
     private volatile int gracefulShutdownSeconds;
 
-    private Set<ConfigurationChangeNotifier> changeNotifiers;
     private Set<PeriodicStatusReporter> periodicStatusReporters;
 
+    private ConfigurationChangeCoordinator changeCoordinator;
     private MiNiFiConfigurationChangeListener changeListener;
 
+    private final AtomicReference<ByteBuffer> currentConfigFileReference = new AtomicReference<>();
+
+    @Override
+    public AtomicReference<ByteBuffer> getConfigFileReference() {
+        return currentConfigFileReference;
+    }
+
     // Is set to true after the MiNiFi instance shuts down in preparation to be reloaded. Will be set to false after MiNiFi is successfully started again.
     private AtomicBoolean reloading = new AtomicBoolean(false);
 
@@ -1098,8 +1108,9 @@ public class RunMiNiFi implements QueryableStatusAggregator {
 
         final String confDir = getBootstrapProperties().getProperty(CONF_DIR_KEY);
         final File configFile = new File(getBootstrapProperties().getProperty(MINIFI_CONFIG_FILE_KEY));
-        try {
-            performTransformation(new FileInputStream(configFile), confDir);
+        try (InputStream inputStream = new FileInputStream(configFile)) {
+            ByteBuffer tempConfigFile = performTransformation(inputStream, confDir);
+            currentConfigFileReference.set(tempConfigFile.asReadOnlyBuffer());
         } catch (ConfigurationChangeException e) {
             defaultLogger.error("The config file is malformed, unable to start.", e);
             return;
@@ -1111,11 +1122,11 @@ public class RunMiNiFi implements QueryableStatusAggregator {
             return;
         }
 
-        // Instantiate configuration listener and configured notifiers
+        // Instantiate configuration listener and configured ingestors
         this.changeListener = new MiNiFiConfigurationChangeListener(this, defaultLogger);
-        this.changeNotifiers = initializeNotifiers(this.changeListener);
         this.periodicStatusReporters = initializePeriodicNotifiers();
         startPeriodicNotifiers();
+        this.changeCoordinator = initializeNotifier(this.changeListener);
 
         ProcessBuilder builder = tuple.getKey();
         Process process = tuple.getValue();
@@ -1136,7 +1147,7 @@ public class RunMiNiFi implements QueryableStatusAggregator {
                                 if (swapConfigFile.delete()) {
                                     defaultLogger.info("Swap file was successfully deleted.");
                                 } else {
-                                    defaultLogger.info("Swap file was not deleted.");
+                                    defaultLogger.error("Swap file was not deleted. It should be deleted manually.");
                                 }
                             }
 
@@ -1180,7 +1191,8 @@ public class RunMiNiFi implements QueryableStatusAggregator {
                                 defaultLogger.info("Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration.");
 
                                 try {
-                                    performTransformation(new FileInputStream(swapConfigFile), confDir);
+                                    ByteBuffer tempConfigFile = performTransformation(new FileInputStream(swapConfigFile), confDir);
+                                    currentConfigFileReference.set(tempConfigFile.asReadOnlyBuffer());
                                 } catch (ConfigurationChangeException e) {
                                     defaultLogger.error("The swap file is malformed, unable to restart from prior state. Will not attempt to restart MiNiFi. Swap File should be cleaned up manually.");
                                     return;
@@ -1228,7 +1240,7 @@ public class RunMiNiFi implements QueryableStatusAggregator {
                 }
             }
         } finally {
-            shutdownChangeNotifiers();
+            shutdownChangeNotifier();
             shutdownPeriodicStatusReporters();
         }
     }
@@ -1424,41 +1436,26 @@ public class RunMiNiFi implements QueryableStatusAggregator {
         }
     }
 
-    public void shutdownChangeNotifiers() {
-        for (ConfigurationChangeNotifier notifier : getChangeNotifiers()) {
-            try {
-                notifier.close();
-            } catch (IOException e) {
-                defaultLogger.warn("Could not successfully stop notifier {}", notifier.getClass(), e);
-            }
+    public void shutdownChangeNotifier() {
+        try {
+            getChangeCoordinator().close();
+        } catch (IOException e) {
+            defaultLogger.warn("Could not successfully stop notifier ", e);
         }
     }
 
-    public Set<ConfigurationChangeNotifier> getChangeNotifiers() {
-        return Collections.unmodifiableSet(changeNotifiers);
+    public ConfigurationChangeCoordinator getChangeCoordinator() {
+        return changeCoordinator;
     }
 
-    private Set<ConfigurationChangeNotifier> initializeNotifiers(ConfigurationChangeListener configChangeListener) throws IOException {
-        final Set<ConfigurationChangeNotifier> changeNotifiers = new HashSet<>();
-
+    private ConfigurationChangeCoordinator initializeNotifier(ConfigurationChangeListener configChangeListener) throws IOException {
         final Properties bootstrapProperties = getBootstrapProperties();
 
-        final String notifiersCsv = bootstrapProperties.getProperty(NOTIFIER_COMPONENTS_KEY);
-        if (notifiersCsv != null && !notifiersCsv.isEmpty()) {
-            for (String notifierClassname : Arrays.asList(notifiersCsv.split(","))) {
-                try {
-                    Class<?> notifierClass = Class.forName(notifierClassname);
-                    ConfigurationChangeNotifier notifier = (ConfigurationChangeNotifier) notifierClass.newInstance();
-                    notifier.initialize(bootstrapProperties);
-                    changeNotifiers.add(notifier);
-                    notifier.registerListener(configChangeListener);
-                    notifier.start();
-                } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
-                    throw new RuntimeException("Issue instantiating notifier " + notifierClassname, e);
-                }
-            }
-        }
-        return changeNotifiers;
+        ConfigurationChangeCoordinator notifier = new ConfigurationChangeCoordinator();
+        notifier.initialize(bootstrapProperties, this, Collections.singleton(configChangeListener));
+        notifier.start();
+
+        return notifier;
     }
 
     public Set<PeriodicStatusReporter> getPeriodicStatusReporters() {
@@ -1506,6 +1503,7 @@ public class RunMiNiFi implements QueryableStatusAggregator {
 
         private final RunMiNiFi runner;
         private final Logger logger;
+        private static final ReentrantLock handlingLock = new ReentrantLock();
 
         public MiNiFiConfigurationChangeListener(RunMiNiFi runner, Logger logger) {
             this.runner = runner;
@@ -1515,7 +1513,12 @@ public class RunMiNiFi implements QueryableStatusAggregator {
         @Override
         public void handleChange(InputStream configInputStream) throws ConfigurationChangeException {
             logger.info("Received notification of a change");
+
+            if (!handlingLock.tryLock()) {
+                throw new ConfigurationChangeException("Instance is already handling another change");
+            }
             try {
+
                 final Properties bootstrapProperties = runner.getBootstrapProperties();
                 final File configFile = new File(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY));
 
@@ -1528,38 +1531,50 @@ public class RunMiNiFi implements QueryableStatusAggregator {
                 }
 
                 // Create an input stream to use for writing a config file as well as feeding to the config transformer
-                final ByteArrayInputStream newConfigBais = new ByteArrayInputStream(bufferedConfigOs.toByteArray());
-                newConfigBais.mark(-1);
+                try (final ByteArrayInputStream newConfigBais = new ByteArrayInputStream(bufferedConfigOs.toByteArray())) {
+                    newConfigBais.mark(-1);
 
-                final File swapConfigFile = runner.getSwapFile(logger);
-                logger.info("Persisting old configuration to {}", swapConfigFile.getAbsolutePath());
-                Files.copy(new FileInputStream(configFile), swapConfigFile.toPath(), REPLACE_EXISTING);
+                    final File swapConfigFile = runner.getSwapFile(logger);
+                    logger.info("Persisting old configuration to {}", swapConfigFile.getAbsolutePath());
 
-                try {
-                    logger.info("Persisting changes to {}", configFile.getAbsolutePath());
-                    saveFile(newConfigBais, configFile);
-
-                     try {
-                         // Reset the input stream to provide to the transformer
-                         newConfigBais.reset();
-
-                         final String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
-                         logger.info("Performing transformation for input and saving outputs to {}", confDir);
-                         performTransformation(newConfigBais, confDir);
-
-                         logger.info("Reloading instance with new configuration");
-                         restartInstance();
-                     } catch (Exception e){
-                         logger.debug("Transformation of new config file failed after replacing original with the swap file, reverting.");
-                         Files.copy(new FileInputStream(swapConfigFile), configFile.toPath(), REPLACE_EXISTING);
-                         throw e;
-                     }
-                } catch (Exception e){
-                    logger.debug("Transformation of new config file failed after swap file was created, deleting it.");
-                    if(!swapConfigFile.delete()){
-                        logger.warn("The swap file failed to delete after a failed handling of a change. It should be cleaned up manually.");
+                    try (FileInputStream configFileInputStream = new FileInputStream(configFile)) {
+                        Files.copy(configFileInputStream, swapConfigFile.toPath(), REPLACE_EXISTING);
+                    }
+
+                    try {
+                        logger.info("Persisting changes to {}", configFile.getAbsolutePath());
+                        saveFile(newConfigBais, configFile);
+                        final String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
+
+                        try {
+                            // Reset the input stream to provide to the transformer
+                            newConfigBais.reset();
+
+                            logger.info("Performing transformation for input and saving outputs to {}", confDir);
+                            ByteBuffer tempConfigFile = performTransformation(newConfigBais, confDir);
+                            runner.currentConfigFileReference.set(tempConfigFile.asReadOnlyBuffer());
+
+                            try {
+                                logger.info("Reloading instance with new configuration");
+                                restartInstance();
+                            } catch (Exception e) {
+                                logger.debug("Transformation of new config file failed after transformation into Flow.xml and nifi.properties, reverting.");
+                                ByteBuffer resetConfigFile = performTransformation(new FileInputStream(swapConfigFile), confDir);
+                                runner.currentConfigFileReference.set(resetConfigFile.asReadOnlyBuffer());
+                                throw e;
+                            }
+                        } catch (Exception e) {
+                            logger.debug("Transformation of new config file failed after replacing original with the swap file, reverting.");
+                            Files.copy(new FileInputStream(swapConfigFile), configFile.toPath(), REPLACE_EXISTING);
+                            throw e;
+                        }
+                    } catch (Exception e) {
+                        logger.debug("Transformation of new config file failed after swap file was created, deleting it.");
+                        if (!swapConfigFile.delete()) {
+                            logger.warn("The swap file failed to delete after a failed handling of a change. It should be cleaned up manually.");
+                        }
+                        throw e;
                     }
-                    throw e;
                 }
             } catch (ConfigurationChangeException e){
                 logger.error("Unable to carry out reloading of configuration on receipt of notification event", e);
@@ -1567,6 +1582,15 @@ public class RunMiNiFi implements QueryableStatusAggregator {
             } catch (IOException ioe) {
                 logger.error("Unable to carry out reloading of configuration on receipt of notification event", ioe);
                 throw new ConfigurationChangeException("Unable to perform reload of received configuration change", ioe);
+            } finally {
+                try {
+                    if (configInputStream != null) {
+                        configInputStream.close() ;
+                    }
+                } catch (IOException e) {
+                    // Quietly close
+                }
+                handlingLock.unlock();
             }
         }
 
@@ -1589,8 +1613,6 @@ public class RunMiNiFi implements QueryableStatusAggregator {
             }
         }
 
-
-
         private void restartInstance() throws IOException {
             try {
                 runner.reload();
@@ -1600,9 +1622,13 @@ public class RunMiNiFi implements QueryableStatusAggregator {
         }
     }
 
-    private static void performTransformation(InputStream configIs, String configDestinationPath) throws ConfigurationChangeException, IOException {
-        try {
-            ConfigTransformer.transformConfigFile(configIs, configDestinationPath);
+    private static ByteBuffer performTransformation(InputStream configIs, String configDestinationPath) throws ConfigurationChangeException, IOException {
+        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+                TeeInputStream teeInputStream = new TeeInputStream(configIs, byteArrayOutputStream)) {
+
+            ConfigTransformer.transformConfigFile(teeInputStream, configDestinationPath);
+
+            return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
         } catch (ConfigurationChangeException e){
             throw e;
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
index bec39e6..236a52d 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
@@ -25,8 +25,8 @@ import java.nio.charset.StandardCharsets;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
 import org.apache.nifi.minifi.bootstrap.status.PeriodicStatusReporter;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator;
 
 public class ShutdownHook extends Thread {
 
@@ -53,13 +53,12 @@ public class ShutdownHook extends Thread {
     public void run() {
         executor.shutdown();
 
-        System.out.println("Initiating shutdown of bootstrap change notifiers...");
-        for (ConfigurationChangeNotifier notifier : runner.getChangeNotifiers()) {
-            try {
-                notifier.close();
-            } catch (IOException ioe) {
-                System.out.println("Could not successfully stop notifier " + notifier.getClass() + " due to " + ioe);
-            }
+        System.out.println("Initiating shutdown of bootstrap change ingestors...");
+        ConfigurationChangeCoordinator notifier = runner.getChangeCoordinator();
+        try {
+            notifier.close();
+        } catch (IOException ioe) {
+            System.out.println("Could not successfully stop notifier due to " + ioe);
         }
 
         System.out.println("Initiating shutdown of bootstrap periodic status reporters...");

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java
new file mode 100644
index 0000000..3fa5b8f
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.configuration;
+
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces.ChangeIngestor;
+import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+public class ConfigurationChangeCoordinator implements Closeable, ConfigurationChangeNotifier {
+
+    public static final String NOTIFIER_PROPERTY_PREFIX = "nifi.minifi.notifier";
+    public static final String NOTIFIER_INGESTORS_KEY = NOTIFIER_PROPERTY_PREFIX + ".ingestors";
+    private final static Logger logger = LoggerFactory.getLogger(ConfigurationChangeCoordinator.class);
+    private final Set<ConfigurationChangeListener> configurationChangeListeners = new HashSet<>();
+    private final Set<ChangeIngestor> changeIngestors = new HashSet<>();
+
+    /**
+     * Provides an opportunity for the implementation to perform configuration and initialization based on properties received from the bootstrapping configuration
+     *
+     * @param properties from the bootstrap configuration
+     */
+    public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, Collection<ConfigurationChangeListener> changeListenerSet) {
+        final String ingestorsCsv = properties.getProperty(NOTIFIER_INGESTORS_KEY);
+
+        if (ingestorsCsv != null && !ingestorsCsv.isEmpty()) {
+            for (String ingestorClassname : Arrays.asList(ingestorsCsv.split(","))) {
+                ingestorClassname = ingestorClassname.trim();
+                try {
+                    Class<?> ingestorClass = Class.forName(ingestorClassname);
+                    ChangeIngestor changeIngestor = (ChangeIngestor) ingestorClass.newInstance();
+                    changeIngestor.initialize(properties, configurationFileHolder, this);
+                    changeIngestors.add(changeIngestor);
+                    logger.info("Initialized ");
+                } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+                    throw new RuntimeException("Issue instantiating ingestor " + ingestorClassname, e);
+                }
+            }
+        }
+        configurationChangeListeners.clear();
+        configurationChangeListeners.addAll(changeListenerSet);
+    }
+
+    /**
+     * Begins the associated notification service provided by the given implementation.  In most implementations, no action will occur until this method is invoked.
+     */
+    public void start() {
+        changeIngestors.forEach(ChangeIngestor::start);
+    }
+
+    /**
+     * Provides an immutable collection of listeners for the notifier instance
+     *
+     * @return a collection of those listeners registered for notifications
+     */
+    public Set<ConfigurationChangeListener> getChangeListeners() {
+        return Collections.unmodifiableSet(configurationChangeListeners);
+    }
+
+    /**
+     * Provide the mechanism by which listeners are notified
+     */
+    public Collection<ListenerHandleResult> notifyListeners(ByteBuffer newConfig) {
+        logger.info("Notifying Listeners of a change");
+
+        Collection<ListenerHandleResult> listenerHandleResults = new ArrayList<>(configurationChangeListeners.size());
+        for (final ConfigurationChangeListener listener : getChangeListeners()) {
+            ListenerHandleResult result;
+            try {
+                listener.handleChange(new ByteBufferInputStream(newConfig.duplicate()));
+                result = new ListenerHandleResult(listener);
+            } catch (ConfigurationChangeException ex) {
+                result = new ListenerHandleResult(listener, ex);
+            }
+            listenerHandleResults.add(result);
+            logger.info("Listener notification result:" + result.toString());
+        }
+        return listenerHandleResults;
+    }
+
+
+    @Override
+    public void close() throws IOException {
+        for (ChangeIngestor changeIngestor : changeIngestors) {
+            changeIngestor.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
index 756b051..642ed4b 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
@@ -19,8 +19,8 @@ package org.apache.nifi.minifi.bootstrap.configuration;
 import java.io.InputStream;
 
 /**
- * Interface for handling events detected and driven by an associated {@link ConfigurationChangeNotifier} to which the listener
- * has registered via {@link ConfigurationChangeNotifier#registerListener(ConfigurationChangeListener)}.
+ * Interface for handling events detected and driven by an associated {@link ConfigurationChangeCoordinator} to which the listener
+ * has registered via {@link ConfigurationChangeCoordinator#registerListener(ConfigurationChangeListener)}.
  */
 public interface ConfigurationChangeListener {
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
index 745ce6c..2ebced5 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
@@ -1,57 +1,29 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.nifi.minifi.bootstrap.configuration;
 
-import java.io.Closeable;
+import java.nio.ByteBuffer;
 import java.util.Collection;
-import java.util.Properties;
 import java.util.Set;
 
-public interface ConfigurationChangeNotifier extends Closeable {
-
-    /**
-     * Provides an opportunity for the implementation to perform configuration and initialization based on properties received from the bootstrapping configuration
-     *
-     * @param properties from the bootstrap configuration
-     */
-    void initialize(Properties properties);
+public interface ConfigurationChangeNotifier {
 
-    /**
-     * Begins the associated notification service provided by the given implementation.  In most implementations, no action will occur until this method is invoked.
-     */
-    void start();
-
-    /**
-     * Provides an immutable collection of listeners for the notifier instance
-     *
-     * @return a collection of those listeners registered for notifications
-     */
     Set<ConfigurationChangeListener> getChangeListeners();
 
-    /**
-     * Adds a listener to be notified of configuration changes
-     *
-     * @param listener to be added to the collection
-     * @return true if the listener was added; false if already registered
-     */
-    boolean registerListener(ConfigurationChangeListener listener);
-
-    /**
-     * Provide the mechanism by which listeners are notified
-     */
-    Collection<ListenerHandleResult> notifyListeners();
+    Collection<ListenerHandleResult> notifyListeners(ByteBuffer is);
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java
index 8ac4cea..c0a7e74 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java
@@ -22,34 +22,34 @@ public class ListenerHandleResult {
     private final ConfigurationChangeListener configurationChangeListener;
     private final Exception failureCause;
 
-    public ListenerHandleResult(ConfigurationChangeListener configurationChangeListener){
+    public ListenerHandleResult(ConfigurationChangeListener configurationChangeListener) {
         this.configurationChangeListener = configurationChangeListener;
         failureCause = null;
     }
 
-    public ListenerHandleResult(ConfigurationChangeListener configurationChangeListener, Exception failureCause){
+    public ListenerHandleResult(ConfigurationChangeListener configurationChangeListener, Exception failureCause) {
         this.configurationChangeListener = configurationChangeListener;
         this.failureCause = failureCause;
     }
 
-    public boolean succeeded(){
+    public boolean succeeded() {
         return failureCause == null;
     }
 
-    public String getDescriptor(){
+    public String getDescriptor() {
         return configurationChangeListener.getDescriptor();
     }
 
-    public Exception getFailureCause(){
+    public Exception getFailureCause() {
         return failureCause;
     }
 
     @Override
     public String toString() {
-        if(failureCause == null){
+        if (failureCause == null) {
             return getDescriptor() + " successfully handled the configuration change";
         } else {
-            return getDescriptor() + " FAILED to handle the configuration change due to: '"  + failureCause.getMessage() + "'";
+            return getDescriptor() + " FAILED to handle the configuration change due to: '" + failureCause.getMessage() + "'";
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiator.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiator.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiator.java
new file mode 100644
index 0000000..565a8f4
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.differentiators;
+
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class WholeConfigDifferentiator {
+
+
+    private final static Logger logger = LoggerFactory.getLogger(WholeConfigDifferentiator.class);
+
+    public static final String WHOLE_CONFIG_KEY = "Whole Config";
+
+    volatile ConfigurationFileHolder configurationFileHolder;
+
+    boolean compareInputStreamToConfigFile(InputStream inputStream) throws IOException {
+        logger.debug("Checking if change is different");
+        AtomicReference<ByteBuffer> currentConfigFileReference = configurationFileHolder.getConfigFileReference();
+        ByteBuffer currentConfigFile = currentConfigFileReference.get();
+        ByteBuffer byteBuffer = ByteBuffer.allocate(currentConfigFile.limit());
+        DataInputStream dataInputStream = new DataInputStream(inputStream);
+        try {
+            dataInputStream.readFully(byteBuffer.array());
+        } catch (EOFException e) {
+            logger.debug("New config is shorter than the current. Must be different.");
+            return true;
+        }
+        logger.debug("Read the input");
+
+        if (dataInputStream.available() != 0) {
+            return true;
+        } else {
+            return byteBuffer.compareTo(currentConfigFile) != 0;
+        }
+    }
+
+    public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder) {
+        this.configurationFileHolder = configurationFileHolder;
+    }
+
+
+    public static class InputStreamInput extends WholeConfigDifferentiator implements Differentiator<InputStream> {
+        public boolean isNew(InputStream inputStream) throws IOException {
+            return compareInputStreamToConfigFile(inputStream);
+        }
+    }
+
+    public static class ByteBufferInput extends WholeConfigDifferentiator implements Differentiator<ByteBuffer> {
+        public boolean isNew(ByteBuffer inputBuffer) {
+            AtomicReference<ByteBuffer> currentConfigFileReference = configurationFileHolder.getConfigFileReference();
+            ByteBuffer currentConfigFile = currentConfigFileReference.get();
+            return inputBuffer.compareTo(currentConfigFile) != 0;
+        }
+    }
+
+
+    public static Differentiator<InputStream> getInputStreamDifferentiator() {
+        return new InputStreamInput();
+    }
+
+    public static Differentiator<ByteBuffer> getByteBufferDifferentiator() {
+        return new ByteBufferInput();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/interfaces/Differentiator.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/interfaces/Differentiator.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/interfaces/Differentiator.java
new file mode 100644
index 0000000..5beb78b
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/interfaces/Differentiator.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces;
+
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public interface Differentiator <T> {
+    void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder);
+
+    boolean isNew(T input) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java
new file mode 100644
index 0000000..1678f20
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
+
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces.ChangeIngestor;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+
+public abstract class AbstractPullChangeIngestor implements Runnable, ChangeIngestor {
+
+
+    // 5 minute default pulling period
+    protected static final String DEFAULT_POLLING_PERIOD = "300000";
+    protected static Logger logger;
+
+    protected final AtomicInteger pollingPeriodMS = new AtomicInteger();
+    private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
+    protected volatile ConfigurationChangeNotifier configurationChangeNotifier;
+
+    @Override
+    public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
+        this.configurationChangeNotifier = configurationChangeNotifier;
+    }
+
+    @Override
+    public void start() {
+        scheduledThreadPoolExecutor.scheduleAtFixedRate(this, pollingPeriodMS.get(), pollingPeriodMS.get(), TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void close() throws IOException {
+        scheduledThreadPoolExecutor.shutdownNow();
+    }
+
+    public abstract void run();
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java
new file mode 100644
index 0000000..39b272d
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
+
+import org.apache.commons.io.input.TeeInputStream;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
+import org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces.ChangeIngestor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
+import static org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator.NOTIFIER_INGESTORS_KEY;
+import static org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator.WHOLE_CONFIG_KEY;
+
+/**
+ * FileChangeIngestor provides a simple FileSystem monitor for detecting changes for a specified file as generated from its corresponding {@link Path}.  Upon modifications to the associated file,
+ * associated listeners receive notification of a change allowing configuration logic to be reanalyzed.  The backing implementation is associated with a {@link ScheduledExecutorService} that
+ * ensures continuity of monitoring.
+ */
+public class FileChangeIngestor implements Runnable, ChangeIngestor {
+
+    private static final Map<String, Supplier<Differentiator<InputStream>>> DIFFERENTIATOR_CONSTRUCTOR_MAP;
+
+    static {
+        HashMap<String, Supplier<Differentiator<InputStream>>> tempMap = new HashMap<>();
+        tempMap.put(WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getInputStreamDifferentiator);
+
+        DIFFERENTIATOR_CONSTRUCTOR_MAP = Collections.unmodifiableMap(tempMap);
+    }
+
+
+    protected static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
+    protected static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = TimeUnit.SECONDS;
+
+    private final static Logger logger = LoggerFactory.getLogger(FileChangeIngestor.class);
+    private static final String CONFIG_FILE_BASE_KEY = NOTIFIER_INGESTORS_KEY + ".file";
+
+    protected static final String CONFIG_FILE_PATH_KEY = CONFIG_FILE_BASE_KEY + ".config.path";
+    protected static final String POLLING_PERIOD_INTERVAL_KEY = CONFIG_FILE_BASE_KEY + ".polling.period.seconds";
+    public static final String DIFFERENTIATOR_KEY = CONFIG_FILE_BASE_KEY + ".differentiator";
+
+    private Path configFilePath;
+    private WatchService watchService;
+    private long pollingSeconds;
+    private volatile Differentiator<InputStream> differentiator;
+
+    private volatile ConfigurationChangeNotifier configurationChangeNotifier;
+    private ScheduledExecutorService executorService;
+
+    protected static WatchService initializeWatcher(Path filePath) {
+        try {
+            final WatchService fsWatcher = FileSystems.getDefault().newWatchService();
+            final Path watchDirectory = filePath.getParent();
+            watchDirectory.register(fsWatcher, ENTRY_MODIFY);
+
+            return fsWatcher;
+        } catch (IOException ioe) {
+            throw new IllegalStateException("Unable to initialize a file system watcher for the path " + filePath, ioe);
+        }
+    }
+
+    protected boolean targetChanged() {
+        boolean targetChanged = false;
+
+        final WatchKey watchKey = this.watchService.poll();
+
+        if (watchKey == null) {
+            return targetChanged;
+        }
+
+        for (WatchEvent<?> watchEvt : watchKey.pollEvents()) {
+            final WatchEvent.Kind<?> evtKind = watchEvt.kind();
+
+            final WatchEvent<Path> pathEvent = (WatchEvent<Path>) watchEvt;
+            final Path changedFile = pathEvent.context();
+
+            // determine target change by verifying if the changed file corresponds to the config file monitored for this path
+            targetChanged = (evtKind == ENTRY_MODIFY && changedFile.equals(configFilePath.getName(configFilePath.getNameCount() - 1)));
+        }
+
+        // After completing inspection, reset for detection of subsequent change events
+        boolean valid = watchKey.reset();
+        if (!valid) {
+            throw new IllegalStateException("Unable to reinitialize file system watcher.");
+        }
+
+        return targetChanged;
+    }
+
+    @Override
+    public void run() {
+        logger.debug("Checking for a change");
+        if (targetChanged()) {
+            logger.debug("Target changed, checking if it's different than current flow.");
+            try (FileInputStream configFile = new FileInputStream(configFilePath.toFile());
+                ByteArrayOutputStream pipedOutputStream = new ByteArrayOutputStream();
+                TeeInputStream teeInputStream = new TeeInputStream(configFile, pipedOutputStream)) {
+
+                if (differentiator.isNew(teeInputStream)) {
+                    logger.debug("New change, notifying listener");
+                    // Fill the byteArrayOutputStream with the rest of the request data
+                    while (teeInputStream.available() != 0) {
+                        teeInputStream.read();
+                    }
+
+                    ByteBuffer newConfig = ByteBuffer.wrap(pipedOutputStream.toByteArray());
+                    ByteBuffer readOnlyNewConfig = newConfig.asReadOnlyBuffer();
+
+                    configurationChangeNotifier.notifyListeners(readOnlyNewConfig);
+                    logger.debug("Listeners notified");
+                }
+            } catch (Exception e) {
+                logger.error("Could not successfully notify listeners.", e);
+            }
+        }
+    }
+
+    @Override
+    public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
+        final String rawPath = properties.getProperty(CONFIG_FILE_PATH_KEY);
+        final String rawPollingDuration = properties.getProperty(POLLING_PERIOD_INTERVAL_KEY, Long.toString(DEFAULT_POLLING_PERIOD_INTERVAL));
+
+        if (rawPath == null || rawPath.isEmpty()) {
+            throw new IllegalArgumentException("Property, " + CONFIG_FILE_PATH_KEY + ", for the path of the config file must be specified.");
+        }
+
+        try {
+            setConfigFilePath(Paths.get(rawPath));
+            setPollingPeriod(Long.parseLong(rawPollingDuration), DEFAULT_POLLING_PERIOD_UNIT);
+            setWatchService(initializeWatcher(configFilePath));
+        } catch (Exception e) {
+            throw new IllegalStateException("Could not successfully initialize file change notifier.", e);
+        }
+
+        this.configurationChangeNotifier = configurationChangeNotifier;
+
+        final String differentiatorName = properties.getProperty(DIFFERENTIATOR_KEY);
+
+        if (differentiatorName != null && !differentiatorName.isEmpty()) {
+            Supplier<Differentiator<InputStream>> differentiatorSupplier = DIFFERENTIATOR_CONSTRUCTOR_MAP.get(differentiatorName);
+            if (differentiatorSupplier == null) {
+                throw new IllegalArgumentException("Property, " + DIFFERENTIATOR_KEY + ", has value " + differentiatorName + " which does not " +
+                        "correspond to any in the PullHttpChangeIngestor Map:" + DIFFERENTIATOR_CONSTRUCTOR_MAP.keySet());
+            }
+            differentiator = differentiatorSupplier.get();
+        } else {
+            differentiator = WholeConfigDifferentiator.getInputStreamDifferentiator();
+        }
+        differentiator.initialize(properties, configurationFileHolder);
+    }
+
+    protected void setConfigFilePath(Path configFilePath) {
+        this.configFilePath = configFilePath;
+    }
+
+    protected void setWatchService(WatchService watchService) {
+        this.watchService = watchService;
+    }
+
+    protected void setConfigurationChangeNotifier(ConfigurationChangeNotifier configurationChangeNotifier) {
+        this.configurationChangeNotifier = configurationChangeNotifier;
+    }
+
+    protected void setDifferentiator(Differentiator<InputStream> differentiator) {
+        this.differentiator = differentiator;
+    }
+
+    protected void setPollingPeriod(long duration, TimeUnit unit) {
+        if (duration < 0) {
+            throw new IllegalArgumentException("Cannot specify a polling period with duration <=0");
+        }
+        this.pollingSeconds = TimeUnit.SECONDS.convert(duration, unit);
+    }
+
+    @Override
+    public void start() {
+        executorService = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+            @Override
+            public Thread newThread(final Runnable r) {
+                final Thread t = Executors.defaultThreadFactory().newThread(r);
+                t.setName("File Change Notifier Thread");
+                t.setDaemon(true);
+                return t;
+            }
+        });
+        this.executorService.scheduleWithFixedDelay(this, 0, pollingSeconds, DEFAULT_POLLING_PERIOD_UNIT);
+    }
+
+    @Override
+    public void close() {
+        if (this.executorService != null) {
+            this.executorService.shutdownNow();
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java
new file mode 100644
index 0000000..a8e7105
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
+
+import okhttp3.Call;
+import okhttp3.HttpUrl;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+import java.io.FileInputStream;
+import java.nio.ByteBuffer;
+import java.security.KeyStore;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import static org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator.NOTIFIER_INGESTORS_KEY;
+import static org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator.WHOLE_CONFIG_KEY;
+
+
+public class PullHttpChangeIngestor extends AbstractPullChangeIngestor {
+
+    private static final int NOT_MODIFIED_STATUS_CODE = 304;
+    private static final Map<String, Supplier<Differentiator<ByteBuffer>>> DIFFERENTIATOR_CONSTRUCTOR_MAP;
+
+    static {
+        HashMap<String, Supplier<Differentiator<ByteBuffer>>> tempMap = new HashMap<>();
+        tempMap.put(WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getByteBufferDifferentiator);
+
+        DIFFERENTIATOR_CONSTRUCTOR_MAP = Collections.unmodifiableMap(tempMap);
+    }
+
+    private static final String DEFAULT_CONNECT_TIMEOUT_MS = "5000";
+    private static final String DEFAULT_READ_TIMEOUT_MS = "15000";
+
+    private static final String PULL_HTTP_BASE_KEY = NOTIFIER_INGESTORS_KEY + ".pull.http";
+    public static final String PULL_HTTP_POLLING_PERIOD_KEY = PULL_HTTP_BASE_KEY + ".period.ms";
+    public static final String PORT_KEY = PULL_HTTP_BASE_KEY + ".port";
+    public static final String HOST_KEY = PULL_HTTP_BASE_KEY + ".hostname";
+    public static final String PATH_KEY = PULL_HTTP_BASE_KEY + ".path";
+    public static final String TRUSTSTORE_LOCATION_KEY = PULL_HTTP_BASE_KEY + ".truststore.location";
+    public static final String TRUSTSTORE_PASSWORD_KEY = PULL_HTTP_BASE_KEY + ".truststore.password";
+    public static final String TRUSTSTORE_TYPE_KEY = PULL_HTTP_BASE_KEY + ".truststore.type";
+    public static final String KEYSTORE_LOCATION_KEY = PULL_HTTP_BASE_KEY + ".keystore.location";
+    public static final String KEYSTORE_PASSWORD_KEY = PULL_HTTP_BASE_KEY + ".keystore.password";
+    public static final String KEYSTORE_TYPE_KEY = PULL_HTTP_BASE_KEY + ".keystore.type";
+    public static final String CONNECT_TIMEOUT_KEY = PULL_HTTP_BASE_KEY + ".connect.timeout.ms";
+    public static final String READ_TIMEOUT_KEY = PULL_HTTP_BASE_KEY + ".read.timeout.ms";
+    public static final String DIFFERENTIATOR_KEY = PULL_HTTP_BASE_KEY + ".differentiator";
+    public static final String USE_ETAG_KEY = PULL_HTTP_BASE_KEY + ".use.etag";
+
+    private final AtomicReference<OkHttpClient> httpClientReference = new AtomicReference<>();
+    private final AtomicReference<Integer> portReference = new AtomicReference<>();
+    private final AtomicReference<String> hostReference = new AtomicReference<>();
+    private final AtomicReference<String> pathReference = new AtomicReference<>();
+    private volatile Differentiator<ByteBuffer> differentiator;
+    private volatile String connectionScheme;
+    private volatile String lastEtag = "";
+    private volatile boolean useEtag = false;
+
+    public PullHttpChangeIngestor() {
+        logger = LoggerFactory.getLogger(PullHttpChangeIngestor.class);
+    }
+
+    @Override
+    public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
+        super.initialize(properties, configurationFileHolder, configurationChangeNotifier);
+
+        pollingPeriodMS.set(Integer.parseInt(properties.getProperty(PULL_HTTP_POLLING_PERIOD_KEY, DEFAULT_POLLING_PERIOD)));
+        if (pollingPeriodMS.get() < 1) {
+            throw new IllegalArgumentException("Property, " + PULL_HTTP_POLLING_PERIOD_KEY + ", for the polling period ms must be set with a positive integer.");
+        }
+
+        final String host = properties.getProperty(HOST_KEY);
+        if (host == null || host.isEmpty()) {
+            throw new IllegalArgumentException("Property, " + HOST_KEY + ", for the hostname to pull configurations from must be specified.");
+        }
+
+        final String path = properties.getProperty(PATH_KEY, "/");
+
+        final String portString = (String) properties.get(PORT_KEY);
+        final Integer port;
+        if (portString == null) {
+            throw new IllegalArgumentException("Property, " + PORT_KEY + ", for the hostname to pull configurations from must be specified.");
+        } else {
+            port = Integer.parseInt(portString);
+        }
+
+        portReference.set(port);
+        hostReference.set(host);
+        pathReference.set(path);
+
+        final String useEtagString = (String) properties.getOrDefault(USE_ETAG_KEY, "false");
+        if ("true".equalsIgnoreCase(useEtagString) || "false".equalsIgnoreCase(useEtagString)){
+            useEtag = Boolean.parseBoolean(useEtagString);
+        } else {
+            throw new IllegalArgumentException("Property, " + USE_ETAG_KEY + ", to specify whether to use the ETag header, must either be a value boolean value (\"true\" or \"false\") or left to " +
+                    "the default value of \"false\". It is set to \"" + useEtagString + "\".");
+        }
+
+        httpClientReference.set(null);
+
+        final OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder();
+
+        // Set timeouts
+        okHttpClientBuilder.connectTimeout(Long.parseLong(properties.getProperty(CONNECT_TIMEOUT_KEY, DEFAULT_CONNECT_TIMEOUT_MS)), TimeUnit.MILLISECONDS);
+        okHttpClientBuilder.readTimeout(Long.parseLong(properties.getProperty(READ_TIMEOUT_KEY, DEFAULT_READ_TIMEOUT_MS)), TimeUnit.MILLISECONDS);
+
+        // Set whether to follow redirects
+        okHttpClientBuilder.followRedirects(true);
+
+        // check if the ssl path is set and add the factory if so
+        if (properties.containsKey(KEYSTORE_LOCATION_KEY)) {
+            try {
+                setSslSocketFactory(okHttpClientBuilder, properties);
+                connectionScheme = "https";
+            } catch (Exception e) {
+                throw new IllegalStateException(e);
+            }
+        } else {
+            connectionScheme = "http";
+        }
+
+        httpClientReference.set(okHttpClientBuilder.build());
+        final String differentiatorName = properties.getProperty(DIFFERENTIATOR_KEY);
+
+        if (differentiatorName != null && !differentiatorName.isEmpty()) {
+            Supplier<Differentiator<ByteBuffer>> differentiatorSupplier = DIFFERENTIATOR_CONSTRUCTOR_MAP.get(differentiatorName);
+            if (differentiatorSupplier == null) {
+                throw new IllegalArgumentException("Property, " + DIFFERENTIATOR_KEY + ", has value " + differentiatorName + " which does not " +
+                        "correspond to any in the PullHttpChangeIngestor Map:" + DIFFERENTIATOR_CONSTRUCTOR_MAP.keySet());
+            }
+            differentiator = differentiatorSupplier.get();
+        } else {
+            differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
+        }
+        differentiator.initialize(properties, configurationFileHolder);
+    }
+
+
+    @Override
+    public void run() {
+        try {
+            logger.debug("Attempting to pull new config");
+            final HttpUrl url = new HttpUrl.Builder()
+                    .host(hostReference.get())
+                    .port(portReference.get())
+                    .encodedPath(pathReference.get())
+                    .scheme(connectionScheme)
+                    .build();
+
+
+            final Request.Builder requestBuilder = new Request.Builder()
+                    .get()
+                    .url(url);
+
+            if (useEtag) {
+                requestBuilder.addHeader("If-None-Match", lastEtag);
+            }
+
+            final Request request = requestBuilder.build();
+
+            final OkHttpClient httpClient = httpClientReference.get();
+
+            final Call call = httpClient.newCall(request);
+            final Response response = call.execute();
+
+            logger.debug("Response received: {}", response.toString());
+
+            if (response.code() == NOT_MODIFIED_STATUS_CODE) {
+                return;
+            }
+
+            ResponseBody body = response.body();
+            if (body == null) {
+                logger.warn("No body returned when pulling a new configuration");
+                return;
+            }
+
+            ByteBuffer bodyByteBuffer = ByteBuffer.wrap(body.bytes());
+
+            if (differentiator.isNew(bodyByteBuffer)) {
+                logger.debug("New change, notifying listener");
+
+                ByteBuffer readOnlyNewConfig = bodyByteBuffer.asReadOnlyBuffer();
+
+                configurationChangeNotifier.notifyListeners(readOnlyNewConfig);
+                logger.debug("Listeners notified");
+            } else {
+                logger.debug("Pulled config same as currently running.");
+            }
+
+            if (useEtag) {
+                lastEtag = (new StringBuilder("\""))
+                        .append(response.header("ETag").trim())
+                        .append("\"").toString();
+            }
+        } catch (Exception e) {
+            logger.warn("Hit an exception while trying to pull", e);
+        }
+    }
+
+    private void setSslSocketFactory(OkHttpClient.Builder okHttpClientBuilder, Properties properties) throws Exception {
+        final String keystoreLocation = properties.getProperty(KEYSTORE_LOCATION_KEY);
+        final String keystorePass = properties.getProperty(KEYSTORE_PASSWORD_KEY);
+        final String keystoreType = properties.getProperty(KEYSTORE_TYPE_KEY);
+
+        assertKeystorePropertiesSet(keystoreLocation, keystorePass, keystoreType);
+
+        // prepare the keystore
+        final KeyStore keyStore = KeyStore.getInstance(keystoreType);
+
+        try (FileInputStream keyStoreStream = new FileInputStream(keystoreLocation)) {
+            keyStore.load(keyStoreStream, keystorePass.toCharArray());
+        }
+
+        final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+        keyManagerFactory.init(keyStore, keystorePass.toCharArray());
+
+        // load truststore
+        final String truststoreLocation = properties.getProperty(TRUSTSTORE_LOCATION_KEY);
+        final String truststorePass = properties.getProperty(TRUSTSTORE_PASSWORD_KEY);
+        final String truststoreType = properties.getProperty(TRUSTSTORE_TYPE_KEY);
+        assertTruststorePropertiesSet(truststoreLocation, truststorePass, truststoreType);
+
+        KeyStore truststore = KeyStore.getInstance(truststoreType);
+        final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("X509");
+        truststore.load(new FileInputStream(truststoreLocation), truststorePass.toCharArray());
+        trustManagerFactory.init(truststore);
+
+        final X509TrustManager x509TrustManager;
+        TrustManager[] trustManagers = trustManagerFactory.getTrustManagers();
+        if (trustManagers[0] != null) {
+            x509TrustManager = (X509TrustManager) trustManagers[0];
+        } else {
+            throw new IllegalStateException("List of trust managers is null");
+        }
+
+        SSLContext tempSslContext;
+        try {
+            tempSslContext = SSLContext.getInstance("TLS");
+        } catch (NoSuchAlgorithmException e) {
+            logger.warn("Unable to use 'TLS' for the PullHttpChangeIngestor due to NoSuchAlgorithmException. Will attempt to use the default algorithm.", e);
+            tempSslContext = SSLContext.getDefault();
+        }
+
+        final SSLContext sslContext = tempSslContext;
+        sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), null);
+
+        final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
+        okHttpClientBuilder.sslSocketFactory(sslSocketFactory, x509TrustManager);
+    }
+
+    private void assertKeystorePropertiesSet(String location, String password, String type) {
+        if (location == null || location.isEmpty()) {
+            throw new IllegalArgumentException(KEYSTORE_LOCATION_KEY + " is null or is empty");
+        }
+
+        if (password == null || password.isEmpty()) {
+            throw new IllegalArgumentException(KEYSTORE_LOCATION_KEY + " is set but " + KEYSTORE_PASSWORD_KEY + " is not (or is empty). If the location is set, the password must also be.");
+        }
+
+        if (type == null || type.isEmpty()) {
+            throw new IllegalArgumentException(KEYSTORE_LOCATION_KEY + " is set but " + KEYSTORE_TYPE_KEY + " is not (or is empty). If the location is set, the type must also be.");
+        }
+    }
+
+    private void assertTruststorePropertiesSet(String location, String password, String type) {
+        if (location == null || location.isEmpty()) {
+            throw new IllegalArgumentException(TRUSTSTORE_LOCATION_KEY + " is not set or is empty");
+        }
+
+        if (password == null || password.isEmpty()) {
+            throw new IllegalArgumentException(TRUSTSTORE_LOCATION_KEY + " is set but " + TRUSTSTORE_PASSWORD_KEY + " is not (or is empty). If the location is set, the password must also be.");
+        }
+
+        if (type == null || type.isEmpty()) {
+            throw new IllegalArgumentException(TRUSTSTORE_LOCATION_KEY + " is set but " + TRUSTSTORE_TYPE_KEY + " is not (or is empty). If the location is set, the type must also be.");
+        }
+    }
+
+    protected void setDifferentiator(Differentiator<ByteBuffer> differentiator) {
+        this.differentiator = differentiator;
+    }
+
+    public void setLastEtag(String lastEtag) {
+        this.lastEtag = lastEtag;
+    }
+
+    public void setUseEtag(boolean useEtag) {
+        this.useEtag = useEtag;
+    }
+}