You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by br...@apache.org on 2016/11/11 17:17:57 UTC
[3/3] nifi-minifi git commit: MINIFI-36 Refactoring config change
notifiers and adding Pull config change notifier
MINIFI-36 Refactoring config change notifiers and adding Pull config change notifier
This closes #51
Signed-off-by: Bryan Rosander <br...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/6f3c5678
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/6f3c5678
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/6f3c5678
Branch: refs/heads/master
Commit: 6f3c567806363f58e85a2f00ef83e3b1f48f3f7f
Parents: 7954d36
Author: Joseph Percivall <JP...@apache.org>
Authored: Thu Oct 6 16:27:19 2016 -0400
Committer: Bryan Rosander <br...@apache.org>
Committed: Fri Nov 11 12:14:57 2016 -0500
----------------------------------------------------------------------
.../src/main/assembly/dependencies.xml | 2 +
minifi-bootstrap/pom.xml | 10 +-
.../nifi/minifi/bootstrap/BootstrapCodec.java | 2 +-
.../bootstrap/ConfigurationFileHolder.java | 26 ++
.../apache/nifi/minifi/bootstrap/RunMiNiFi.java | 170 ++++++----
.../nifi/minifi/bootstrap/ShutdownHook.java | 15 +-
.../ConfigurationChangeCoordinator.java | 114 +++++++
.../ConfigurationChangeListener.java | 4 +-
.../ConfigurationChangeNotifier.java | 44 +--
.../configuration/ListenerHandleResult.java | 14 +-
.../WholeConfigDifferentiator.java | 90 +++++
.../interfaces/Differentiator.java | 29 ++
.../ingestors/AbstractPullChangeIngestor.java | 60 ++++
.../ingestors/FileChangeIngestor.java | 234 +++++++++++++
.../ingestors/PullHttpChangeIngestor.java | 326 +++++++++++++++++++
.../ingestors/RestChangeIngestor.java | 294 +++++++++++++++++
.../ingestors/interfaces/ChangeIngestor.java | 32 ++
.../notifiers/FileChangeNotifier.java | 202 ------------
.../notifiers/RestChangeNotifier.java | 289 ----------------
.../bootstrap/util/ByteBufferInputStream.java | 48 +++
.../bootstrap/util/ConfigTransformer.java | 2 +-
.../TestConfigurationChangeCoordinator.java | 84 +++++
.../TestWholeConfigDifferentiator.java | 110 +++++++
.../ingestors/TestFileChangeIngestor.java | 171 ++++++++++
.../ingestors/TestPullHttpChangeIngestor.java | 65 ++++
.../TestPullHttpChangeIngestorSSL.java | 84 +++++
.../ingestors/TestRestChangeIngestor.java | 57 ++++
.../ingestors/TestRestChangeIngestorSSL.java | 150 +++++++++
.../TestPullHttpChangeIngestorCommon.java | 231 +++++++++++++
.../common/TestRestChangeIngestorCommon.java | 127 ++++++++
.../notifiers/TestFileChangeNotifier.java | 208 ------------
.../notifiers/TestRestChangeNotifier.java | 51 ---
.../notifiers/TestRestChangeNotifierSSL.java | 96 ------
.../notifiers/util/MockChangeListener.java | 51 ---
.../util/TestRestChangeNotifierCommon.java | 84 -----
.../bootstrap/util/ConfigTransformerTest.java | 5 +-
.../src/main/markdown/System_Admin_Guide.md | 97 ++++++
.../src/main/resources/conf/bootstrap.conf | 20 +-
pom.xml | 15 +-
39 files changed, 2580 insertions(+), 1133 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-assembly/src/main/assembly/dependencies.xml
----------------------------------------------------------------------
diff --git a/minifi-assembly/src/main/assembly/dependencies.xml b/minifi-assembly/src/main/assembly/dependencies.xml
index 551c8af..a774e49 100644
--- a/minifi-assembly/src/main/assembly/dependencies.xml
+++ b/minifi-assembly/src/main/assembly/dependencies.xml
@@ -73,6 +73,8 @@
<include>jetty-http</include>
<include>jetty-io</include>
<include>javax.servlet-api</include>
+ <include>commons-io</include>
+ <include>okhttp</include>
</includes>
</dependencySet>
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/pom.xml b/minifi-bootstrap/pom.xml
index 433c352..71e9b78 100644
--- a/minifi-bootstrap/pom.xml
+++ b/minifi-bootstrap/pom.xml
@@ -73,21 +73,13 @@ limitations under the License.
<version>${jetty.version}</version>
<scope>compile</scope>
</dependency>
-
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-standard-prioritizers</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
- <groupId>com.squareup.okhttp</groupId>
+ <groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
- <scope>test</scope>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapCodec.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapCodec.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapCodec.java
index 95e6f87..2e8a537 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapCodec.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapCodec.java
@@ -110,7 +110,7 @@ public class BootstrapCodec {
break;
case "SHUTDOWN": {
logger.debug("Received 'SHUTDOWN' command from MINIFI");
- runner.shutdownChangeNotifiers();
+ runner.shutdownChangeNotifier();
runner.shutdownPeriodicStatusReporters();
writer.write("OK");
writer.newLine();
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ConfigurationFileHolder.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ConfigurationFileHolder.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ConfigurationFileHolder.java
new file mode 100644
index 0000000..d5113e3
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ConfigurationFileHolder.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicReference;
+
+public interface ConfigurationFileHolder {
+
+ AtomicReference<ByteBuffer> getConfigFileReference();
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
index ad54c61..52a803c 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
@@ -35,6 +35,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
@@ -57,14 +58,16 @@ import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.commons.io.input.TeeInputStream;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
import org.apache.nifi.minifi.bootstrap.status.PeriodicStatusReporter;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator;
import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
import org.apache.nifi.minifi.commons.status.FlowStatusReport;
import org.apache.nifi.stream.io.ByteArrayInputStream;
@@ -90,7 +93,7 @@ import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
* <p>
* If the {@code bootstrap.conf} file cannot be found, throws a {@code FileNotFoundException}.
*/
-public class RunMiNiFi implements QueryableStatusAggregator {
+public class RunMiNiFi implements QueryableStatusAggregator, ConfigurationFileHolder {
public static final String DEFAULT_CONFIG_FILE = "./conf/bootstrap.conf";
public static final String DEFAULT_NIFI_PROPS_FILE = "./conf/nifi.properties";
@@ -143,11 +146,18 @@ public class RunMiNiFi implements QueryableStatusAggregator {
private volatile Set<Future<?>> loggingFutures = new HashSet<>(2);
private volatile int gracefulShutdownSeconds;
- private Set<ConfigurationChangeNotifier> changeNotifiers;
private Set<PeriodicStatusReporter> periodicStatusReporters;
+ private ConfigurationChangeCoordinator changeCoordinator;
private MiNiFiConfigurationChangeListener changeListener;
+ private final AtomicReference<ByteBuffer> currentConfigFileReference = new AtomicReference<>();
+
+ @Override
+ public AtomicReference<ByteBuffer> getConfigFileReference() {
+ return currentConfigFileReference;
+ }
+
// Is set to true after the MiNiFi instance shuts down in preparation to be reloaded. Will be set to false after MiNiFi is successfully started again.
private AtomicBoolean reloading = new AtomicBoolean(false);
@@ -1098,8 +1108,9 @@ public class RunMiNiFi implements QueryableStatusAggregator {
final String confDir = getBootstrapProperties().getProperty(CONF_DIR_KEY);
final File configFile = new File(getBootstrapProperties().getProperty(MINIFI_CONFIG_FILE_KEY));
- try {
- performTransformation(new FileInputStream(configFile), confDir);
+ try (InputStream inputStream = new FileInputStream(configFile)) {
+ ByteBuffer tempConfigFile = performTransformation(inputStream, confDir);
+ currentConfigFileReference.set(tempConfigFile.asReadOnlyBuffer());
} catch (ConfigurationChangeException e) {
defaultLogger.error("The config file is malformed, unable to start.", e);
return;
@@ -1111,11 +1122,11 @@ public class RunMiNiFi implements QueryableStatusAggregator {
return;
}
- // Instantiate configuration listener and configured notifiers
+ // Instantiate configuration listener and configured ingestors
this.changeListener = new MiNiFiConfigurationChangeListener(this, defaultLogger);
- this.changeNotifiers = initializeNotifiers(this.changeListener);
this.periodicStatusReporters = initializePeriodicNotifiers();
startPeriodicNotifiers();
+ this.changeCoordinator = initializeNotifier(this.changeListener);
ProcessBuilder builder = tuple.getKey();
Process process = tuple.getValue();
@@ -1136,7 +1147,7 @@ public class RunMiNiFi implements QueryableStatusAggregator {
if (swapConfigFile.delete()) {
defaultLogger.info("Swap file was successfully deleted.");
} else {
- defaultLogger.info("Swap file was not deleted.");
+ defaultLogger.error("Swap file was not deleted. It should be deleted manually.");
}
}
@@ -1180,7 +1191,8 @@ public class RunMiNiFi implements QueryableStatusAggregator {
defaultLogger.info("Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration.");
try {
- performTransformation(new FileInputStream(swapConfigFile), confDir);
+ ByteBuffer tempConfigFile = performTransformation(new FileInputStream(swapConfigFile), confDir);
+ currentConfigFileReference.set(tempConfigFile.asReadOnlyBuffer());
} catch (ConfigurationChangeException e) {
defaultLogger.error("The swap file is malformed, unable to restart from prior state. Will not attempt to restart MiNiFi. Swap File should be cleaned up manually.");
return;
@@ -1228,7 +1240,7 @@ public class RunMiNiFi implements QueryableStatusAggregator {
}
}
} finally {
- shutdownChangeNotifiers();
+ shutdownChangeNotifier();
shutdownPeriodicStatusReporters();
}
}
@@ -1424,41 +1436,26 @@ public class RunMiNiFi implements QueryableStatusAggregator {
}
}
- public void shutdownChangeNotifiers() {
- for (ConfigurationChangeNotifier notifier : getChangeNotifiers()) {
- try {
- notifier.close();
- } catch (IOException e) {
- defaultLogger.warn("Could not successfully stop notifier {}", notifier.getClass(), e);
- }
+ public void shutdownChangeNotifier() {
+ try {
+ getChangeCoordinator().close();
+ } catch (IOException e) {
+ defaultLogger.warn("Could not successfully stop notifier ", e);
}
}
- public Set<ConfigurationChangeNotifier> getChangeNotifiers() {
- return Collections.unmodifiableSet(changeNotifiers);
+ public ConfigurationChangeCoordinator getChangeCoordinator() {
+ return changeCoordinator;
}
- private Set<ConfigurationChangeNotifier> initializeNotifiers(ConfigurationChangeListener configChangeListener) throws IOException {
- final Set<ConfigurationChangeNotifier> changeNotifiers = new HashSet<>();
-
+ private ConfigurationChangeCoordinator initializeNotifier(ConfigurationChangeListener configChangeListener) throws IOException {
final Properties bootstrapProperties = getBootstrapProperties();
- final String notifiersCsv = bootstrapProperties.getProperty(NOTIFIER_COMPONENTS_KEY);
- if (notifiersCsv != null && !notifiersCsv.isEmpty()) {
- for (String notifierClassname : Arrays.asList(notifiersCsv.split(","))) {
- try {
- Class<?> notifierClass = Class.forName(notifierClassname);
- ConfigurationChangeNotifier notifier = (ConfigurationChangeNotifier) notifierClass.newInstance();
- notifier.initialize(bootstrapProperties);
- changeNotifiers.add(notifier);
- notifier.registerListener(configChangeListener);
- notifier.start();
- } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
- throw new RuntimeException("Issue instantiating notifier " + notifierClassname, e);
- }
- }
- }
- return changeNotifiers;
+ ConfigurationChangeCoordinator notifier = new ConfigurationChangeCoordinator();
+ notifier.initialize(bootstrapProperties, this, Collections.singleton(configChangeListener));
+ notifier.start();
+
+ return notifier;
}
public Set<PeriodicStatusReporter> getPeriodicStatusReporters() {
@@ -1506,6 +1503,7 @@ public class RunMiNiFi implements QueryableStatusAggregator {
private final RunMiNiFi runner;
private final Logger logger;
+ private static final ReentrantLock handlingLock = new ReentrantLock();
public MiNiFiConfigurationChangeListener(RunMiNiFi runner, Logger logger) {
this.runner = runner;
@@ -1515,7 +1513,12 @@ public class RunMiNiFi implements QueryableStatusAggregator {
@Override
public void handleChange(InputStream configInputStream) throws ConfigurationChangeException {
logger.info("Received notification of a change");
+
+ if (!handlingLock.tryLock()) {
+ throw new ConfigurationChangeException("Instance is already handling another change");
+ }
try {
+
final Properties bootstrapProperties = runner.getBootstrapProperties();
final File configFile = new File(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY));
@@ -1528,38 +1531,50 @@ public class RunMiNiFi implements QueryableStatusAggregator {
}
// Create an input stream to use for writing a config file as well as feeding to the config transformer
- final ByteArrayInputStream newConfigBais = new ByteArrayInputStream(bufferedConfigOs.toByteArray());
- newConfigBais.mark(-1);
+ try (final ByteArrayInputStream newConfigBais = new ByteArrayInputStream(bufferedConfigOs.toByteArray())) {
+ newConfigBais.mark(-1);
- final File swapConfigFile = runner.getSwapFile(logger);
- logger.info("Persisting old configuration to {}", swapConfigFile.getAbsolutePath());
- Files.copy(new FileInputStream(configFile), swapConfigFile.toPath(), REPLACE_EXISTING);
+ final File swapConfigFile = runner.getSwapFile(logger);
+ logger.info("Persisting old configuration to {}", swapConfigFile.getAbsolutePath());
- try {
- logger.info("Persisting changes to {}", configFile.getAbsolutePath());
- saveFile(newConfigBais, configFile);
-
- try {
- // Reset the input stream to provide to the transformer
- newConfigBais.reset();
-
- final String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
- logger.info("Performing transformation for input and saving outputs to {}", confDir);
- performTransformation(newConfigBais, confDir);
-
- logger.info("Reloading instance with new configuration");
- restartInstance();
- } catch (Exception e){
- logger.debug("Transformation of new config file failed after replacing original with the swap file, reverting.");
- Files.copy(new FileInputStream(swapConfigFile), configFile.toPath(), REPLACE_EXISTING);
- throw e;
- }
- } catch (Exception e){
- logger.debug("Transformation of new config file failed after swap file was created, deleting it.");
- if(!swapConfigFile.delete()){
- logger.warn("The swap file failed to delete after a failed handling of a change. It should be cleaned up manually.");
+ try (FileInputStream configFileInputStream = new FileInputStream(configFile)) {
+ Files.copy(configFileInputStream, swapConfigFile.toPath(), REPLACE_EXISTING);
+ }
+
+ try {
+ logger.info("Persisting changes to {}", configFile.getAbsolutePath());
+ saveFile(newConfigBais, configFile);
+ final String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
+
+ try {
+ // Reset the input stream to provide to the transformer
+ newConfigBais.reset();
+
+ logger.info("Performing transformation for input and saving outputs to {}", confDir);
+ ByteBuffer tempConfigFile = performTransformation(newConfigBais, confDir);
+ runner.currentConfigFileReference.set(tempConfigFile.asReadOnlyBuffer());
+
+ try {
+ logger.info("Reloading instance with new configuration");
+ restartInstance();
+ } catch (Exception e) {
+ logger.debug("Transformation of new config file failed after transformation into Flow.xml and nifi.properties, reverting.");
+ ByteBuffer resetConfigFile = performTransformation(new FileInputStream(swapConfigFile), confDir);
+ runner.currentConfigFileReference.set(resetConfigFile.asReadOnlyBuffer());
+ throw e;
+ }
+ } catch (Exception e) {
+ logger.debug("Transformation of new config file failed after replacing original with the swap file, reverting.");
+ Files.copy(new FileInputStream(swapConfigFile), configFile.toPath(), REPLACE_EXISTING);
+ throw e;
+ }
+ } catch (Exception e) {
+ logger.debug("Transformation of new config file failed after swap file was created, deleting it.");
+ if (!swapConfigFile.delete()) {
+ logger.warn("The swap file failed to delete after a failed handling of a change. It should be cleaned up manually.");
+ }
+ throw e;
}
- throw e;
}
} catch (ConfigurationChangeException e){
logger.error("Unable to carry out reloading of configuration on receipt of notification event", e);
@@ -1567,6 +1582,15 @@ public class RunMiNiFi implements QueryableStatusAggregator {
} catch (IOException ioe) {
logger.error("Unable to carry out reloading of configuration on receipt of notification event", ioe);
throw new ConfigurationChangeException("Unable to perform reload of received configuration change", ioe);
+ } finally {
+ try {
+ if (configInputStream != null) {
+ configInputStream.close() ;
+ }
+ } catch (IOException e) {
+ // Quietly close
+ }
+ handlingLock.unlock();
}
}
@@ -1589,8 +1613,6 @@ public class RunMiNiFi implements QueryableStatusAggregator {
}
}
-
-
private void restartInstance() throws IOException {
try {
runner.reload();
@@ -1600,9 +1622,13 @@ public class RunMiNiFi implements QueryableStatusAggregator {
}
}
- private static void performTransformation(InputStream configIs, String configDestinationPath) throws ConfigurationChangeException, IOException {
- try {
- ConfigTransformer.transformConfigFile(configIs, configDestinationPath);
+ private static ByteBuffer performTransformation(InputStream configIs, String configDestinationPath) throws ConfigurationChangeException, IOException {
+ try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ TeeInputStream teeInputStream = new TeeInputStream(configIs, byteArrayOutputStream)) {
+
+ ConfigTransformer.transformConfigFile(teeInputStream, configDestinationPath);
+
+ return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
} catch (ConfigurationChangeException e){
throw e;
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
index bec39e6..236a52d 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java
@@ -25,8 +25,8 @@ import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
import org.apache.nifi.minifi.bootstrap.status.PeriodicStatusReporter;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator;
public class ShutdownHook extends Thread {
@@ -53,13 +53,12 @@ public class ShutdownHook extends Thread {
public void run() {
executor.shutdown();
- System.out.println("Initiating shutdown of bootstrap change notifiers...");
- for (ConfigurationChangeNotifier notifier : runner.getChangeNotifiers()) {
- try {
- notifier.close();
- } catch (IOException ioe) {
- System.out.println("Could not successfully stop notifier " + notifier.getClass() + " due to " + ioe);
- }
+ System.out.println("Initiating shutdown of bootstrap change ingestors...");
+ ConfigurationChangeCoordinator notifier = runner.getChangeCoordinator();
+ try {
+ notifier.close();
+ } catch (IOException ioe) {
+ System.out.println("Could not successfully stop notifier due to " + ioe);
}
System.out.println("Initiating shutdown of bootstrap periodic status reporters...");
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java
new file mode 100644
index 0000000..3fa5b8f
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.configuration;
+
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces.ChangeIngestor;
+import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+public class ConfigurationChangeCoordinator implements Closeable, ConfigurationChangeNotifier {
+
+ public static final String NOTIFIER_PROPERTY_PREFIX = "nifi.minifi.notifier";
+ public static final String NOTIFIER_INGESTORS_KEY = NOTIFIER_PROPERTY_PREFIX + ".ingestors";
+ private final static Logger logger = LoggerFactory.getLogger(ConfigurationChangeCoordinator.class);
+ private final Set<ConfigurationChangeListener> configurationChangeListeners = new HashSet<>();
+ private final Set<ChangeIngestor> changeIngestors = new HashSet<>();
+
+ /**
+ * Provides an opportunity for the implementation to perform configuration and initialization based on properties received from the bootstrapping configuration
+ *
+ * @param properties from the bootstrap configuration
+ */
+ public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, Collection<ConfigurationChangeListener> changeListenerSet) {
+ final String ingestorsCsv = properties.getProperty(NOTIFIER_INGESTORS_KEY);
+
+ if (ingestorsCsv != null && !ingestorsCsv.isEmpty()) {
+ for (String ingestorClassname : Arrays.asList(ingestorsCsv.split(","))) {
+ ingestorClassname = ingestorClassname.trim();
+ try {
+ Class<?> ingestorClass = Class.forName(ingestorClassname);
+ ChangeIngestor changeIngestor = (ChangeIngestor) ingestorClass.newInstance();
+ changeIngestor.initialize(properties, configurationFileHolder, this);
+ changeIngestors.add(changeIngestor);
+ logger.info("Initialized ");
+ } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+ throw new RuntimeException("Issue instantiating ingestor " + ingestorClassname, e);
+ }
+ }
+ }
+ configurationChangeListeners.clear();
+ configurationChangeListeners.addAll(changeListenerSet);
+ }
+
+ /**
+ * Begins the associated notification service provided by the given implementation. In most implementations, no action will occur until this method is invoked.
+ */
+ public void start() {
+ changeIngestors.forEach(ChangeIngestor::start);
+ }
+
+ /**
+ * Provides an immutable collection of listeners for the notifier instance
+ *
+ * @return a collection of those listeners registered for notifications
+ */
+ public Set<ConfigurationChangeListener> getChangeListeners() {
+ return Collections.unmodifiableSet(configurationChangeListeners);
+ }
+
+ /**
+ * Provide the mechanism by which listeners are notified
+ */
+ public Collection<ListenerHandleResult> notifyListeners(ByteBuffer newConfig) {
+ logger.info("Notifying Listeners of a change");
+
+ Collection<ListenerHandleResult> listenerHandleResults = new ArrayList<>(configurationChangeListeners.size());
+ for (final ConfigurationChangeListener listener : getChangeListeners()) {
+ ListenerHandleResult result;
+ try {
+ listener.handleChange(new ByteBufferInputStream(newConfig.duplicate()));
+ result = new ListenerHandleResult(listener);
+ } catch (ConfigurationChangeException ex) {
+ result = new ListenerHandleResult(listener, ex);
+ }
+ listenerHandleResults.add(result);
+ logger.info("Listener notification result:" + result.toString());
+ }
+ return listenerHandleResults;
+ }
+
+
+ @Override
+ public void close() throws IOException {
+ for (ChangeIngestor changeIngestor : changeIngestors) {
+ changeIngestor.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
index 756b051..642ed4b 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java
@@ -19,8 +19,8 @@ package org.apache.nifi.minifi.bootstrap.configuration;
import java.io.InputStream;
/**
- * Interface for handling events detected and driven by an associated {@link ConfigurationChangeNotifier} to which the listener
- * has registered via {@link ConfigurationChangeNotifier#registerListener(ConfigurationChangeListener)}.
+ * Interface for handling events detected and driven by an associated {@link ConfigurationChangeCoordinator} to which the listener
+ * has registered via {@link ConfigurationChangeCoordinator#registerListener(ConfigurationChangeListener)}.
*/
public interface ConfigurationChangeListener {
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
index 745ce6c..2ebced5 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java
@@ -1,57 +1,29 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.nifi.minifi.bootstrap.configuration;
-import java.io.Closeable;
+import java.nio.ByteBuffer;
import java.util.Collection;
-import java.util.Properties;
import java.util.Set;
-public interface ConfigurationChangeNotifier extends Closeable {
-
- /**
- * Provides an opportunity for the implementation to perform configuration and initialization based on properties received from the bootstrapping configuration
- *
- * @param properties from the bootstrap configuration
- */
- void initialize(Properties properties);
+public interface ConfigurationChangeNotifier {
- /**
- * Begins the associated notification service provided by the given implementation. In most implementations, no action will occur until this method is invoked.
- */
- void start();
-
- /**
- * Provides an immutable collection of listeners for the notifier instance
- *
- * @return a collection of those listeners registered for notifications
- */
Set<ConfigurationChangeListener> getChangeListeners();
- /**
- * Adds a listener to be notified of configuration changes
- *
- * @param listener to be added to the collection
- * @return true if the listener was added; false if already registered
- */
- boolean registerListener(ConfigurationChangeListener listener);
-
- /**
- * Provide the mechanism by which listeners are notified
- */
- Collection<ListenerHandleResult> notifyListeners();
+ Collection<ListenerHandleResult> notifyListeners(ByteBuffer is);
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java
index 8ac4cea..c0a7e74 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java
@@ -22,34 +22,34 @@ public class ListenerHandleResult {
private final ConfigurationChangeListener configurationChangeListener;
private final Exception failureCause;
- public ListenerHandleResult(ConfigurationChangeListener configurationChangeListener){
+ public ListenerHandleResult(ConfigurationChangeListener configurationChangeListener) {
this.configurationChangeListener = configurationChangeListener;
failureCause = null;
}
- public ListenerHandleResult(ConfigurationChangeListener configurationChangeListener, Exception failureCause){
+ public ListenerHandleResult(ConfigurationChangeListener configurationChangeListener, Exception failureCause) {
this.configurationChangeListener = configurationChangeListener;
this.failureCause = failureCause;
}
- public boolean succeeded(){
+ public boolean succeeded() {
return failureCause == null;
}
- public String getDescriptor(){
+ public String getDescriptor() {
return configurationChangeListener.getDescriptor();
}
- public Exception getFailureCause(){
+ public Exception getFailureCause() {
return failureCause;
}
@Override
public String toString() {
- if(failureCause == null){
+ if (failureCause == null) {
return getDescriptor() + " successfully handled the configuration change";
} else {
- return getDescriptor() + " FAILED to handle the configuration change due to: '" + failureCause.getMessage() + "'";
+ return getDescriptor() + " FAILED to handle the configuration change due to: '" + failureCause.getMessage() + "'";
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiator.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiator.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiator.java
new file mode 100644
index 0000000..565a8f4
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.differentiators;
+
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class WholeConfigDifferentiator {
+
+
+ private final static Logger logger = LoggerFactory.getLogger(WholeConfigDifferentiator.class);
+
+ public static final String WHOLE_CONFIG_KEY = "Whole Config";
+
+ volatile ConfigurationFileHolder configurationFileHolder;
+
+ boolean compareInputStreamToConfigFile(InputStream inputStream) throws IOException {
+ logger.debug("Checking if change is different");
+ AtomicReference<ByteBuffer> currentConfigFileReference = configurationFileHolder.getConfigFileReference();
+ ByteBuffer currentConfigFile = currentConfigFileReference.get();
+ ByteBuffer byteBuffer = ByteBuffer.allocate(currentConfigFile.limit());
+ DataInputStream dataInputStream = new DataInputStream(inputStream);
+ try {
+ dataInputStream.readFully(byteBuffer.array());
+ } catch (EOFException e) {
+ logger.debug("New config is shorter than the current. Must be different.");
+ return true;
+ }
+ logger.debug("Read the input");
+
+ if (dataInputStream.available() != 0) {
+ return true;
+ } else {
+ return byteBuffer.compareTo(currentConfigFile) != 0;
+ }
+ }
+
+ public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder) {
+ this.configurationFileHolder = configurationFileHolder;
+ }
+
+
+ public static class InputStreamInput extends WholeConfigDifferentiator implements Differentiator<InputStream> {
+ public boolean isNew(InputStream inputStream) throws IOException {
+ return compareInputStreamToConfigFile(inputStream);
+ }
+ }
+
+ public static class ByteBufferInput extends WholeConfigDifferentiator implements Differentiator<ByteBuffer> {
+ public boolean isNew(ByteBuffer inputBuffer) {
+ AtomicReference<ByteBuffer> currentConfigFileReference = configurationFileHolder.getConfigFileReference();
+ ByteBuffer currentConfigFile = currentConfigFileReference.get();
+ return inputBuffer.compareTo(currentConfigFile) != 0;
+ }
+ }
+
+
+ public static Differentiator<InputStream> getInputStreamDifferentiator() {
+ return new InputStreamInput();
+ }
+
+ public static Differentiator<ByteBuffer> getByteBufferDifferentiator() {
+ return new ByteBufferInput();
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/interfaces/Differentiator.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/interfaces/Differentiator.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/interfaces/Differentiator.java
new file mode 100644
index 0000000..5beb78b
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/interfaces/Differentiator.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces;
+
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public interface Differentiator <T> {
+ void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder);
+
+ boolean isNew(T input) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java
new file mode 100644
index 0000000..1678f20
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/AbstractPullChangeIngestor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
+
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces.ChangeIngestor;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+
+public abstract class AbstractPullChangeIngestor implements Runnable, ChangeIngestor {
+
+
+ // 5 minute default pulling period
+ protected static final String DEFAULT_POLLING_PERIOD = "300000";
+ protected static Logger logger;
+
+ protected final AtomicInteger pollingPeriodMS = new AtomicInteger();
+ private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
+ protected volatile ConfigurationChangeNotifier configurationChangeNotifier;
+
+ @Override
+ public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
+ this.configurationChangeNotifier = configurationChangeNotifier;
+ }
+
+ @Override
+ public void start() {
+ scheduledThreadPoolExecutor.scheduleAtFixedRate(this, pollingPeriodMS.get(), pollingPeriodMS.get(), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void close() throws IOException {
+ scheduledThreadPoolExecutor.shutdownNow();
+ }
+
+ public abstract void run();
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java
new file mode 100644
index 0000000..39b272d
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
+
+import org.apache.commons.io.input.TeeInputStream;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
+import org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces.ChangeIngestor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
+import static org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator.NOTIFIER_INGESTORS_KEY;
+import static org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator.WHOLE_CONFIG_KEY;
+
+/**
+ * FileChangeIngestor provides a simple FileSystem monitor for detecting changes for a specified file as generated from its corresponding {@link Path}. Upon modifications to the associated file,
+ * associated listeners receive notification of a change allowing configuration logic to be reanalyzed. The backing implementation is associated with a {@link ScheduledExecutorService} that
+ * ensures continuity of monitoring.
+ */
+public class FileChangeIngestor implements Runnable, ChangeIngestor {
+
+ private static final Map<String, Supplier<Differentiator<InputStream>>> DIFFERENTIATOR_CONSTRUCTOR_MAP;
+
+ static {
+ HashMap<String, Supplier<Differentiator<InputStream>>> tempMap = new HashMap<>();
+ tempMap.put(WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getInputStreamDifferentiator);
+
+ DIFFERENTIATOR_CONSTRUCTOR_MAP = Collections.unmodifiableMap(tempMap);
+ }
+
+
+ protected static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
+ protected static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = TimeUnit.SECONDS;
+
+ private final static Logger logger = LoggerFactory.getLogger(FileChangeIngestor.class);
+ private static final String CONFIG_FILE_BASE_KEY = NOTIFIER_INGESTORS_KEY + ".file";
+
+ protected static final String CONFIG_FILE_PATH_KEY = CONFIG_FILE_BASE_KEY + ".config.path";
+ protected static final String POLLING_PERIOD_INTERVAL_KEY = CONFIG_FILE_BASE_KEY + ".polling.period.seconds";
+ public static final String DIFFERENTIATOR_KEY = CONFIG_FILE_BASE_KEY + ".differentiator";
+
+ private Path configFilePath;
+ private WatchService watchService;
+ private long pollingSeconds;
+ private volatile Differentiator<InputStream> differentiator;
+
+ private volatile ConfigurationChangeNotifier configurationChangeNotifier;
+ private ScheduledExecutorService executorService;
+
+ protected static WatchService initializeWatcher(Path filePath) {
+ try {
+ final WatchService fsWatcher = FileSystems.getDefault().newWatchService();
+ final Path watchDirectory = filePath.getParent();
+ watchDirectory.register(fsWatcher, ENTRY_MODIFY);
+
+ return fsWatcher;
+ } catch (IOException ioe) {
+ throw new IllegalStateException("Unable to initialize a file system watcher for the path " + filePath, ioe);
+ }
+ }
+
+ protected boolean targetChanged() {
+ boolean targetChanged = false;
+
+ final WatchKey watchKey = this.watchService.poll();
+
+ if (watchKey == null) {
+ return targetChanged;
+ }
+
+ for (WatchEvent<?> watchEvt : watchKey.pollEvents()) {
+ final WatchEvent.Kind<?> evtKind = watchEvt.kind();
+
+ final WatchEvent<Path> pathEvent = (WatchEvent<Path>) watchEvt;
+ final Path changedFile = pathEvent.context();
+
+ // determine target change by verifying if the changed file corresponds to the config file monitored for this path
+ targetChanged = (evtKind == ENTRY_MODIFY && changedFile.equals(configFilePath.getName(configFilePath.getNameCount() - 1)));
+ }
+
+ // After completing inspection, reset for detection of subsequent change events
+ boolean valid = watchKey.reset();
+ if (!valid) {
+ throw new IllegalStateException("Unable to reinitialize file system watcher.");
+ }
+
+ return targetChanged;
+ }
+
+ @Override
+ public void run() {
+ logger.debug("Checking for a change");
+ if (targetChanged()) {
+ logger.debug("Target changed, checking if it's different than current flow.");
+ try (FileInputStream configFile = new FileInputStream(configFilePath.toFile());
+ ByteArrayOutputStream pipedOutputStream = new ByteArrayOutputStream();
+ TeeInputStream teeInputStream = new TeeInputStream(configFile, pipedOutputStream)) {
+
+ if (differentiator.isNew(teeInputStream)) {
+ logger.debug("New change, notifying listener");
+ // Fill the byteArrayOutputStream with the rest of the request data
+ while (teeInputStream.available() != 0) {
+ teeInputStream.read();
+ }
+
+ ByteBuffer newConfig = ByteBuffer.wrap(pipedOutputStream.toByteArray());
+ ByteBuffer readOnlyNewConfig = newConfig.asReadOnlyBuffer();
+
+ configurationChangeNotifier.notifyListeners(readOnlyNewConfig);
+ logger.debug("Listeners notified");
+ }
+ } catch (Exception e) {
+ logger.error("Could not successfully notify listeners.", e);
+ }
+ }
+ }
+
+ @Override
+ public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
+ final String rawPath = properties.getProperty(CONFIG_FILE_PATH_KEY);
+ final String rawPollingDuration = properties.getProperty(POLLING_PERIOD_INTERVAL_KEY, Long.toString(DEFAULT_POLLING_PERIOD_INTERVAL));
+
+ if (rawPath == null || rawPath.isEmpty()) {
+ throw new IllegalArgumentException("Property, " + CONFIG_FILE_PATH_KEY + ", for the path of the config file must be specified.");
+ }
+
+ try {
+ setConfigFilePath(Paths.get(rawPath));
+ setPollingPeriod(Long.parseLong(rawPollingDuration), DEFAULT_POLLING_PERIOD_UNIT);
+ setWatchService(initializeWatcher(configFilePath));
+ } catch (Exception e) {
+ throw new IllegalStateException("Could not successfully initialize file change notifier.", e);
+ }
+
+ this.configurationChangeNotifier = configurationChangeNotifier;
+
+ final String differentiatorName = properties.getProperty(DIFFERENTIATOR_KEY);
+
+ if (differentiatorName != null && !differentiatorName.isEmpty()) {
+ Supplier<Differentiator<InputStream>> differentiatorSupplier = DIFFERENTIATOR_CONSTRUCTOR_MAP.get(differentiatorName);
+ if (differentiatorSupplier == null) {
+ throw new IllegalArgumentException("Property, " + DIFFERENTIATOR_KEY + ", has value " + differentiatorName + " which does not " +
+ "correspond to any in the PullHttpChangeIngestor Map:" + DIFFERENTIATOR_CONSTRUCTOR_MAP.keySet());
+ }
+ differentiator = differentiatorSupplier.get();
+ } else {
+ differentiator = WholeConfigDifferentiator.getInputStreamDifferentiator();
+ }
+ differentiator.initialize(properties, configurationFileHolder);
+ }
+
+ protected void setConfigFilePath(Path configFilePath) {
+ this.configFilePath = configFilePath;
+ }
+
+ protected void setWatchService(WatchService watchService) {
+ this.watchService = watchService;
+ }
+
+ protected void setConfigurationChangeNotifier(ConfigurationChangeNotifier configurationChangeNotifier) {
+ this.configurationChangeNotifier = configurationChangeNotifier;
+ }
+
+ protected void setDifferentiator(Differentiator<InputStream> differentiator) {
+ this.differentiator = differentiator;
+ }
+
+ protected void setPollingPeriod(long duration, TimeUnit unit) {
+ if (duration < 0) {
+ throw new IllegalArgumentException("Cannot specify a polling period with duration <=0");
+ }
+ this.pollingSeconds = TimeUnit.SECONDS.convert(duration, unit);
+ }
+
+ @Override
+ public void start() {
+ executorService = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+ @Override
+ public Thread newThread(final Runnable r) {
+ final Thread t = Executors.defaultThreadFactory().newThread(r);
+ t.setName("File Change Notifier Thread");
+ t.setDaemon(true);
+ return t;
+ }
+ });
+ this.executorService.scheduleWithFixedDelay(this, 0, pollingSeconds, DEFAULT_POLLING_PERIOD_UNIT);
+ }
+
+ @Override
+ public void close() {
+ if (this.executorService != null) {
+ this.executorService.shutdownNow();
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/6f3c5678/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java
new file mode 100644
index 0000000..a8e7105
--- /dev/null
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
+
+import okhttp3.Call;
+import okhttp3.HttpUrl;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+import java.io.FileInputStream;
+import java.nio.ByteBuffer;
+import java.security.KeyStore;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import static org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator.NOTIFIER_INGESTORS_KEY;
+import static org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator.WHOLE_CONFIG_KEY;
+
+
+public class PullHttpChangeIngestor extends AbstractPullChangeIngestor {
+
+ private static final int NOT_MODIFIED_STATUS_CODE = 304;
+ private static final Map<String, Supplier<Differentiator<ByteBuffer>>> DIFFERENTIATOR_CONSTRUCTOR_MAP;
+
+ static {
+ HashMap<String, Supplier<Differentiator<ByteBuffer>>> tempMap = new HashMap<>();
+ tempMap.put(WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getByteBufferDifferentiator);
+
+ DIFFERENTIATOR_CONSTRUCTOR_MAP = Collections.unmodifiableMap(tempMap);
+ }
+
+ private static final String DEFAULT_CONNECT_TIMEOUT_MS = "5000";
+ private static final String DEFAULT_READ_TIMEOUT_MS = "15000";
+
+ private static final String PULL_HTTP_BASE_KEY = NOTIFIER_INGESTORS_KEY + ".pull.http";
+ public static final String PULL_HTTP_POLLING_PERIOD_KEY = PULL_HTTP_BASE_KEY + ".period.ms";
+ public static final String PORT_KEY = PULL_HTTP_BASE_KEY + ".port";
+ public static final String HOST_KEY = PULL_HTTP_BASE_KEY + ".hostname";
+ public static final String PATH_KEY = PULL_HTTP_BASE_KEY + ".path";
+ public static final String TRUSTSTORE_LOCATION_KEY = PULL_HTTP_BASE_KEY + ".truststore.location";
+ public static final String TRUSTSTORE_PASSWORD_KEY = PULL_HTTP_BASE_KEY + ".truststore.password";
+ public static final String TRUSTSTORE_TYPE_KEY = PULL_HTTP_BASE_KEY + ".truststore.type";
+ public static final String KEYSTORE_LOCATION_KEY = PULL_HTTP_BASE_KEY + ".keystore.location";
+ public static final String KEYSTORE_PASSWORD_KEY = PULL_HTTP_BASE_KEY + ".keystore.password";
+ public static final String KEYSTORE_TYPE_KEY = PULL_HTTP_BASE_KEY + ".keystore.type";
+ public static final String CONNECT_TIMEOUT_KEY = PULL_HTTP_BASE_KEY + ".connect.timeout.ms";
+ public static final String READ_TIMEOUT_KEY = PULL_HTTP_BASE_KEY + ".read.timeout.ms";
+ public static final String DIFFERENTIATOR_KEY = PULL_HTTP_BASE_KEY + ".differentiator";
+ public static final String USE_ETAG_KEY = PULL_HTTP_BASE_KEY + ".use.etag";
+
+ private final AtomicReference<OkHttpClient> httpClientReference = new AtomicReference<>();
+ private final AtomicReference<Integer> portReference = new AtomicReference<>();
+ private final AtomicReference<String> hostReference = new AtomicReference<>();
+ private final AtomicReference<String> pathReference = new AtomicReference<>();
+ private volatile Differentiator<ByteBuffer> differentiator;
+ private volatile String connectionScheme;
+ private volatile String lastEtag = "";
+ private volatile boolean useEtag = false;
+
+ public PullHttpChangeIngestor() {
+ logger = LoggerFactory.getLogger(PullHttpChangeIngestor.class);
+ }
+
+ @Override
+ public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
+ super.initialize(properties, configurationFileHolder, configurationChangeNotifier);
+
+ pollingPeriodMS.set(Integer.parseInt(properties.getProperty(PULL_HTTP_POLLING_PERIOD_KEY, DEFAULT_POLLING_PERIOD)));
+ if (pollingPeriodMS.get() < 1) {
+ throw new IllegalArgumentException("Property, " + PULL_HTTP_POLLING_PERIOD_KEY + ", for the polling period ms must be set with a positive integer.");
+ }
+
+ final String host = properties.getProperty(HOST_KEY);
+ if (host == null || host.isEmpty()) {
+ throw new IllegalArgumentException("Property, " + HOST_KEY + ", for the hostname to pull configurations from must be specified.");
+ }
+
+ final String path = properties.getProperty(PATH_KEY, "/");
+
+ final String portString = (String) properties.get(PORT_KEY);
+ final Integer port;
+ if (portString == null) {
+ throw new IllegalArgumentException("Property, " + PORT_KEY + ", for the hostname to pull configurations from must be specified.");
+ } else {
+ port = Integer.parseInt(portString);
+ }
+
+ portReference.set(port);
+ hostReference.set(host);
+ pathReference.set(path);
+
+ final String useEtagString = (String) properties.getOrDefault(USE_ETAG_KEY, "false");
+ if ("true".equalsIgnoreCase(useEtagString) || "false".equalsIgnoreCase(useEtagString)){
+ useEtag = Boolean.parseBoolean(useEtagString);
+ } else {
+ throw new IllegalArgumentException("Property, " + USE_ETAG_KEY + ", to specify whether to use the ETag header, must either be a value boolean value (\"true\" or \"false\") or left to " +
+ "the default value of \"false\". It is set to \"" + useEtagString + "\".");
+ }
+
+ httpClientReference.set(null);
+
+ final OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder();
+
+ // Set timeouts
+ okHttpClientBuilder.connectTimeout(Long.parseLong(properties.getProperty(CONNECT_TIMEOUT_KEY, DEFAULT_CONNECT_TIMEOUT_MS)), TimeUnit.MILLISECONDS);
+ okHttpClientBuilder.readTimeout(Long.parseLong(properties.getProperty(READ_TIMEOUT_KEY, DEFAULT_READ_TIMEOUT_MS)), TimeUnit.MILLISECONDS);
+
+ // Set whether to follow redirects
+ okHttpClientBuilder.followRedirects(true);
+
+ // check if the ssl path is set and add the factory if so
+ if (properties.containsKey(KEYSTORE_LOCATION_KEY)) {
+ try {
+ setSslSocketFactory(okHttpClientBuilder, properties);
+ connectionScheme = "https";
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ } else {
+ connectionScheme = "http";
+ }
+
+ httpClientReference.set(okHttpClientBuilder.build());
+ final String differentiatorName = properties.getProperty(DIFFERENTIATOR_KEY);
+
+ if (differentiatorName != null && !differentiatorName.isEmpty()) {
+ Supplier<Differentiator<ByteBuffer>> differentiatorSupplier = DIFFERENTIATOR_CONSTRUCTOR_MAP.get(differentiatorName);
+ if (differentiatorSupplier == null) {
+ throw new IllegalArgumentException("Property, " + DIFFERENTIATOR_KEY + ", has value " + differentiatorName + " which does not " +
+ "correspond to any in the PullHttpChangeIngestor Map:" + DIFFERENTIATOR_CONSTRUCTOR_MAP.keySet());
+ }
+ differentiator = differentiatorSupplier.get();
+ } else {
+ differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
+ }
+ differentiator.initialize(properties, configurationFileHolder);
+ }
+
+
+ @Override
+ public void run() {
+ try {
+ logger.debug("Attempting to pull new config");
+ final HttpUrl url = new HttpUrl.Builder()
+ .host(hostReference.get())
+ .port(portReference.get())
+ .encodedPath(pathReference.get())
+ .scheme(connectionScheme)
+ .build();
+
+
+ final Request.Builder requestBuilder = new Request.Builder()
+ .get()
+ .url(url);
+
+ if (useEtag) {
+ requestBuilder.addHeader("If-None-Match", lastEtag);
+ }
+
+ final Request request = requestBuilder.build();
+
+ final OkHttpClient httpClient = httpClientReference.get();
+
+ final Call call = httpClient.newCall(request);
+ final Response response = call.execute();
+
+ logger.debug("Response received: {}", response.toString());
+
+ if (response.code() == NOT_MODIFIED_STATUS_CODE) {
+ return;
+ }
+
+ ResponseBody body = response.body();
+ if (body == null) {
+ logger.warn("No body returned when pulling a new configuration");
+ return;
+ }
+
+ ByteBuffer bodyByteBuffer = ByteBuffer.wrap(body.bytes());
+
+ if (differentiator.isNew(bodyByteBuffer)) {
+ logger.debug("New change, notifying listener");
+
+ ByteBuffer readOnlyNewConfig = bodyByteBuffer.asReadOnlyBuffer();
+
+ configurationChangeNotifier.notifyListeners(readOnlyNewConfig);
+ logger.debug("Listeners notified");
+ } else {
+ logger.debug("Pulled config same as currently running.");
+ }
+
+ if (useEtag) {
+ lastEtag = (new StringBuilder("\""))
+ .append(response.header("ETag").trim())
+ .append("\"").toString();
+ }
+ } catch (Exception e) {
+ logger.warn("Hit an exception while trying to pull", e);
+ }
+ }
+
+ private void setSslSocketFactory(OkHttpClient.Builder okHttpClientBuilder, Properties properties) throws Exception {
+ final String keystoreLocation = properties.getProperty(KEYSTORE_LOCATION_KEY);
+ final String keystorePass = properties.getProperty(KEYSTORE_PASSWORD_KEY);
+ final String keystoreType = properties.getProperty(KEYSTORE_TYPE_KEY);
+
+ assertKeystorePropertiesSet(keystoreLocation, keystorePass, keystoreType);
+
+ // prepare the keystore
+ final KeyStore keyStore = KeyStore.getInstance(keystoreType);
+
+ try (FileInputStream keyStoreStream = new FileInputStream(keystoreLocation)) {
+ keyStore.load(keyStoreStream, keystorePass.toCharArray());
+ }
+
+ final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ keyManagerFactory.init(keyStore, keystorePass.toCharArray());
+
+ // load truststore
+ final String truststoreLocation = properties.getProperty(TRUSTSTORE_LOCATION_KEY);
+ final String truststorePass = properties.getProperty(TRUSTSTORE_PASSWORD_KEY);
+ final String truststoreType = properties.getProperty(TRUSTSTORE_TYPE_KEY);
+ assertTruststorePropertiesSet(truststoreLocation, truststorePass, truststoreType);
+
+ KeyStore truststore = KeyStore.getInstance(truststoreType);
+ final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("X509");
+ truststore.load(new FileInputStream(truststoreLocation), truststorePass.toCharArray());
+ trustManagerFactory.init(truststore);
+
+ final X509TrustManager x509TrustManager;
+ TrustManager[] trustManagers = trustManagerFactory.getTrustManagers();
+ if (trustManagers[0] != null) {
+ x509TrustManager = (X509TrustManager) trustManagers[0];
+ } else {
+ throw new IllegalStateException("List of trust managers is null");
+ }
+
+ SSLContext tempSslContext;
+ try {
+ tempSslContext = SSLContext.getInstance("TLS");
+ } catch (NoSuchAlgorithmException e) {
+ logger.warn("Unable to use 'TLS' for the PullHttpChangeIngestor due to NoSuchAlgorithmException. Will attempt to use the default algorithm.", e);
+ tempSslContext = SSLContext.getDefault();
+ }
+
+ final SSLContext sslContext = tempSslContext;
+ sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), null);
+
+ final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
+ okHttpClientBuilder.sslSocketFactory(sslSocketFactory, x509TrustManager);
+ }
+
+ private void assertKeystorePropertiesSet(String location, String password, String type) {
+ if (location == null || location.isEmpty()) {
+ throw new IllegalArgumentException(KEYSTORE_LOCATION_KEY + " is null or is empty");
+ }
+
+ if (password == null || password.isEmpty()) {
+ throw new IllegalArgumentException(KEYSTORE_LOCATION_KEY + " is set but " + KEYSTORE_PASSWORD_KEY + " is not (or is empty). If the location is set, the password must also be.");
+ }
+
+ if (type == null || type.isEmpty()) {
+ throw new IllegalArgumentException(KEYSTORE_LOCATION_KEY + " is set but " + KEYSTORE_TYPE_KEY + " is not (or is empty). If the location is set, the type must also be.");
+ }
+ }
+
+ private void assertTruststorePropertiesSet(String location, String password, String type) {
+ if (location == null || location.isEmpty()) {
+ throw new IllegalArgumentException(TRUSTSTORE_LOCATION_KEY + " is not set or is empty");
+ }
+
+ if (password == null || password.isEmpty()) {
+ throw new IllegalArgumentException(TRUSTSTORE_LOCATION_KEY + " is set but " + TRUSTSTORE_PASSWORD_KEY + " is not (or is empty). If the location is set, the password must also be.");
+ }
+
+ if (type == null || type.isEmpty()) {
+ throw new IllegalArgumentException(TRUSTSTORE_LOCATION_KEY + " is set but " + TRUSTSTORE_TYPE_KEY + " is not (or is empty). If the location is set, the type must also be.");
+ }
+ }
+
+ protected void setDifferentiator(Differentiator<ByteBuffer> differentiator) {
+ this.differentiator = differentiator;
+ }
+
+ public void setLastEtag(String lastEtag) {
+ this.lastEtag = lastEtag;
+ }
+
+ public void setUseEtag(boolean useEtag) {
+ this.useEtag = useEtag;
+ }
+}