You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/04/25 17:54:48 UTC
[2/3] nifi-minifi git commit: MINIFI-17 Adding error handling of
configurations that fail to start and a couple other small changes
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
index 8bb25c8..633cce2 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
@@ -19,6 +19,8 @@ package org.apache.nifi.minifi.bootstrap.util;
import org.apache.nifi.controller.FlowSerializationException;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.w3c.dom.DOMException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
@@ -37,6 +39,8 @@ import javax.xml.transform.stream.StreamResult;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
@@ -151,11 +155,19 @@ public final class ConfigTransformer {
// Verify the parsed object is a Map structure
if (loadedObject instanceof Map) {
final Map<String, Object> result = (Map<String, Object>) loadedObject;
+
+ // Create nifi.properties and flow.xml.gz in memory
+ ByteArrayOutputStream nifiPropertiesOutputStream = new ByteArrayOutputStream();
+ writeNiFiProperties(result, nifiPropertiesOutputStream);
+
+ DOMSource flowXml = createFlowXml(result);
+
// Write nifi.properties and flow.xml.gz
- writeNiFiProperties(result, destPath);
- writeFlowXml(result, destPath);
+ writeNiFiPropertiesFile(nifiPropertiesOutputStream, destPath);
+
+ writeFlowXmlFile(flowXml, destPath);
} else {
- throw new IllegalArgumentException("Provided YAML configuration is malformed.");
+ throw new IllegalArgumentException("Provided YAML configuration is not a Map.");
}
} finally {
if (sourceStream != null) {
@@ -164,20 +176,49 @@ public final class ConfigTransformer {
}
}
- private static void writeNiFiProperties(Map<String, Object> topLevelYaml, String path) throws FileNotFoundException, UnsupportedEncodingException {
+ private static void writeNiFiPropertiesFile(ByteArrayOutputStream nifiPropertiesOutputStream, String destPath) throws IOException {
+ try {
+ final Path nifiPropertiesPath = Paths.get(destPath, "nifi.properties");
+ FileOutputStream nifiProperties = new FileOutputStream(new File(nifiPropertiesPath.toString()));
+ nifiProperties.write(nifiPropertiesOutputStream.getUnderlyingBuffer());
+ } finally {
+ if (nifiPropertiesOutputStream != null){
+ nifiPropertiesOutputStream.flush();
+ nifiPropertiesOutputStream.close();
+ }
+ }
+ }
+
+ private static void writeFlowXmlFile(DOMSource domSource, String path) throws IOException, TransformerException {
+
+ final OutputStream fileOut = Files.newOutputStream(Paths.get(path, "flow.xml.gz"));
+ final OutputStream outStream = new GZIPOutputStream(fileOut);
+ final StreamResult streamResult = new StreamResult(outStream);
+
+ // configure the transformer and convert the DOM
+ final TransformerFactory transformFactory = TransformerFactory.newInstance();
+ final Transformer transformer = transformFactory.newTransformer();
+ transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
+ transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+
+ // transform the document to byte stream
+ transformer.transform(domSource, streamResult);
+ outStream.flush();
+ outStream.close();
+ }
+
+ private static void writeNiFiProperties(Map<String, Object> topLevelYaml, OutputStream outputStream) throws FileNotFoundException, UnsupportedEncodingException, ConfigurationChangeException {
PrintWriter writer = null;
try {
- final Path nifiPropertiesPath = Paths.get(path, "nifi.properties");
- writer = new PrintWriter(nifiPropertiesPath.toFile(), "UTF-8");
+ writer = new PrintWriter(outputStream, true);
- Map<String,Object> coreProperties = (Map<String, Object>) topLevelYaml.get(CORE_PROPS_KEY);
- Map<String,Object> flowfileRepo = (Map<String, Object>) topLevelYaml.get(FLOWFILE_REPO_KEY);
+ Map<String, Object> coreProperties = (Map<String, Object>) topLevelYaml.get(CORE_PROPS_KEY);
+ Map<String, Object> flowfileRepo = (Map<String, Object>) topLevelYaml.get(FLOWFILE_REPO_KEY);
Map<String, Object> swapProperties = (Map<String, Object>) flowfileRepo.get(SWAP_PROPS_KEY);
- Map<String,Object> contentRepo = (Map<String, Object>) topLevelYaml.get(CONTENT_REPO_KEY);
- Map<String,Object> componentStatusRepo = (Map<String, Object>) topLevelYaml.get(COMPONENT_STATUS_REPO_KEY);
- Map<String,Object> securityProperties = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY);
- Map<String,Object> sensitiveProperties = (Map<String, Object>) securityProperties.get(SENSITIVE_PROPS_KEY);
-
+ Map<String, Object> contentRepo = (Map<String, Object>) topLevelYaml.get(CONTENT_REPO_KEY);
+ Map<String, Object> componentStatusRepo = (Map<String, Object>) topLevelYaml.get(COMPONENT_STATUS_REPO_KEY);
+ Map<String, Object> securityProperties = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY);
+ Map<String, Object> sensitiveProperties = (Map<String, Object>) securityProperties.get(SENSITIVE_PROPS_KEY);
writer.print(PROPERTIES_FILE_APACHE_2_0_LICENSE);
writer.println("# Core Properties #");
@@ -284,6 +325,8 @@ public final class ConfigTransformer {
writer.println();
writer.println("# cluster manager properties (only configure for cluster manager) #");
writer.println("nifi.cluster.is.manager=false");
+ } catch (NullPointerException e) {
+ throw new ConfigurationChangeException("Failed to parse the config YAML while creating the nifi.properties", e);
} finally {
if (writer != null){
writer.flush();
@@ -291,7 +334,7 @@ public final class ConfigTransformer {
}
}
}
- private static void writeFlowXml(Map<String, Object> topLevelYaml, String path) throws Exception {
+ private static DOMSource createFlowXml(Map<String, Object> topLevelYaml) throws IOException, ConfigurationChangeException {
try {
// create a new, empty document
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
@@ -304,39 +347,32 @@ public final class ConfigTransformer {
final Element rootNode = doc.createElement("flowController");
doc.appendChild(rootNode);
Map<String, Object> processorConfig = (Map<String, Object>) topLevelYaml.get(PROCESSOR_CONFIG_KEY);
- addTextElement(rootNode, "maxTimerDrivenThreadCount", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY));
- addTextElement(rootNode, "maxEventDrivenThreadCount", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY));
+ addTextElement(rootNode, "maxTimerDrivenThreadCount", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY, "1"));
+ addTextElement(rootNode, "maxEventDrivenThreadCount", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY, "1"));
addProcessGroup(rootNode, topLevelYaml, "rootGroup");
Map<String, Object> securityProps = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY);
- String sslAlgorithm = (String) securityProps.get(SSL_PROTOCOL_KEY);
- if (sslAlgorithm != null && !(sslAlgorithm.isEmpty())) {
- final Element controllerServicesNode = doc.createElement("controllerServices");
- rootNode.appendChild(controllerServicesNode);
- addSSLControllerService(controllerServicesNode, securityProps);
+ if (securityProps != null) {
+ String sslAlgorithm = (String) securityProps.get(SSL_PROTOCOL_KEY);
+ if (sslAlgorithm != null && !(sslAlgorithm.isEmpty())) {
+ final Element controllerServicesNode = doc.createElement("controllerServices");
+ rootNode.appendChild(controllerServicesNode);
+ addSSLControllerService(controllerServicesNode, securityProps);
+ }
+ }
+
+ Map<String, Object> provenanceProperties = (Map<String, Object>) topLevelYaml.get(PROVENANCE_REPORTING_KEY);
+ if (provenanceProperties.get(SCHEDULING_STRATEGY_KEY) != null) {
+ final Element reportingTasksNode = doc.createElement("reportingTasks");
+ rootNode.appendChild(reportingTasksNode);
+ addProvenanceReportingTask(reportingTasksNode, topLevelYaml);
}
- final Element reportingTasksNode = doc.createElement("reportingTasks");
- rootNode.appendChild(reportingTasksNode);
- addProvenanceReportingTask(reportingTasksNode, topLevelYaml);
-
- final DOMSource domSource = new DOMSource(doc);
- final OutputStream fileOut = Files.newOutputStream(Paths.get(path, "flow.xml.gz"));
- final OutputStream outStream = new GZIPOutputStream(fileOut);
- final StreamResult streamResult = new StreamResult(outStream);
-
- // configure the transformer and convert the DOM
- final TransformerFactory transformFactory = TransformerFactory.newInstance();
- final Transformer transformer = transformFactory.newTransformer();
- transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
- transformer.setOutputProperty(OutputKeys.INDENT, "yes");
-
- // transform the document to byte stream
- transformer.transform(domSource, streamResult);
- outStream.flush();
- outStream.close();
- } catch (final ParserConfigurationException | DOMException | TransformerFactoryConfigurationError | IllegalArgumentException | TransformerException e) {
+ return new DOMSource(doc);
+ } catch (final ParserConfigurationException | DOMException | TransformerFactoryConfigurationError | IllegalArgumentException e) {
throw new FlowSerializationException(e);
+ } catch (Exception e){
+ throw new ConfigurationChangeException("Failed to parse the config YAML while writing the top level of the flow xml", e);
}
}
@@ -345,109 +381,140 @@ public final class ConfigTransformer {
return value == null ? "" : value.toString();
}
- private static void addSSLControllerService(final Element element, Map<String, Object> securityProperties) {
- final Element serviceElement = element.getOwnerDocument().createElement("controllerService");
- addTextElement(serviceElement, "id", "SSL-Context-Service");
- addTextElement(serviceElement, "name", "SSL-Context-Service");
- addTextElement(serviceElement, "comment", "");
- addTextElement(serviceElement, "class", "org.apache.nifi.ssl.StandardSSLContextService");
-
- addTextElement(serviceElement, "enabled", "true");
-
- Map<String, Object> attributes = new HashMap<>();
- attributes.put("Keystore Filename", securityProperties.get(KEYSTORE_KEY));
- attributes.put("Keystore Type", securityProperties.get(KEYSTORE_TYPE_KEY));
- attributes.put("Keystore Password", securityProperties.get(KEYSTORE_PASSWORD_KEY));
- attributes.put("Truststore Filename", securityProperties.get(TRUSTSTORE_KEY));
- attributes.put("Truststore Type", securityProperties.get(TRUSTSTORE_TYPE_KEY));
- attributes.put("Truststore Password", securityProperties.get(TRUSTSTORE_PASSWORD_KEY));
- attributes.put("SSL Protocol", securityProperties.get(SSL_PROTOCOL_KEY));
-
- addConfiguration(serviceElement, attributes);
+ private static <K> String getValueString(Map<K,Object> map, K key, String theDefault){
+ Object value = null;
+ if (map != null){
+ value = map.get(key);
+ }
+ return value == null ? theDefault : value.toString();
+ }
- element.appendChild(serviceElement);
+ private static void addSSLControllerService(final Element element, Map<String, Object> securityProperties) throws ConfigurationChangeException {
+ try {
+ final Element serviceElement = element.getOwnerDocument().createElement("controllerService");
+ addTextElement(serviceElement, "id", "SSL-Context-Service");
+ addTextElement(serviceElement, "name", "SSL-Context-Service");
+ addTextElement(serviceElement, "comment", "");
+ addTextElement(serviceElement, "class", "org.apache.nifi.ssl.StandardSSLContextService");
+
+ addTextElement(serviceElement, "enabled", "true");
+
+ Map<String, Object> attributes = new HashMap<>();
+ attributes.put("Keystore Filename", securityProperties.get(KEYSTORE_KEY));
+ attributes.put("Keystore Type", securityProperties.get(KEYSTORE_TYPE_KEY));
+ attributes.put("Keystore Password", securityProperties.get(KEYSTORE_PASSWORD_KEY));
+ attributes.put("Truststore Filename", securityProperties.get(TRUSTSTORE_KEY));
+ attributes.put("Truststore Type", securityProperties.get(TRUSTSTORE_TYPE_KEY));
+ attributes.put("Truststore Password", securityProperties.get(TRUSTSTORE_PASSWORD_KEY));
+ attributes.put("SSL Protocol", securityProperties.get(SSL_PROTOCOL_KEY));
+
+ addConfiguration(serviceElement, attributes);
+
+ element.appendChild(serviceElement);
+ } catch (Exception e){
+ throw new ConfigurationChangeException("Failed to parse the config YAML while trying to create an SSL Controller Service", e);
+ }
}
- private static void addProcessGroup(final Element parentElement, Map<String, Object> topLevelYaml, final String elementName) {
- Map<String,Object> flowControllerProperties = (Map<String, Object>) topLevelYaml.get(FLOW_CONTROLLER_PROPS_KEY);
+ private static void addProcessGroup(final Element parentElement, Map<String, Object> topLevelYaml, final String elementName) throws ConfigurationChangeException {
+ try {
+ Map<String, Object> flowControllerProperties = (Map<String, Object>) topLevelYaml.get(FLOW_CONTROLLER_PROPS_KEY);
- final Document doc = parentElement.getOwnerDocument();
- final Element element = doc.createElement(elementName);
- parentElement.appendChild(element);
- addTextElement(element, "id", "Root-Group");
- addTextElement(element, "name", getValueString(flowControllerProperties, NAME_KEY) );
- addPosition(element);
- addTextElement(element, "comment", getValueString(flowControllerProperties, COMMENT_KEY));
+ final Document doc = parentElement.getOwnerDocument();
+ final Element element = doc.createElement(elementName);
+ parentElement.appendChild(element);
+ addTextElement(element, "id", "Root-Group");
+ addTextElement(element, "name", getValueString(flowControllerProperties, NAME_KEY));
+ addPosition(element);
+ addTextElement(element, "comment", getValueString(flowControllerProperties, COMMENT_KEY));
- Map<String,Object> processorConfig = (Map<String, Object>) topLevelYaml.get(PROCESSOR_CONFIG_KEY);
- addProcessor(element, processorConfig);
+ Map<String, Object> processorConfig = (Map<String, Object>) topLevelYaml.get(PROCESSOR_CONFIG_KEY);
+ addProcessor(element, processorConfig);
- Map<String,Object> remoteProcessingGroup = (Map<String, Object>) topLevelYaml.get(REMOTE_PROCESSING_GROUP_KEY);
- addRemoteProcessGroup(element, remoteProcessingGroup);
+ Map<String, Object> remoteProcessingGroup = (Map<String, Object>) topLevelYaml.get(REMOTE_PROCESSING_GROUP_KEY);
+ addRemoteProcessGroup(element, remoteProcessingGroup);
- addConnection(element, topLevelYaml);
+ addConnection(element, topLevelYaml);
+ } catch (ConfigurationChangeException e){
+ throw e;
+ } catch (Exception e){
+ throw new ConfigurationChangeException("Failed to parse the config YAML while trying to creating the root Process Group", e);
+ }
}
- private static void addProcessor(final Element parentElement, Map<String, Object> processorConfig) {
+ private static void addProcessor(final Element parentElement, Map<String, Object> processorConfig) throws ConfigurationChangeException {
- final Document doc = parentElement.getOwnerDocument();
- final Element element = doc.createElement("processor");
- parentElement.appendChild(element);
- addTextElement(element, "id", "Processor");
- addTextElement(element, "name", getValueString(processorConfig, NAME_KEY));
-
- addPosition(element);
- addStyle(element);
-
- addTextElement(element, "comment", getValueString(processorConfig, COMMENT_KEY));
- addTextElement(element, "class", getValueString(processorConfig, CLASS_KEY));
- addTextElement(element, "maxConcurrentTasks", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY));
- addTextElement(element, "schedulingPeriod", getValueString(processorConfig, SCHEDULING_PERIOD_KEY));
- addTextElement(element, "penalizationPeriod", getValueString(processorConfig, PENALIZATION_PERIOD_KEY));
- addTextElement(element, "yieldPeriod", getValueString(processorConfig, YIELD_PERIOD_KEY));
- addTextElement(element, "bulletinLevel", "WARN");
- addTextElement(element, "lossTolerant", "false");
- addTextElement(element, "scheduledState", "RUNNING");
- addTextElement(element, "schedulingStrategy", getValueString(processorConfig, SCHEDULING_STRATEGY_KEY));
- addTextElement(element, "runDurationNanos", getValueString(processorConfig, RUN_DURATION_NANOS_KEY));
-
- addConfiguration(element, (Map<String, Object>) processorConfig.get(PROCESSOR_PROPS_KEY));
-
- Collection<String> autoTerminatedRelationships = (Collection<String>) processorConfig.get(AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY);
- if (autoTerminatedRelationships != null) {
- for (String rel : autoTerminatedRelationships) {
- addTextElement(element, "autoTerminatedRelationship", rel);
+ try {
+ if (processorConfig.get(CLASS_KEY) == null) {
+ // Only add a processor if it has a class
+ return;
+ }
+
+ final Document doc = parentElement.getOwnerDocument();
+ final Element element = doc.createElement("processor");
+ parentElement.appendChild(element);
+ addTextElement(element, "id", "Processor");
+ addTextElement(element, "name", getValueString(processorConfig, NAME_KEY));
+
+ addPosition(element);
+ addStyle(element);
+
+ addTextElement(element, "comment", getValueString(processorConfig, COMMENT_KEY));
+ addTextElement(element, "class", getValueString(processorConfig, CLASS_KEY));
+ addTextElement(element, "maxConcurrentTasks", getValueString(processorConfig, MAX_CONCURRENT_TASKS_KEY));
+ addTextElement(element, "schedulingPeriod", getValueString(processorConfig, SCHEDULING_PERIOD_KEY));
+ addTextElement(element, "penalizationPeriod", getValueString(processorConfig, PENALIZATION_PERIOD_KEY));
+ addTextElement(element, "yieldPeriod", getValueString(processorConfig, YIELD_PERIOD_KEY));
+ addTextElement(element, "bulletinLevel", "WARN");
+ addTextElement(element, "lossTolerant", "false");
+ addTextElement(element, "scheduledState", "RUNNING");
+ addTextElement(element, "schedulingStrategy", getValueString(processorConfig, SCHEDULING_STRATEGY_KEY));
+ addTextElement(element, "runDurationNanos", getValueString(processorConfig, RUN_DURATION_NANOS_KEY));
+
+ addConfiguration(element, (Map<String, Object>) processorConfig.get(PROCESSOR_PROPS_KEY));
+
+ Collection<String> autoTerminatedRelationships = (Collection<String>) processorConfig.get(AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY);
+ if (autoTerminatedRelationships != null) {
+ for (String rel : autoTerminatedRelationships) {
+ addTextElement(element, "autoTerminatedRelationship", rel);
+ }
}
+ } catch (Exception e){
+ throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the Processor", e);
}
}
- private static void addProvenanceReportingTask(final Element element, Map<String, Object> topLevelYaml) {
- Map<String, Object> provenanceProperties = (Map<String, Object>) topLevelYaml.get(PROVENANCE_REPORTING_KEY);
- final Element taskElement = element.getOwnerDocument().createElement("reportingTask");
- addTextElement(taskElement, "id", "Provenance-Reporting");
- addTextElement(taskElement, "name", "Site-To-Site-Provenance-Reporting");
- addTextElement(taskElement, "comment", getValueString(provenanceProperties, COMMENT_KEY));
- addTextElement(taskElement, "class", "org.apache.nifi.minifi.provenance.reporting.ProvenanceReportingTask");
- addTextElement(taskElement, "schedulingPeriod", getValueString(provenanceProperties, SCHEDULING_PERIOD_KEY));
- addTextElement(taskElement, "scheduledState", "RUNNING");
- addTextElement(taskElement, "schedulingStrategy", getValueString(provenanceProperties, SCHEDULING_STRATEGY_KEY));
-
- Map<String, Object> attributes = new HashMap<>();
- attributes.put("Destination URL", provenanceProperties.get(DESTINATION_URL_KEY));
- attributes.put("Input Port Name", provenanceProperties.get(PORT_NAME_KEY));
- attributes.put("MiNiFi URL", provenanceProperties.get(ORIGINATING_URL_KEY));
- attributes.put("Compress Events", provenanceProperties.get(USE_COMPRESSION_KEY));
- attributes.put("Batch Size", provenanceProperties.get(BATCH_SIZE_KEY));
-
- Map<String, Object> securityProps = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY);
- String sslAlgorithm = (String) securityProps.get(SSL_PROTOCOL_KEY);
- if (sslAlgorithm != null && !(sslAlgorithm.isEmpty())) {
- attributes.put("SSL Context Service", "SSL-Context-Service");
- }
+ private static void addProvenanceReportingTask(final Element element, Map<String, Object> topLevelYaml) throws ConfigurationChangeException {
+ try {
+ Map<String, Object> provenanceProperties = (Map<String, Object>) topLevelYaml.get(PROVENANCE_REPORTING_KEY);
+ final Element taskElement = element.getOwnerDocument().createElement("reportingTask");
+ addTextElement(taskElement, "id", "Provenance-Reporting");
+ addTextElement(taskElement, "name", "Site-To-Site-Provenance-Reporting");
+ addTextElement(taskElement, "comment", getValueString(provenanceProperties, COMMENT_KEY));
+ addTextElement(taskElement, "class", "org.apache.nifi.minifi.provenance.reporting.ProvenanceReportingTask");
+ addTextElement(taskElement, "schedulingPeriod", getValueString(provenanceProperties, SCHEDULING_PERIOD_KEY));
+ addTextElement(taskElement, "scheduledState", "RUNNING");
+ addTextElement(taskElement, "schedulingStrategy", getValueString(provenanceProperties, SCHEDULING_STRATEGY_KEY));
+
+ Map<String, Object> attributes = new HashMap<>();
+ attributes.put("Destination URL", provenanceProperties.get(DESTINATION_URL_KEY));
+ attributes.put("Input Port Name", provenanceProperties.get(PORT_NAME_KEY));
+ attributes.put("MiNiFi URL", provenanceProperties.get(ORIGINATING_URL_KEY));
+ attributes.put("Compress Events", provenanceProperties.get(USE_COMPRESSION_KEY));
+ attributes.put("Batch Size", provenanceProperties.get(BATCH_SIZE_KEY));
+
+ Map<String, Object> securityProps = (Map<String, Object>) topLevelYaml.get(SECURITY_PROPS_KEY);
+ String sslAlgorithm = (String) securityProps.get(SSL_PROTOCOL_KEY);
+ if (sslAlgorithm != null && !(sslAlgorithm.isEmpty())) {
+ attributes.put("SSL Context Service", "SSL-Context-Service");
+ }
- addConfiguration(taskElement, attributes);
+ addConfiguration(taskElement, attributes);
- element.appendChild(taskElement);
+ element.appendChild(taskElement);
+ } catch (Exception e){
+ throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the Provenance Reporting Task", e);
+ }
}
private static void addConfiguration(final Element element, Map<String, Object> elementConfig) {
@@ -472,75 +539,103 @@ public final class ConfigTransformer {
parentElement.appendChild(element);
}
- private static void addRemoteProcessGroup(final Element parentElement, Map<String, Object> remoteProcessingGroup) {
-
- final Document doc = parentElement.getOwnerDocument();
- final Element element = doc.createElement("remoteProcessGroup");
- parentElement.appendChild(element);
- addTextElement(element, "id", "Remote-Process-Group");
- addTextElement(element, "name", getValueString(remoteProcessingGroup, NAME_KEY));
- addPosition(element);
- addTextElement(element, "comment", getValueString(remoteProcessingGroup, COMMENT_KEY));
- addTextElement(element, "url", getValueString(remoteProcessingGroup, URL_KEY));
- addTextElement(element, "timeout", getValueString(remoteProcessingGroup, TIMEOUT_KEY));
- addTextElement(element, "yieldPeriod", getValueString(remoteProcessingGroup, YIELD_PERIOD_KEY));
- addTextElement(element, "transmitting", "true");
-
- Map<String,Object> inputPort = (Map<String, Object>) remoteProcessingGroup.get(INPUT_PORT_KEY);
- addRemoteGroupPort(element, inputPort, "inputPort");
+ private static void addRemoteProcessGroup(final Element parentElement, Map<String, Object> remoteProcessingGroup) throws ConfigurationChangeException {
+ try {
+ if (remoteProcessingGroup.get(URL_KEY) == null) {
+ // Only add an an RPG if it has a URL
+ return;
+ }
- parentElement.appendChild(element);
+ final Document doc = parentElement.getOwnerDocument();
+ final Element element = doc.createElement("remoteProcessGroup");
+ parentElement.appendChild(element);
+ addTextElement(element, "id", "Remote-Process-Group");
+ addTextElement(element, "name", getValueString(remoteProcessingGroup, NAME_KEY));
+ addPosition(element);
+ addTextElement(element, "comment", getValueString(remoteProcessingGroup, COMMENT_KEY));
+ addTextElement(element, "url", getValueString(remoteProcessingGroup, URL_KEY));
+ addTextElement(element, "timeout", getValueString(remoteProcessingGroup, TIMEOUT_KEY));
+ addTextElement(element, "yieldPeriod", getValueString(remoteProcessingGroup, YIELD_PERIOD_KEY));
+ addTextElement(element, "transmitting", "true");
+
+ Map<String, Object> inputPort = (Map<String, Object>) remoteProcessingGroup.get(INPUT_PORT_KEY);
+ addRemoteGroupPort(element, inputPort, "inputPort");
+
+ parentElement.appendChild(element);
+ } catch (Exception e){
+ throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the Remote Process Group", e);
+ }
}
- private static void addRemoteGroupPort(final Element parentElement, Map<String, Object> inputPort, final String elementName) {
- final Document doc = parentElement.getOwnerDocument();
- final Element element = doc.createElement(elementName);
- parentElement.appendChild(element);
- addTextElement(element, "id", getValueString(inputPort, ID_KEY));
- addTextElement(element, "name", getValueString(inputPort, NAME_KEY));
- addPosition(element);
- addTextElement(element, "comments", getValueString(inputPort, COMMENT_KEY));
- addTextElement(element, "scheduledState", "RUNNING");
- addTextElement(element, "maxConcurrentTasks", getValueString(inputPort, MAX_CONCURRENT_TASKS_KEY));
- addTextElement(element, "useCompression", getValueString(inputPort, USE_COMPRESSION_KEY));
+ private static void addRemoteGroupPort(final Element parentElement, Map<String, Object> inputPort, final String elementName) throws ConfigurationChangeException {
- parentElement.appendChild(element);
+ try {
+ if (inputPort.get(ID_KEY) == null) {
+ // Only add an input port if it has an ID
+ return;
+ }
+
+ final Document doc = parentElement.getOwnerDocument();
+ final Element element = doc.createElement(elementName);
+ parentElement.appendChild(element);
+ addTextElement(element, "id", getValueString(inputPort, ID_KEY));
+ addTextElement(element, "name", getValueString(inputPort, NAME_KEY));
+ addPosition(element);
+ addTextElement(element, "comments", getValueString(inputPort, COMMENT_KEY));
+ addTextElement(element, "scheduledState", "RUNNING");
+ addTextElement(element, "maxConcurrentTasks", getValueString(inputPort, MAX_CONCURRENT_TASKS_KEY));
+ addTextElement(element, "useCompression", getValueString(inputPort, USE_COMPRESSION_KEY));
+
+ parentElement.appendChild(element);
+ } catch (Exception e){
+ throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the input port of the Remote Process Group", e);
+ }
}
- private static void addConnection(final Element parentElement, Map<String, Object> topLevelYaml) {
- Map<String,Object> connectionProperties = (Map<String, Object>) topLevelYaml.get(CONNECTION_PROPS_KEY);
- Map<String,Object> remoteProcessingGroup = (Map<String, Object>) topLevelYaml.get(REMOTE_PROCESSING_GROUP_KEY);
- Map<String,Object> inputPort = (Map<String, Object>) remoteProcessingGroup.get(INPUT_PORT_KEY);
- final Document doc = parentElement.getOwnerDocument();
- final Element element = doc.createElement("connection");
- parentElement.appendChild(element);
- addTextElement(element, "id", "Connection");
- addTextElement(element, "name", getValueString(connectionProperties, NAME_KEY));
+ private static void addConnection(final Element parentElement, Map<String, Object> topLevelYaml) throws ConfigurationChangeException {
+ try {
+ Map<String, Object> connectionProperties = (Map<String, Object>) topLevelYaml.get(CONNECTION_PROPS_KEY);
+ Map<String, Object> remoteProcessingGroup = (Map<String, Object>) topLevelYaml.get(REMOTE_PROCESSING_GROUP_KEY);
+ Map<String, Object> inputPort = (Map<String, Object>) remoteProcessingGroup.get(INPUT_PORT_KEY);
+ Map<String, Object> processorConfig = (Map<String, Object>) topLevelYaml.get(PROCESSOR_CONFIG_KEY);
- final Element bendPointsElement = doc.createElement("bendPoints");
- element.appendChild(bendPointsElement);
+ if (inputPort.get(ID_KEY) == null || processorConfig.get(CLASS_KEY) == null) {
+ // Only add the connection if the input port and processor config are created
+ return;
+ }
- addTextElement(element, "labelIndex", "1");
- addTextElement(element, "zIndex", "0");
+ final Document doc = parentElement.getOwnerDocument();
+ final Element element = doc.createElement("connection");
+ parentElement.appendChild(element);
+ addTextElement(element, "id", "Connection");
+ addTextElement(element, "name", getValueString(connectionProperties, NAME_KEY));
- addTextElement(element, "sourceId", "Processor");
- addTextElement(element, "sourceGroupId", "Root-Group");
- addTextElement(element, "sourceType", "PROCESSOR");
+ final Element bendPointsElement = doc.createElement("bendPoints");
+ element.appendChild(bendPointsElement);
- addTextElement(element, "destinationId", getValueString(inputPort,ID_KEY));
- addTextElement(element, "destinationGroupId", "Remote-Process-Group");
- addTextElement(element, "destinationType", "REMOTE_INPUT_PORT");
+ addTextElement(element, "labelIndex", "1");
+ addTextElement(element, "zIndex", "0");
- addTextElement(element, "relationship", "success");
+ addTextElement(element, "sourceId", "Processor");
+ addTextElement(element, "sourceGroupId", "Root-Group");
+ addTextElement(element, "sourceType", "PROCESSOR");
- addTextElement(element, "maxWorkQueueSize", getValueString(connectionProperties, MAX_WORK_QUEUE_SIZE_KEY));
- addTextElement(element, "maxWorkQueueDataSize", getValueString(connectionProperties, MAX_WORK_QUEUE_DATA_SIZE_KEY));
+ addTextElement(element, "destinationId", getValueString(inputPort, ID_KEY));
+ addTextElement(element, "destinationGroupId", "Remote-Process-Group");
+ addTextElement(element, "destinationType", "REMOTE_INPUT_PORT");
- addTextElement(element, "flowFileExpiration", getValueString(connectionProperties, FLOWFILE_EXPIRATION__KEY));
- addTextElement(element, "queuePrioritizerClass", getValueString(connectionProperties, QUEUE_PRIORITIZER_CLASS_KEY));
+ addTextElement(element, "relationship", "success");
+ addTextElement(element, "maxWorkQueueSize", getValueString(connectionProperties, MAX_WORK_QUEUE_SIZE_KEY));
+ addTextElement(element, "maxWorkQueueDataSize", getValueString(connectionProperties, MAX_WORK_QUEUE_DATA_SIZE_KEY));
- parentElement.appendChild(element);
+ addTextElement(element, "flowFileExpiration", getValueString(connectionProperties, FLOWFILE_EXPIRATION__KEY));
+ addTextElement(element, "queuePrioritizerClass", getValueString(connectionProperties, QUEUE_PRIORITIZER_CLASS_KEY));
+
+ parentElement.appendChild(element);
+ } catch (Exception e){
+ throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the connection from the Processor to the input port of the Remote Process Group", e);
+ }
}
private static void addPosition(final Element parentElement) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestFileChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestFileChangeNotifier.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestFileChangeNotifier.java
deleted file mode 100644
index 9432a2f..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestFileChangeNotifier.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.minifi.bootstrap.configuration;
-
-import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.InputStream;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.WatchEvent;
-import java.nio.file.WatchKey;
-import java.nio.file.WatchService;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-public class TestFileChangeNotifier {
-
- private static final String CONFIG_FILENAME = "config.yml";
- private static final String TEST_CONFIG_PATH = "src/test/resources/config.yml";
-
- private FileChangeNotifier notifierSpy;
- private WatchService mockWatchService;
- private Properties testProperties;
-
- @Before
- public void setUp() throws Exception {
- mockWatchService = Mockito.mock(WatchService.class);
- notifierSpy = Mockito.spy(new FileChangeNotifier());
- notifierSpy.setConfigFile(Paths.get(TEST_CONFIG_PATH));
- notifierSpy.setWatchService(mockWatchService);
-
- testProperties = new Properties();
- testProperties.put(FileChangeNotifier.CONFIG_FILE_PATH_KEY, TEST_CONFIG_PATH);
- testProperties.put(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY, FileChangeNotifier.DEFAULT_POLLING_PERIOD_INTERVAL);
- }
-
- @After
- public void tearDown() throws Exception {
- notifierSpy.close();
- }
-
- @Test(expected = IllegalStateException.class)
- public void testInitialize_invalidFile() throws Exception {
- testProperties.put(FileChangeNotifier.CONFIG_FILE_PATH_KEY, "/land/of/make/believe");
- notifierSpy.initialize(testProperties);
- }
-
- @Test
- public void testInitialize_validFile() throws Exception {
- notifierSpy.initialize(testProperties);
- }
-
- @Test(expected = IllegalStateException.class)
- public void testInitialize_invalidPollingPeriod() throws Exception {
- testProperties.put(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY, "abc");
- notifierSpy.initialize(testProperties);
- }
-
- @Test
- public void testInitialize_useDefaultPolling() throws Exception {
- testProperties.remove(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY);
- notifierSpy.initialize(testProperties);
- }
-
-
- @Test
- public void testNotifyListeners() throws Exception {
- final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
- boolean wasRegistered = notifierSpy.registerListener(testListener);
-
- Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
- Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
-
- notifierSpy.notifyListeners();
-
- verify(testListener, Mockito.atMost(1)).handleChange(Mockito.any(InputStream.class));
- }
-
- @Test
- public void testRegisterListener() throws Exception {
- final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class);
- boolean wasRegistered = notifierSpy.registerListener(firstListener);
-
- Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
- Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
-
- final ConfigurationChangeListener secondListener = Mockito.mock(ConfigurationChangeListener.class);
- wasRegistered = notifierSpy.registerListener(secondListener);
- Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 2);
-
- }
-
- @Test
- public void testRegisterDuplicateListener() throws Exception {
- final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class);
- boolean wasRegistered = notifierSpy.registerListener(firstListener);
-
- Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
- Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
-
- wasRegistered = notifierSpy.registerListener(firstListener);
-
- Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
- Assert.assertFalse("Registration did not correspond to newly added listener", wasRegistered);
- }
-
- /* Verify handleChange events */
- @Test
- public void testTargetChangedNoModification() throws Exception {
- final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
-
- // In this case the WatchKey is null because there were no events found
- establishMockEnvironmentForChangeTests(testListener, null);
-
- verify(testListener, Mockito.never()).handleChange(Mockito.any(InputStream.class));
- }
-
- @Test
- public void testTargetChangedWithModificationEvent_nonConfigFile() throws Exception {
- final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
-
- // In this case, we receive a trigger event for the directory monitored, but it was another file not being monitored
- final WatchKey mockWatchKey = createMockWatchKeyForPath("footage_not_found.yml");
-
- establishMockEnvironmentForChangeTests(testListener, mockWatchKey);
-
- notifierSpy.targetChanged();
-
- verify(testListener, Mockito.never()).handleChange(Mockito.any(InputStream.class));
- }
-
- @Test
- public void testTargetChangedWithModificationEvent() throws Exception {
- final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
-
- final WatchKey mockWatchKey = createMockWatchKeyForPath(CONFIG_FILENAME);
- // Provided as a spy to allow injection of mock objects for some tests when dealing with the finalized FileSystems class
- establishMockEnvironmentForChangeTests(testListener, mockWatchKey);
-
- // Invoke the method of interest
- notifierSpy.run();
-
- verify(mockWatchService, Mockito.atLeastOnce()).poll();
- verify(testListener, Mockito.atLeastOnce()).handleChange(Mockito.any(InputStream.class));
- }
-
- /* Helper methods to establish mock environment */
- private WatchKey createMockWatchKeyForPath(String configFilePath) {
- final WatchKey mockWatchKey = Mockito.mock(WatchKey.class);
- final List<WatchEvent<?>> mockWatchEvents = (List<WatchEvent<?>>) Mockito.mock(List.class);
- when(mockWatchKey.pollEvents()).thenReturn(mockWatchEvents);
- when(mockWatchKey.reset()).thenReturn(true);
-
- final Iterator mockIterator = Mockito.mock(Iterator.class);
- when(mockWatchEvents.iterator()).thenReturn(mockIterator);
-
- final WatchEvent mockWatchEvent = Mockito.mock(WatchEvent.class);
- when(mockIterator.hasNext()).thenReturn(true, false);
- when(mockIterator.next()).thenReturn(mockWatchEvent);
-
- // In this case, we receive a trigger event for the directory monitored, and it was the file monitored
- when(mockWatchEvent.context()).thenReturn(Paths.get(configFilePath));
- when(mockWatchEvent.kind()).thenReturn(ENTRY_MODIFY);
-
- return mockWatchKey;
- }
-
- private void establishMockEnvironmentForChangeTests(ConfigurationChangeListener listener, final WatchKey watchKey) throws Exception {
- final boolean wasRegistered = notifierSpy.registerListener(listener);
-
- // Establish the file mock and its parent directory
- final Path mockConfigFilePath = Mockito.mock(Path.class);
- final Path mockConfigFileParentPath = Mockito.mock(Path.class);
-
- // When getting the parent of the file, get the directory
- when(mockConfigFilePath.getParent()).thenReturn(mockConfigFileParentPath);
-
- Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
- Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
-
- when(mockWatchService.poll()).thenReturn(watchKey);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifier.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifier.java
deleted file mode 100644
index 75b44e3..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifier.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration;
-
-
-import com.squareup.okhttp.OkHttpClient;
-import org.apache.nifi.minifi.bootstrap.configuration.util.TestRestChangeNotifierCommon;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import java.net.MalformedURLException;
-import java.util.Properties;
-
-
-public class TestRestChangeNotifier extends TestRestChangeNotifierCommon {
-
- @BeforeClass
- public static void setUp() throws InterruptedException, MalformedURLException {
- Properties properties = new Properties();
- restChangeNotifier = new RestChangeNotifier();
- restChangeNotifier.initialize(properties);
- restChangeNotifier.registerListener(mockChangeListener);
- restChangeNotifier.start();
-
- client = new OkHttpClient();
-
- url = restChangeNotifier.getURI().toURL().toString();
- Thread.sleep(1000);
- }
-
- @AfterClass
- public static void stop() throws Exception {
- restChangeNotifier.close();
- client = null;
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifierSSL.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifierSSL.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifierSSL.java
deleted file mode 100644
index 908e693..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestRestChangeNotifierSSL.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration;
-
-
-import com.squareup.okhttp.OkHttpClient;
-import org.apache.nifi.minifi.bootstrap.configuration.util.TestRestChangeNotifierCommon;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManagerFactory;
-import java.io.IOException;
-import java.security.KeyManagementException;
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-import java.util.Properties;
-
-
-public class TestRestChangeNotifierSSL extends TestRestChangeNotifierCommon {
-
-
- @BeforeClass
- public static void setUpHttps() throws CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, UnrecoverableKeyException, KeyManagementException, InterruptedException {
- Properties properties = new Properties();
- properties.setProperty(RestChangeNotifier.TRUSTSTORE_LOCATION_KEY, "./src/test/resources/localhost-ts.jks");
- properties.setProperty(RestChangeNotifier.TRUSTSTORE_PASSWORD_KEY, "localtest");
- properties.setProperty(RestChangeNotifier.TRUSTSTORE_TYPE_KEY, "JKS");
- properties.setProperty(RestChangeNotifier.KEYSTORE_LOCATION_KEY, "./src/test/resources/localhost-ks.jks");
- properties.setProperty(RestChangeNotifier.KEYSTORE_PASSWORD_KEY, "localtest");
- properties.setProperty(RestChangeNotifier.KEYSTORE_TYPE_KEY, "JKS");
- properties.setProperty(RestChangeNotifier.NEED_CLIENT_AUTH_KEY, "true");
- restChangeNotifier = new RestChangeNotifier();
- restChangeNotifier.initialize(properties);
- restChangeNotifier.registerListener(mockChangeListener);
- restChangeNotifier.start();
-
- client = new OkHttpClient();
-
- SSLContext sslContext = SSLContext.getInstance("TLS");
- TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
- trustManagerFactory.init(readKeyStore("./src/test/resources/localhost-ts.jks"));
-
- KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
- keyManagerFactory.init(readKeyStore("./src/test/resources/localhost-ks.jks"), "localtest".toCharArray());
-
- sslContext.init(keyManagerFactory.getKeyManagers(),trustManagerFactory.getTrustManagers(), new SecureRandom());
- client.setSslSocketFactory(sslContext.getSocketFactory());
-
- url = restChangeNotifier.getURI().toURL().toString();
- Thread.sleep(1000);
- }
-
- @AfterClass
- public static void stop() throws Exception {
- restChangeNotifier.close();
- client = null;
- }
-
- private static KeyStore readKeyStore(String path) throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException {
- KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
-
- char[] password = "localtest".toCharArray();
-
- java.io.FileInputStream fis = null;
- try {
- fis = new java.io.FileInputStream(path);
- ks.load(fis, password);
- } finally {
- if (fis != null) {
- fis.close();
- }
- }
- return ks;
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestFileChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestFileChangeNotifier.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestFileChangeNotifier.java
new file mode 100644
index 0000000..145c2fe
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestFileChangeNotifier.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers;
+
+import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.configuration.notifiers.FileChangeNotifier;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestFileChangeNotifier {
+
+ private static final String CONFIG_FILENAME = "config.yml";
+ private static final String TEST_CONFIG_PATH = "src/test/resources/config.yml";
+
+ private FileChangeNotifier notifierSpy;
+ private WatchService mockWatchService;
+ private Properties testProperties;
+
+ @Before
+ public void setUp() throws Exception {
+ mockWatchService = Mockito.mock(WatchService.class);
+ notifierSpy = Mockito.spy(new FileChangeNotifier());
+ notifierSpy.setConfigFile(Paths.get(TEST_CONFIG_PATH));
+ notifierSpy.setWatchService(mockWatchService);
+
+ testProperties = new Properties();
+ testProperties.put(FileChangeNotifier.CONFIG_FILE_PATH_KEY, TEST_CONFIG_PATH);
+ testProperties.put(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY, FileChangeNotifier.DEFAULT_POLLING_PERIOD_INTERVAL);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ notifierSpy.close();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testInitialize_invalidFile() throws Exception {
+ testProperties.put(FileChangeNotifier.CONFIG_FILE_PATH_KEY, "/land/of/make/believe");
+ notifierSpy.initialize(testProperties);
+ }
+
+ @Test
+ public void testInitialize_validFile() throws Exception {
+ notifierSpy.initialize(testProperties);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testInitialize_invalidPollingPeriod() throws Exception {
+ testProperties.put(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY, "abc");
+ notifierSpy.initialize(testProperties);
+ }
+
+ @Test
+ public void testInitialize_useDefaultPolling() throws Exception {
+ testProperties.remove(FileChangeNotifier.POLLING_PERIOD_INTERVAL_KEY);
+ notifierSpy.initialize(testProperties);
+ }
+
+
+ @Test
+ public void testNotifyListeners() throws Exception {
+ final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+ boolean wasRegistered = notifierSpy.registerListener(testListener);
+
+ Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
+ Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
+
+ notifierSpy.notifyListeners();
+
+ verify(testListener, Mockito.atMost(1)).handleChange(Mockito.any(InputStream.class));
+ }
+
+ @Test
+ public void testRegisterListener() throws Exception {
+ final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class);
+ boolean wasRegistered = notifierSpy.registerListener(firstListener);
+
+ Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
+ Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
+
+ final ConfigurationChangeListener secondListener = Mockito.mock(ConfigurationChangeListener.class);
+ wasRegistered = notifierSpy.registerListener(secondListener);
+ Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 2);
+
+ }
+
+ @Test
+ public void testRegisterDuplicateListener() throws Exception {
+ final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class);
+ boolean wasRegistered = notifierSpy.registerListener(firstListener);
+
+ Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
+ Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
+
+ wasRegistered = notifierSpy.registerListener(firstListener);
+
+ Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
+ Assert.assertFalse("Registration did not correspond to newly added listener", wasRegistered);
+ }
+
+ /* Verify handleChange events */
+ @Test
+ public void testTargetChangedNoModification() throws Exception {
+ final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+
+ // In this case the WatchKey is null because there were no events found
+ establishMockEnvironmentForChangeTests(testListener, null);
+
+ verify(testListener, Mockito.never()).handleChange(Mockito.any(InputStream.class));
+ }
+
+ @Test
+ public void testTargetChangedWithModificationEvent_nonConfigFile() throws Exception {
+ final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+
+ // In this case, we receive a trigger event for the directory monitored, but it was another file not being monitored
+ final WatchKey mockWatchKey = createMockWatchKeyForPath("footage_not_found.yml");
+
+ establishMockEnvironmentForChangeTests(testListener, mockWatchKey);
+
+ notifierSpy.targetChanged();
+
+ verify(testListener, Mockito.never()).handleChange(Mockito.any(InputStream.class));
+ }
+
+ @Test
+ public void testTargetChangedWithModificationEvent() throws Exception {
+ final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+
+ final WatchKey mockWatchKey = createMockWatchKeyForPath(CONFIG_FILENAME);
+ // Provided as a spy to allow injection of mock objects for some tests when dealing with the finalized FileSystems class
+ establishMockEnvironmentForChangeTests(testListener, mockWatchKey);
+
+ // Invoke the method of interest
+ notifierSpy.run();
+
+ verify(mockWatchService, Mockito.atLeastOnce()).poll();
+ verify(testListener, Mockito.atLeastOnce()).handleChange(Mockito.any(InputStream.class));
+ }
+
+ /* Helper methods to establish mock environment */
+ private WatchKey createMockWatchKeyForPath(String configFilePath) {
+ final WatchKey mockWatchKey = Mockito.mock(WatchKey.class);
+ final List<WatchEvent<?>> mockWatchEvents = (List<WatchEvent<?>>) Mockito.mock(List.class);
+ when(mockWatchKey.pollEvents()).thenReturn(mockWatchEvents);
+ when(mockWatchKey.reset()).thenReturn(true);
+
+ final Iterator mockIterator = Mockito.mock(Iterator.class);
+ when(mockWatchEvents.iterator()).thenReturn(mockIterator);
+
+ final WatchEvent mockWatchEvent = Mockito.mock(WatchEvent.class);
+ when(mockIterator.hasNext()).thenReturn(true, false);
+ when(mockIterator.next()).thenReturn(mockWatchEvent);
+
+ // In this case, we receive a trigger event for the directory monitored, and it was the file monitored
+ when(mockWatchEvent.context()).thenReturn(Paths.get(configFilePath));
+ when(mockWatchEvent.kind()).thenReturn(ENTRY_MODIFY);
+
+ return mockWatchKey;
+ }
+
+ private void establishMockEnvironmentForChangeTests(ConfigurationChangeListener listener, final WatchKey watchKey) throws Exception {
+ final boolean wasRegistered = notifierSpy.registerListener(listener);
+
+ // Establish the file mock and its parent directory
+ final Path mockConfigFilePath = Mockito.mock(Path.class);
+ final Path mockConfigFileParentPath = Mockito.mock(Path.class);
+
+ // When getting the parent of the file, get the directory
+ when(mockConfigFilePath.getParent()).thenReturn(mockConfigFileParentPath);
+
+ Assert.assertTrue("Registration did not correspond to newly added listener", wasRegistered);
+ Assert.assertEquals("Did not receive the correct number of registered listeners", notifierSpy.getChangeListeners().size(), 1);
+
+ when(mockWatchService.poll()).thenReturn(watchKey);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifier.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifier.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifier.java
new file mode 100644
index 0000000..1cd37fd
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifier.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers;
+
+
+import com.squareup.okhttp.OkHttpClient;
+import org.apache.nifi.minifi.bootstrap.configuration.notifiers.util.TestRestChangeNotifierCommon;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.net.MalformedURLException;
+import java.util.Properties;
+
+
+public class TestRestChangeNotifier extends TestRestChangeNotifierCommon {
+
+ @BeforeClass
+ public static void setUp() throws InterruptedException, MalformedURLException {
+ Properties properties = new Properties();
+ restChangeNotifier = new RestChangeNotifier();
+ restChangeNotifier.initialize(properties);
+ restChangeNotifier.registerListener(mockChangeListener);
+ restChangeNotifier.start();
+
+ client = new OkHttpClient();
+
+ url = restChangeNotifier.getURI().toURL().toString();
+ Thread.sleep(1000);
+ }
+
+ @AfterClass
+ public static void stop() throws Exception {
+ restChangeNotifier.close();
+ client = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifierSSL.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifierSSL.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifierSSL.java
new file mode 100644
index 0000000..6073a6f
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/TestRestChangeNotifierSSL.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers;
+
+
+import com.squareup.okhttp.OkHttpClient;
+import org.apache.nifi.minifi.bootstrap.configuration.notifiers.util.TestRestChangeNotifierCommon;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.Properties;
+
+
+public class TestRestChangeNotifierSSL extends TestRestChangeNotifierCommon {
+
+
+ @BeforeClass
+ public static void setUpHttps() throws CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, UnrecoverableKeyException, KeyManagementException, InterruptedException {
+ Properties properties = new Properties();
+ properties.setProperty(RestChangeNotifier.TRUSTSTORE_LOCATION_KEY, "./src/test/resources/localhost-ts.jks");
+ properties.setProperty(RestChangeNotifier.TRUSTSTORE_PASSWORD_KEY, "localtest");
+ properties.setProperty(RestChangeNotifier.TRUSTSTORE_TYPE_KEY, "JKS");
+ properties.setProperty(RestChangeNotifier.KEYSTORE_LOCATION_KEY, "./src/test/resources/localhost-ks.jks");
+ properties.setProperty(RestChangeNotifier.KEYSTORE_PASSWORD_KEY, "localtest");
+ properties.setProperty(RestChangeNotifier.KEYSTORE_TYPE_KEY, "JKS");
+ properties.setProperty(RestChangeNotifier.NEED_CLIENT_AUTH_KEY, "true");
+ restChangeNotifier = new RestChangeNotifier();
+ restChangeNotifier.initialize(properties);
+ restChangeNotifier.registerListener(mockChangeListener);
+ restChangeNotifier.start();
+
+ client = new OkHttpClient();
+
+ SSLContext sslContext = SSLContext.getInstance("TLS");
+ TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ trustManagerFactory.init(readKeyStore("./src/test/resources/localhost-ts.jks"));
+
+ KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ keyManagerFactory.init(readKeyStore("./src/test/resources/localhost-ks.jks"), "localtest".toCharArray());
+
+ sslContext.init(keyManagerFactory.getKeyManagers(),trustManagerFactory.getTrustManagers(), new SecureRandom());
+ client.setSslSocketFactory(sslContext.getSocketFactory());
+
+ url = restChangeNotifier.getURI().toURL().toString();
+ Thread.sleep(1000);
+ }
+
+ @AfterClass
+ public static void stop() throws Exception {
+ restChangeNotifier.close();
+ client = null;
+ }
+
+ private static KeyStore readKeyStore(String path) throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException {
+ KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
+
+ char[] password = "localtest".toCharArray();
+
+ java.io.FileInputStream fis = null;
+ try {
+ fis = new java.io.FileInputStream(path);
+ ks.load(fis, password);
+ } finally {
+ if (fis != null) {
+ fis.close();
+ }
+ }
+ return ks;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/MockChangeListener.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/MockChangeListener.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/MockChangeListener.java
new file mode 100644
index 0000000..eae5872
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/MockChangeListener.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers.util;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class MockChangeListener implements ConfigurationChangeListener {
+ String confFile;
+
+ @Override
+ public void handleChange(InputStream inputStream) {
+ try {
+ confFile = IOUtils.toString(inputStream, "UTF-8");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public String getDescriptor() {
+ return "MockChangeListener";
+ }
+
+ public String getConfFile() {
+ return confFile;
+ }
+
+ public void setConfFile(String confFile) {
+ this.confFile = confFile;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/TestRestChangeNotifierCommon.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/TestRestChangeNotifierCommon.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/TestRestChangeNotifierCommon.java
new file mode 100644
index 0000000..78f6cd5
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/util/TestRestChangeNotifierCommon.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.notifiers.util;
+
+import com.squareup.okhttp.Headers;
+import com.squareup.okhttp.MediaType;
+import com.squareup.okhttp.OkHttpClient;
+import com.squareup.okhttp.Request;
+import com.squareup.okhttp.RequestBody;
+import com.squareup.okhttp.Response;
+import org.apache.nifi.minifi.bootstrap.configuration.notifiers.RestChangeNotifier;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public abstract class TestRestChangeNotifierCommon {
+
+ public static OkHttpClient client;
+ public static RestChangeNotifier restChangeNotifier;
+ public static final MediaType MEDIA_TYPE_MARKDOWN = MediaType.parse("text/x-markdown; charset=utf-8");
+ public static String url;
+ public static MockChangeListener mockChangeListener = new MockChangeListener();
+
+ @Test
+ public void testGet() throws Exception {
+ assertEquals(1, restChangeNotifier.getChangeListeners().size());
+
+ Request request = new Request.Builder()
+ .url(url)
+ .build();
+
+ Response response = client.newCall(request).execute();
+ if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
+
+ Headers responseHeaders = response.headers();
+ for (int i = 0; i < responseHeaders.size(); i++) {
+ System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
+ }
+
+ assertEquals(RestChangeNotifier.GET_TEXT, response.body().string());
+ }
+
+ @Test
+ public void testFileUpload() throws Exception {
+ assertEquals(1, restChangeNotifier.getChangeListeners().size());
+
+ File file = new File("src/test/resources/testUploadFile.txt");
+ assertTrue(file.exists());
+ assertTrue(file.canRead());
+
+ Request request = new Request.Builder()
+ .url(url)
+ .post(RequestBody.create(MEDIA_TYPE_MARKDOWN, file))
+ .addHeader("charset","UTF-8")
+ .build();
+
+ Response response = client.newCall(request).execute();
+ if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
+
+ Headers responseHeaders = response.headers();
+ for (int i = 0; i < responseHeaders.size(); i++) {
+ System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
+ }
+
+ assertEquals("The result of notifying listeners:\nMockChangeListener successfully handled the configuration change\n", response.body().string());
+
+ assertEquals(new String(Files.readAllBytes(file.toPath())), mockChangeListener.getConfFile());
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/MockChangeListener.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/MockChangeListener.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/MockChangeListener.java
deleted file mode 100644
index 6843889..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/MockChangeListener.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration.util;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-public class MockChangeListener implements ConfigurationChangeListener {
- String confFile;
-
- @Override
- public void handleChange(InputStream inputStream) {
- try {
- confFile = IOUtils.toString(inputStream, "UTF-8");
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public String getConfFile() {
- return confFile;
- }
-
- public void setConfFile(String confFile) {
- this.confFile = confFile;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/TestRestChangeNotifierCommon.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/TestRestChangeNotifierCommon.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/TestRestChangeNotifierCommon.java
deleted file mode 100644
index b3c4f54..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/util/TestRestChangeNotifierCommon.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration.util;
-
-import com.squareup.okhttp.Headers;
-import com.squareup.okhttp.MediaType;
-import com.squareup.okhttp.OkHttpClient;
-import com.squareup.okhttp.Request;
-import com.squareup.okhttp.RequestBody;
-import com.squareup.okhttp.Response;
-import org.apache.nifi.minifi.bootstrap.configuration.RestChangeNotifier;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public abstract class TestRestChangeNotifierCommon {
-
- public static OkHttpClient client;
- public static RestChangeNotifier restChangeNotifier;
- public static final MediaType MEDIA_TYPE_MARKDOWN = MediaType.parse("text/x-markdown; charset=utf-8");
- public static String url;
- public static MockChangeListener mockChangeListener = new MockChangeListener();
-
- @Test
- public void testGet() throws Exception {
- assertEquals(1, restChangeNotifier.getChangeListeners().size());
-
- Request request = new Request.Builder()
- .url(url)
- .build();
-
- Response response = client.newCall(request).execute();
- if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
-
- Headers responseHeaders = response.headers();
- for (int i = 0; i < responseHeaders.size(); i++) {
- System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
- }
-
- assertEquals(RestChangeNotifier.GET_TEXT, response.body().string());
- }
-
- @Test
- public void testFileUpload() throws Exception {
- assertEquals(1, restChangeNotifier.getChangeListeners().size());
-
- File file = new File("src/test/resources/testUploadFile.txt");
- assertTrue(file.exists());
- assertTrue(file.canRead());
-
- Request request = new Request.Builder()
- .url(url)
- .post(RequestBody.create(MEDIA_TYPE_MARKDOWN, file))
- .addHeader("charset","UTF-8")
- .build();
-
- Response response = client.newCall(request).execute();
- if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
-
- Headers responseHeaders = response.headers();
- for (int i = 0; i < responseHeaders.size(); i++) {
- System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
- }
-
- assertEquals(RestChangeNotifier.POST_TEXT, response.body().string());
-
- assertEquals(new String(Files.readAllBytes(file.toPath())), mockChangeListener.getConfFile());
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
index 1a7f261..d0a7d71 100644
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileInputStream;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
import org.junit.Assert;
import org.junit.Test;
@@ -63,6 +64,24 @@ public class TestConfigTransformer {
flowXml.deleteOnExit();
}
+ @Test
+ public void doesTransformOnDefaultFile() throws Exception {
+
+ ConfigTransformer.transformConfigFile("./src/test/resources/default.yml", "./target/");
+ File nifiPropertiesFile = new File("./target/nifi.properties");
+
+ assertTrue(nifiPropertiesFile.exists());
+ assertTrue(nifiPropertiesFile.canRead());
+
+ nifiPropertiesFile.deleteOnExit();
+
+ File flowXml = new File("./target/flow.xml.gz");
+ assertTrue(flowXml.exists());
+ assertTrue(flowXml.canRead());
+
+ flowXml.deleteOnExit();
+ }
+
@Test(expected = IllegalArgumentException.class)
public void handleTransformInvalidFile() throws Exception {
@@ -70,4 +89,12 @@ public class TestConfigTransformer {
Assert.fail("Invalid configuration file was not detected.");
}
+
+ @Test(expected = ConfigurationChangeException.class)
+ public void handleTransformEmptyFile() throws Exception {
+
+ ConfigTransformer.transformConfigFile("./src/test/resources/config-empty.yml", "./target/");
+
+ Assert.fail("Invalid configuration file was not detected.");
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/test/resources/config-empty.yml
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/resources/config-empty.yml b/minifi-bootstrap/src/test/resources/config-empty.yml
new file mode 100644
index 0000000..fbbbeb9
--- /dev/null
+++ b/minifi-bootstrap/src/test/resources/config-empty.yml
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Flow Controller:
+ name: MiNiFi Flow
+ comment:
\ No newline at end of file