You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2019/03/16 14:09:21 UTC
[nifi] branch master updated: NIFI-5318 Implement NiFi test
harness: initial commit NIFI-5318 Implement NiFi test harness: replaced
original sample feed payload with synthetic content NIFI-5318 Implement
NiFi test harness: fixed test harness run crash issue;
better reporting of paths NIFI-5318 Implement NiFi test harness: added
further states where NiFi version can be queried NIFI-5318 Implement NiFi
test harness: fixed incorrect class reference NIFI-5318 Implement NiFi test
harness: added type parameter bounding to se [...]
This is an automated email from the ASF dual-hosted git repository.
mthomsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 4323572 NIFI-5318 Implement NiFi test harness: initial commit NIFI-5318 Implement NiFi test harness: replaced original sample feed payload with synthetic content NIFI-5318 Implement NiFi test harness: fixed test harness run crash issue; better reporting of paths NIFI-5318 Implement NiFi test harness: added further states where NiFi version can be queried NIFI-5318 Implement NiFi test harness: fixed incorrect class reference NIFI-5318 Implement NiFi test harness: added type param [...]
4323572 is described below
commit 43235724e2031c30fc48e05be0d2136fd58efa52
Author: Peter G. Horvath <pe...@gmail.com>
AuthorDate: Sat Nov 10 22:22:01 2018 +0100
NIFI-5318 Implement NiFi test harness: initial commit
NIFI-5318 Implement NiFi test harness: replaced original sample feed payload with synthetic content
NIFI-5318 Implement NiFi test harness: fixed test harness run crash issue; better reporting of paths
NIFI-5318 Implement NiFi test harness: added further states where NiFi version can be queried
NIFI-5318 Implement NiFi test harness: fixed incorrect class reference
NIFI-5318 Implement NiFi test harness: added type parameter bounding to setClassOfSingleProcessor to prevent configuring obviously incorrect classes
NIFI-5318 Updated project version.
This closes #3165
Signed-off-by: Mike Thomsen <mi...@gmail.com>
---
nifi-testharness/.gitignore | 1 +
.../NIFI_TESTHARNESS_README.txt | 3 +
nifi-testharness/pom.xml | 233 ++++++++
.../SimpleNiFiFlowDefinitionEditor.java | 203 +++++++
.../apache/nifi/testharness/TestNiFiInstance.java | 620 +++++++++++++++++++++
.../nifi/testharness/TestNiFiInstanceAware.java | 23 +
.../testharness/api/FlowFileEditorCallback.java | 46 ++
.../apache/nifi/testharness/util/FileUtils.java | 88 +++
.../testharness/util/NiFiCoreLibClassLoader.java | 84 +++
.../org/apache/nifi/testharness/util/XmlUtils.java | 67 +++
.../java/org/apache/nifi/testharness/util/Zip.java | 134 +++++
.../apache/nifi/testharness/samples/Constants.java | 32 ++
.../nifi/testharness/samples/NiFiFlowTest.java | 157 ++++++
.../nifi/testharness/samples/NiFiMockFlowTest.java | 119 ++++
.../apache/nifi/testharness/samples/TestUtils.java | 57 ++
.../nifi/testharness/samples/mock/GetHTTPMock.java | 90 +++
.../testharness/samples/mock/MockProcessor.java | 101 ++++
nifi-testharness/src/test/resources/flow.xml | 154 +++++
.../src/test/resources/logback-test.xml | 15 +
.../src/test/resources/sample_technology_rss.xml | 24 +
pom.xml | 1 +
21 files changed, 2252 insertions(+)
diff --git a/nifi-testharness/.gitignore b/nifi-testharness/.gitignore
new file mode 100644
index 0000000..17dff51
--- /dev/null
+++ b/nifi-testharness/.gitignore
@@ -0,0 +1 @@
+nifi_testharness_nifi_home/*
\ No newline at end of file
diff --git a/nifi-testharness/nifi_testharness_nifi_home/NIFI_TESTHARNESS_README.txt b/nifi-testharness/nifi_testharness_nifi_home/NIFI_TESTHARNESS_README.txt
new file mode 100644
index 0000000..e2d4da0
--- /dev/null
+++ b/nifi-testharness/nifi_testharness_nifi_home/NIFI_TESTHARNESS_README.txt
@@ -0,0 +1,3 @@
+This directory is used to mimic NiFi's own home directory: the JVM hosting the
+TestNiFiInstance has to be started here. Once started, TestNiFiInstance then
+creates symlinks to the actual NiFi installation directory.
\ No newline at end of file
diff --git a/nifi-testharness/pom.xml b/nifi-testharness/pom.xml
new file mode 100644
index 0000000..95cafc0
--- /dev/null
+++ b/nifi-testharness/pom.xml
@@ -0,0 +1,233 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>nifi-testharness</artifactId>
+ <description>A test harness for running NiFi flow tests</description>
+ <packaging>jar</packaging>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes combine.children="append">
+ <exclude>nifi_testharness_nifi_home/NIFI_TESTHARNESS_README.txt</exclude>
+ <exclude>src/test/resources/sample_technology_rss.xml</exclude>
+ <exclude>src/test/resources/logback-test.xml</exclude>
+ <exclude>src/test/resources/flow.xml</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.20.1</version>
+ <configuration> <!-- fresh JVM per test class, started inside the directory TestNiFiInstance insists on -->
+ <forkCount>1</forkCount>
+ <reuseForks>false</reuseForks>
+ <workingDirectory>${project.basedir}/nifi_testharness_nifi_home</workingDirectory>
+ </configuration>
+ </plugin>
+
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>skip-testharness-tests</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <java.awt.headless>true</java.awt.headless>
+ </systemPropertyVariables>
+ <excludes>
+ <exclude>**/samples/*Test.class</exclude>
+ <exclude>**/samples/Test*.class</exclude>
+ <exclude>**/samples/*Spec.class</exclude>
+ </excludes>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
+ <argLine combine.children="append">-Xmx1G
+ -Djava.net.preferIPv4Stack=true
+ ${maven.surefire.arguments}
+ </argLine>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>run-testharness-tests</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <reuseForks>false</reuseForks>
+ <workingDirectory>${project.basedir}/nifi_testharness_nifi_home</workingDirectory>
+ <systemPropertyVariables>
+ <java.awt.headless>true</java.awt.headless>
+ </systemPropertyVariables>
+ <includes>
+ <include>**/*Test.class</include>
+ <include>**/Test*.class</include>
+ <include>**/*Spec.class</include>
+ </includes>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
+ <argLine combine.children="append">-Xmx1G
+ -Djava.net.preferIPv4Stack=true
+ ${maven.surefire.arguments}
+ </argLine>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <org.slf4j.version>1.7.25</org.slf4j.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-runtime</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-assembly</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>pom</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-framework-api</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-framework-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-bootstrap</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- https://mvnrepository.com/artifact/javax.servlet/javax.servlet-api -->
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ <version>3.1.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>${jetty.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <version>3.4</version>
+ <scope>test</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymockclassextension</artifactId>
+ <version>3.2</version>
+ <scope>test</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-easymock</artifactId>
+ <version>1.7.1</version>
+ <scope>test</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>1.7.1</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+
+</project>
diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/SimpleNiFiFlowDefinitionEditor.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/SimpleNiFiFlowDefinitionEditor.java
new file mode 100644
index 0000000..27e6477
--- /dev/null
+++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/SimpleNiFiFlowDefinitionEditor.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.nifi.testharness;
+
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.testharness.api.FlowFileEditorCallback;
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathFactory;
+import java.util.LinkedList;
+import java.util.Objects;
+
+
+/**
+ * <p>
+ * A facility to describe simple, common changes to a NiFi flow before it is installed to the test
+ * NiFi instance. Intended to be used by
+ * {@link TestNiFiInstance.Builder#modifyFlowXmlBeforeInstalling(FlowFileEditorCallback)}
+ * </p>
+ *
+ * <p>
+ * The desired edits can be configured via the {@link Builder} object returned by the {@link #builder()}
+ * method. Once fully configured, the {@link Builder#build()} emits a {@code FlowFileEditorCallback}
+ * object that can be passed to
+ * {@link TestNiFiInstance.Builder#modifyFlowXmlBeforeInstalling(FlowFileEditorCallback)}.
+ * </p>
+ *
+ * <p>
+ * <strong>CAUTION: THIS IS AN EXPERIMENTAL API: EXPECT CHANGES!</strong>
+ * Efforts will be made to retain backwards API compatibility, but
+ * no guarantee is given.
+ * </p>
+ *
+ * @see TestNiFiInstance.Builder#modifyFlowXmlBeforeInstalling(FlowFileEditorCallback)
+ *
+ */
+
+public final class SimpleNiFiFlowDefinitionEditor implements FlowFileEditorCallback, TestNiFiInstanceAware {
+
+ // Edits to run, in the order they were added to the builder.
+ private final LinkedList<FlowFileEditorCallback> delegateActions;
+ private TestNiFiInstance testNiFiInstance;
+
+ private SimpleNiFiFlowDefinitionEditor(LinkedList<FlowFileEditorCallback> delegateActions) {
+ this.delegateActions = delegateActions;
+ }
+
+ @Override
+ public Document edit(Document document) throws Exception {
+ // Applies each edit to the result of the previous one; instance-aware edits get the instance first.
+ for (FlowFileEditorCallback change : delegateActions) {
+ if (change instanceof TestNiFiInstanceAware) {
+ ((TestNiFiInstanceAware)change).setTestNiFiInstance(testNiFiInstance);
+ }
+
+ document = change.edit(document);
+ }
+
+ return document;
+ }
+
+ @Override
+ public void setTestNiFiInstance(TestNiFiInstance testNiFiInstance) {
+ this.testNiFiInstance = Objects.requireNonNull(
+ testNiFiInstance, "argument testNiFiInstance cannot be null");
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private Builder() {
+ // no external instance
+ }
+
+ private final XPath xpath = XPathFactory.newInstance().newXPath();
+ private final LinkedList<FlowFileEditorCallback> actions = new LinkedList<>();
+
+ // Appends an arbitrary edit; null is rejected here rather than failing later inside edit().
+ public Builder rawXmlChange(FlowFileEditorCallback flowFileEditorCallback) {
+ actions.addLast(Objects.requireNonNull(flowFileEditorCallback, "flowFileEditorCallback cannot be null"));
+ return this;
+ }
+
+ // Sets the <value> of the named property of the named processor; the edit fails if nothing matches.
+ public Builder setSingleProcessorProperty(String processorName, String propertyName, String newValue) {
+ return rawXmlChange(document -> {
+ String xpathString = "//processor[name/text() = " + xpathLiteral(processorName)
+ + "]/property[name/text() = " + xpathLiteral(propertyName) + "]/value";
+
+ Node propertyValueNode = (Node) xpath.evaluate(xpathString, document, XPathConstants.NODE);
+ if (propertyValueNode == null) {
+ throw new IllegalArgumentException("Reference to processor '"+ processorName +"' with property '"
+ + propertyName + "' not found: " + xpathString);
+ }
+
+ propertyValueNode.setTextContent(newValue);
+ return document;
+ });
+ }
+
+ // Convenience overload: uses the fully qualified name of the given Processor class.
+ public <P extends Processor> Builder setClassOfSingleProcessor(String processorName, Class<P> mockProcessor) {
+ return setClassOfSingleProcessor(processorName, mockProcessor.getName());
+ }
+
+ // Replaces the text of the <class> element of the named processor; the edit fails if nothing matches.
+ public Builder setClassOfSingleProcessor(String processorName, String newFullyQualifiedClassName) {
+ return rawXmlChange(document -> {
+ String xpathString = "//processor[name/text() = " + xpathLiteral(processorName) + "]/class";
+
+ Node classNameNode = (Node) xpath.evaluate(xpathString, document, XPathConstants.NODE);
+ if (classNameNode == null) {
+ throw new IllegalArgumentException("Reference to processor '"+ processorName +"' not found: " + xpathString);
+ }
+
+ classNameNode.setTextContent(newFullyQualifiedClassName);
+ return document;
+ });
+ }
+
+ // Sets the version of every org.apache.nifi bundle in the flow to the version of the test NiFi.
+ public Builder updateFlowFileBuiltInNiFiProcessorVersionsToNiFiVersion() {
+ return rawXmlChange(new UpdateFlowFileNiFiVersionFlowFileEditorCallback());
+ }
+
+ // Copies the edit list so that later changes to this builder cannot alter an editor already built.
+ public SimpleNiFiFlowDefinitionEditor build() {
+ return new SimpleNiFiFlowDefinitionEditor(new LinkedList<>(actions));
+ }
+
+ // Quotes a value as an XPath 1.0 string literal, which has no escape syntax: pick the quote
+ // character the value does not contain, or join the pieces with concat() when it has both.
+ private static String xpathLiteral(String value) {
+ if (value == null || !value.contains("'")) { return "'" + value + "'"; }
+ if (!value.contains("\"")) { return "\"" + value + "\""; }
+ return "concat('" + value.replace("'", "',\"'\",'") + "')";
+ }
+ }
+
+ // Edit that overwrites the <version> of every <bundle> whose <group> is org.apache.nifi.
+ private static final class UpdateFlowFileNiFiVersionFlowFileEditorCallback
+ implements FlowFileEditorCallback, TestNiFiInstanceAware {
+
+ private TestNiFiInstance testNiFiInstance;
+
+ @Override
+ public Document edit(Document document) throws Exception {
+ String niFiVersion = getNiFiVersion();
+
+ XPath xpath = XPathFactory.newInstance().newXPath();
+
+ NodeList processorNodeVersionList = (NodeList)
+ xpath.evaluate("//bundle/group[text() = \"org.apache.nifi\"]/parent::bundle/version",
+ document, XPathConstants.NODESET);
+
+ final int length = processorNodeVersionList.getLength();
+ for (int i=0; i<length; i++) {
+ Node processorNodeVersion = processorNodeVersionList.item(i);
+
+ processorNodeVersion.setTextContent(niFiVersion);
+ }
+
+ return document;
+ }
+ // Fails with a NullPointerException if the instance was never injected via setTestNiFiInstance.
+ private String getNiFiVersion() {
+ return Objects.requireNonNull(
+ testNiFiInstance, "testNiFiInstance cannot be null").getNifiVersion();
+ }
+
+ @Override
+ public void setTestNiFiInstance(TestNiFiInstance testNiFiInstance) {
+ this.testNiFiInstance = Objects.requireNonNull(
+ testNiFiInstance, "argument testNiFiInstance cannot be null");
+
+ }
+
+
+ }
+}
diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/TestNiFiInstance.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/TestNiFiInstance.java
new file mode 100644
index 0000000..e8a1fc6
--- /dev/null
+++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/TestNiFiInstance.java
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.nifi.testharness;
+
+import org.apache.nifi.EmbeddedNiFi;
+import org.apache.nifi.testharness.api.FlowFileEditorCallback;
+import org.apache.nifi.testharness.util.FileUtils;
+import org.apache.nifi.testharness.util.NiFiCoreLibClassLoader;
+import org.apache.nifi.testharness.util.XmlUtils;
+import org.apache.nifi.testharness.util.Zip;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.zip.ZipEntry;
+
+/**
+ * <p>
+ * An API wrapper of a "test" NiFi instance to which a flow definition is installed for testing.</p>
+ *
+ * <p>
+ * Due to NiFi design restrictions, {@code TestNiFiInstance} has to take <i>full command</i>
+ * of the current working directory: it installs a full NiFi installation to there. To ensure
+ * this is desired, <strong>it will only run if the current directory is called
+ * "nifi_testharness_nifi_home"</strong>. As such the JVM process has to be started inside a directory
+ * called "nifi_testharness_nifi_home" so that the following is true:
+ *
+ * <pre><tt>
+ * new File(System.getProperty("user.dir")).getName().equals("nifi_testharness_nifi_home")
+ * </tt></pre>
+ * </p>
+ *
+ * <p>
+ * Before {@code TestNiFiInstance} can be used, it has to be configured via its builder
+ * interface:
+ * <ul>
+ * <li>
+ * {@link Builder#setNiFiBinaryDistributionZip(File)} specifies the location of the NiFi binary
+ * distribution ZIP file to be used.
+ * </li>
+ * <li>
+ * {@link Builder#setFlowXmlToInstallForTesting(File)} specifies the location of the NiFi flow
+ * to install.
+ * </li>
+ * <li>
+ * {@link Builder#modifyFlowXmlBeforeInstalling(FlowFileEditorCallback)} allows on-the-fly
+ * changes to be performed to the Flow file before it is actually installed.
+ * </li>
+ * </ul>
+ *
+ * <h5>Sample</h5>
+ * <pre><tt>
+ * TestNiFiInstance testNiFiInstance = TestNiFiInstance.builder()
+ * .setNiFiBinaryDistributionZip(YourConstants.NIFI_ZIP_FILE)
+ * .setFlowXmlToInstallForTesting(YourConstants.FLOW_XML_FILE)
+ * .modifyFlowXmlBeforeInstalling(YourConstants.FLOW_FILE_CHANGES_FOR_TESTS)
+ * .build();
+ * </tt></pre>
+ *
+ * </p>
+ *
+ * <p>
+ * If the current working directory is called "nifi_testharness_nifi_home", the caller can
+ * {@link #install()} this {@code TestNiFiInstance}, which will
+ * <ol>
+ * <li>
+ * (as a first cleanup step) erase all content of the current working directory.
+ * (NOTE: this potentially destructive operation is the reason why we have the
+ * "nifi_testharness_nifi_home" directory name guard in place!)
+ * </li>
+ * <li>
+ * Extracts the contents of the NiFi binary distribution ZIP file specified in
+ * the configuration to a temporary directory.
+ * <li>
+ * Symlinks all files from the temporary directory to the current working
+ * directory, causing the directory to hold a fully functional
+ * NiFi installation.
+ * </li>
+ * <li>
+ * Installs the flow definition file(s) to the NiFi instance specified in
+ * the configuration.
+ * </li>
+ * </ol>
+ * </p>
+ *
+ * <p>
+ *
+ * The caller then can proceed to {@link #start()} this {@code TestNiFiInstance},
+ * which will bootstrap the NiFi engine, which in turn will pick up and start processing
+ * the flow definition supplied by the caller in the configuration.
+ * </p>
+ *
+ * <p>
+ * Once the previous step is done, the caller can perform asserts regarding the observed behaviour
+ * of the NiFi flow, just like one would do it with standard Java test cases.
+ * </p>
+ *
+ * <p>
+ * To perform a clean shutdown of the hosted NiFi instance, the caller is required to call
+ * {@link #stopAndCleanup()}, which will shut down NiFi and remove all temporary files, including
+ * the symlinks created in the current working directory.
+ * </p>
+ *
+ *
+ * <h4>NOTES</h4>
+ * <ul>
+ * <li>
+ * {@code TestNiFiInstance} is NOT thread safe: if more than one thread uses it,
+ * external synchronisation is required.
+ * </li>
+ * <li>
+ * Only one {@code TestNiFiInstance} can be started in the same "nifi_testharness_nifi_home"
+ * directory at the same time.
+ * </li>
+ * <li>
+ * Currently, due to NiFi limitations, one {@code TestNiFiInstance} can be started per JVM process.
+ * If multiple test cases are required, launch a new JVM process per test case
+ * (in sequence, see the point above): Maven/Surefire has built-in support for this.
+ * </li>
+ * </ul>
+ *
+ * <p>
+ * <strong>CAUTION: THIS IS AN EXPERIMENTAL API: EXPECT CHANGES!</strong>
+ * Efforts will be made to retain backwards API compatibility, but
+ * no guarantee is given.
+ * </p>
+ *
+ *
+ * @see TestNiFiInstance#builder()
+ *
+ *
+ */
+public class TestNiFiInstance {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TestNiFiInstance.class);
+
+
+ private EmbeddedNiFi testNiFi;
+
+ private final File nifiHomeDir;
+ private final File bootstrapLibDir;
+
+ private File nifiProperties;
+
+ private final File flowXmlGz;
+
+ private final File placeholderNiFiHomeDir;
+
+ private String nifiVersion;
+
+
+ private enum State {
+ STOPPED,
+ STOP_FAILED,
+ START_FAILED(STOPPED),
+ STARTED(STOPPED, STOP_FAILED),
+ INSTALLATION_FAILED(),
+ FLOW_INSTALLED(STARTED, START_FAILED),
+ INSTALLED(FLOW_INSTALLED, INSTALLATION_FAILED),
+ CREATED(INSTALLED, INSTALLATION_FAILED);
+ // Each constant lists the states that may directly follow it. A constant can only reference
+ // constants declared before it, hence the reverse lifecycle order of the declarations.
+ private final Set<State> allowedTransitions;
+
+ State(State... allowedTransitions) {
+ this.allowedTransitions = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(allowedTransitions)));
+ }
+ // Throws IllegalStateException unless newState is listed as a direct successor of this state.
+ private void checkCanTransition(State newState) {
+ if (!this.allowedTransitions.contains(newState)) {
+ throw new IllegalStateException("Cannot transition from " + this + " to " + newState);
+ }
+ }
+ }
+
+ private State currentState = State.CREATED;
+
+ private final File nifiBinaryZip;
+ private final File flowXml;
+ private final FlowFileEditorCallback editCallback;
+
+ private TestNiFiInstance(File nifiBinaryZip, File flowXml, FlowFileEditorCallback editCallback) {
+ // the ZIP and the flow are mandatory; the edit callback may be null (flow installed unmodified)
+ this.nifiBinaryZip = Objects.requireNonNull(nifiBinaryZip, "nifiBinaryZip");
+ this.flowXml = Objects.requireNonNull(flowXml, "flowXml");
+ this.editCallback = editCallback;
+
+ // refuses to continue unless the JVM runs inside "nifi_testharness_nifi_home"
+ this.nifiHomeDir = requireCurrentWorkingDirectoryIsCorrect();
+
+ // files and directories of the NiFi layout, resolved relative to the home directory
+ final File confFolder = new File(this.nifiHomeDir, "conf");
+ this.bootstrapLibDir = new File(new File(this.nifiHomeDir, "lib"), "bootstrap");
+ this.nifiProperties = new File(confFolder, "nifi.properties");
+ this.flowXmlGz = new File(confFolder, "flow.xml.gz");
+
+ // same working-directory check again; install() unpacks the ZIP into this directory
+ this.placeholderNiFiHomeDir = requireCurrentWorkingDirectoryIsCorrect();
+ }
+
+ String getNifiVersion() {
+ switch (currentState) {
+ case INSTALLED:
+ case FLOW_INSTALLED:
+ case STARTED:
+ case START_FAILED:
+ case STOP_FAILED:
+ case STOPPED:
+
+ return Objects.requireNonNull(nifiVersion, "nifiVersion is null");
+
+ default:
+ throw new IllegalStateException(
+ "NiFi version can only be retrieved after a successful installation, not in: "
+ + currentState);
+ }
+ }
+
+ public void install() throws IOException {
+ // Unpacks the NiFi ZIP into the working directory, symlinks it into place and installs the flow.
+ currentState.checkCanTransition(State.INSTALLED);
+ // remove "nifi-*" entries left in the working directory by an earlier run
+ File[] staleInstallations = placeholderNiFiHomeDir.listFiles((dir, name) -> name.startsWith("nifi-"));
+ if (staleInstallations != null) {
+ Arrays.stream(staleInstallations).forEach(TestNiFiInstance::deleteFileOrDirectoryRecursively);
+ }
+
+ Path tempDirectory = null;
+ try {
+ tempDirectory = Files.createTempDirectory("installable-flow");
+
+ // tempDirectory only stages the flow file; the archive itself is unpacked into the
+ // working directory (placeholderNiFiHomeDir)
+ LOGGER.info("Uncompressing NiFi archive {} to {} ...", nifiBinaryZip, placeholderNiFiHomeDir);
+
+ Zip.unzipFile(nifiBinaryZip, placeholderNiFiHomeDir, new Zip.StatusListenerAdapter() {
+ @Override
+ public void onUncompressDone(ZipEntry ze) {
+ LOGGER.debug("Uncompressed {}", ze.getName());
+ }
+ });
+
+ LOGGER.info("Uncompressing DONE");
+
+ File actualNiFiHomeDir = getActualNiFiHomeDir(placeholderNiFiHomeDir);
+
+ nifiVersion = getNiFiVersion(actualNiFiHomeDir);
+ // must already be INSTALLED here: flow edit callbacks may call getNifiVersion(), which rejects CREATED
+ currentState = State.INSTALLED;
+
+ File installableFlowFile = createInstallableFlowFile(tempDirectory);
+
+ validateNiFiVersionAgainstFlowVersion(nifiVersion, installableFlowFile);
+
+ FileUtils.createSymlinks(placeholderNiFiHomeDir, actualNiFiHomeDir);
+
+ installFlowFile(installableFlowFile);
+ } catch (Exception e) {
+ // INSTALLATION_FAILED has no outgoing transitions, so this instance cannot be reused
+ currentState = State.INSTALLATION_FAILED;
+
+ throw new RuntimeException("Installation failed: " + e.getMessage(), e);
+
+ } finally {
+ if (tempDirectory != null) {
+ FileUtils.deleteDirectoryRecursive(tempDirectory);
+ }
+ }
+ // only reached when the try block completed without an exception
+ currentState = State.FLOW_INSTALLED;
+ }
+
+ private File createInstallableFlowFile(Path tempDirectory) throws IOException {
+
+ File flowXmlFile = new File(tempDirectory.toFile(), "flow.xml");
+
+ if (editCallback == null) {
+ Files.copy(flowXml.toPath(), flowXmlFile.toPath());
+ } else {
+ if (editCallback instanceof TestNiFiInstanceAware) {
+ ((TestNiFiInstanceAware)editCallback).setTestNiFiInstance(this);
+ }
+
+ XmlUtils.editXml(flowXml, flowXmlFile, editCallback);
+ }
+
+ return flowXmlFile;
+ }
+
+ private void installFlowFile(File fileToIncludeInGz) throws IOException {
+ Zip.gzipFile(fileToIncludeInGz, flowXmlGz); // flowXmlGz is conf/flow.xml.gz under the NiFi home directory
+ }
+
+ private static String getNiFiVersion(File nifiInstallDir) {
+ // Derives the NiFi version from the name of the single lib/nifi-api-*.jar of the installation.
+ File libDir = new File(nifiInstallDir, "lib");
+ if (!libDir.exists()) {
+ throw new IllegalStateException(
+ "No \"lib\" directory found in NiFi home directory: " + nifiInstallDir);
+ }
+
+ File[] nifiApiJarLookupResults =
+ libDir.listFiles((dir, name) -> name.startsWith("nifi-api-") && name.endsWith(".jar"));
+
+ if (nifiApiJarLookupResults == null) {
+ // "lib" exists, so null means it is not a directory or an I/O error occurred
+ throw new IllegalStateException(
+ "I/O error listing NiFi lib directory: " + libDir);
+ }
+
+ if (nifiApiJarLookupResults.length == 0) {
+ throw new IllegalStateException(
+ "No \"nifi-api-*.jar\" file found in NiFi lib directory: " + libDir);
+ }
+
+ if (nifiApiJarLookupResults.length != 1) {
+ throw new IllegalStateException(
+ "Multiple \"nifi-api-*.jar\" files found in NiFi lib directory: " + libDir);
+ }
+
+ File nifiApiJar = nifiApiJarLookupResults[0];
+
+ // the file name minus the fixed prefix and extension is the version, e.g. "1.10.0-SNAPSHOT"
+ return nifiApiJar.getName()
+ .replace("nifi-api-", "")
+ .replace(".jar", "");
+ }
+
+ private static void validateNiFiVersionAgainstFlowVersion(String nifiVersion, File flowFile) {
+
+ String flowFileVersion = extractFlowFileVersion(flowFile);
+
+ if (flowFileVersion != null
+ && !flowFileVersion.equalsIgnoreCase(nifiVersion)) {
+
+ // prevent user errors and fail fast in case we detect that the flow file
+ // was created by a different version of NiFi. This can prevent a lot of confusion!
+
+ throw new RuntimeException(String.format(
+ "The NiFi version referenced in the flow file ('%s') does not match the version of NiFi being used ('%s')",
+ flowFileVersion, nifiVersion));
+ }
+ }
+
+ private static String extractFlowFileVersion(File flowFile) {
+ // Returns the single version shared by all org.apache.nifi bundles referenced in the flow,
+ // or null when the flow references none; more than one distinct version is an error.
+
+ final Document flow = XmlUtils.getFileAsDocument(flowFile);
+ final XPath xpathEvaluator = XPathFactory.newInstance().newXPath();
+
+ // text of every <version> that belongs to a <bundle> whose <group> is org.apache.nifi
+ final NodeList versionTexts;
+ try {
+ versionTexts = (NodeList) xpathEvaluator.evaluate(
+ "//bundle/group[text() = \"org.apache.nifi\"]/parent::bundle/version/text()",
+ flow, XPathConstants.NODESET);
+ } catch (XPathExpressionException e) {
+ throw new RuntimeException(
+ "Failure extracting version information from flow file: " + flowFile, e);
+ }
+
+ // collapse duplicates so that only distinct version strings remain
+ final Set<String> distinctVersions = new HashSet<>();
+ final int count = versionTexts.getLength();
+ for (int index = 0; index < count; index++) {
+ final Node versionText = versionTexts.item(index);
+ distinctVersions.add(versionText.getTextContent());
+ }
+
+ if (distinctVersions.isEmpty()) {
+ return null;
+ }
+
+ if (distinctVersions.size() > 1) {
+ throw new RuntimeException(
+ "Multiple NiFi versions found in Flow file, this is unexpected: " + distinctVersions);
+ }
+
+ return distinctVersions.iterator().next();
+ }
+
+
+ public void start() {
+ // Creates the EmbeddedNiFi instance; FLOW_INSTALLED is the only state that allows moving to STARTED.
+ currentState.checkCanTransition(State.STARTED);
+
+ try {
+ if (!bootstrapLibDir.exists()) {
+ throw new IllegalStateException("Not found: " + bootstrapLibDir);
+ }
+
+
+ // JVM-wide system properties; nothing in stopAndCleanup() resets them
+ System.setProperty("org.apache.jasper.compiler.disablejsr199", "true");
+ System.setProperty("java.security.egd", "file:/dev/urandom");
+ System.setProperty("sun.net.http.allowRestrictedHeaders", "true");
+ System.setProperty("java.net.preferIPv4Stack", "true");
+ System.setProperty("java.awt.headless", "true");
+ System.setProperty("java.protocol.handler.pkgs", "sun.net.www.protocol");
+
+ System.setProperty("nifi.properties.file.path", nifiProperties.getAbsolutePath());
+ System.setProperty("app", "NiFi");
+ System.setProperty("org.apache.nifi.bootstrap.config.log.dir", "./logs");
+ // the class loader over the NiFi home directory also becomes this thread's context class loader
+ ClassLoader coreClassLoader = new NiFiCoreLibClassLoader(nifiHomeDir, ClassLoader.getSystemClassLoader());
+ Thread.currentThread().setContextClassLoader(coreClassLoader);
+
+
+ // NOTE(review): the EmbeddedNiFi constructor appears to start NiFi right away - confirm
+ this.testNiFi = new EmbeddedNiFi(new String[0], coreClassLoader);
+
+ } catch (Exception ex) {
+ // testNiFi may still be null at this point
+ currentState = State.START_FAILED;
+
+ throw new RuntimeException("Startup failed", ex);
+
+ }
+
+ currentState = State.STARTED;
+
+
+ }
+
+
+ /**
+ * Shuts the embedded NiFi instance down and removes the files created for the
+ * temporary installation.
+ *
+ * <p>The state ends up as {@code State.STOPPED} when both steps succeed and as
+ * {@code State.STOP_FAILED} when either of them throws.</p>
+ *
+ * @throws RuntimeException wrapping whatever the shutdown or the cleanup threw
+ */
+ public void stopAndCleanup() {
+     currentState.checkCanTransition(State.STOPPED);
+
+     try {
+         testNiFi.shutdown();
+         removeNiFiFilesCreatedForTemporaryInstallation(placeholderNiFiHomeDir);
+
+         // reached only when neither of the two steps above threw
+         currentState = State.STOPPED;
+     } catch (Exception failure) {
+         currentState = State.STOP_FAILED;
+         throw new RuntimeException(failure);
+     }
+ }
+
+ private static File requireCurrentWorkingDirectoryIsCorrect() {
+
+ File currentWorkDir = new File(System.getProperty("user.dir"));
+ if (!currentWorkDir.getName().equals("nifi_testharness_nifi_home")) {
+
+ throw new IllegalStateException(
+ "The test's working directory has to be set to nifi_testharness_nifi_home, but was: " + currentWorkDir);
+ }
+ return currentWorkDir;
+ }
+
+ private static File getActualNiFiHomeDir(File currentDir) {
+ File[] files = currentDir.listFiles((dir, name) -> name.startsWith("nifi-"));
+
+ if (files == null || files.length == 0) {
+ throw new IllegalStateException(
+ "No \"nifi-*\" directory found in temporary NiFi home directory container: " + currentDir);
+ }
+
+ if (files.length != 1) {
+ throw new IllegalStateException(
+ "Multiple \"nifi-*\" directories found in temporary NiFi home directory container: " + currentDir);
+ }
+
+ return files[0];
+ }
+
+ private static void removeNiFiFilesCreatedForTemporaryInstallation(File directoryToClear) {
+
+ if (directoryToClear != null) {
+ File[] directoryContents = directoryToClear.listFiles();
+ if (directoryContents != null) {
+ Arrays.stream(directoryContents)
+ .filter(file -> !"NIFI_TESTHARNESS_README.txt".equals(file.getName()))
+ .forEach(TestNiFiInstance::deleteFileOrDirectoryRecursively);
+ }
+ }
+ }
+
+ /**
+ * Deletes the given file system entry: directories are handed to
+ * {@code FileUtils.deleteDirectoryRecursive}, anything else is deleted directly.
+ *
+ * @param file the entry to remove
+ * @throws RuntimeException if a non-directory entry could not be deleted
+ */
+ private static void deleteFileOrDirectoryRecursively(File file) {
+     if (file.isDirectory()) {
+         FileUtils.deleteDirectoryRecursive(file);
+         return;
+     }
+
+     if (!file.delete()) {
+         throw new RuntimeException("Could not delete: " + file);
+     }
+ }
+
+ /**
+ * Describes this instance by its identity hash (hexadecimal), its current
+ * state and its NiFi home directory.
+ */
+ @Override
+ public String toString() {
+     final StringBuilder description = new StringBuilder();
+     description.append("NiFi test instance(").append(Integer.toHexString(hashCode()));
+     description.append(") state: ").append(currentState);
+     description.append(", home: ").append(nifiHomeDir);
+     return description.toString();
+ }
+
+ /**
+ * Creates a new {@link Builder}, the entry point for configuring and
+ * constructing a {@code TestNiFiInstance}.
+ *
+ * @return a fresh builder instance
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+
+ /**
+ * Single-use builder of {@code TestNiFiInstance} objects: {@link #build()} may be
+ * called only once per builder.
+ */
+ public static class Builder {
+
+     private boolean isDisposed = false;
+
+     private File nifiBinaryZip;
+     private File flowXml;
+     private FlowFileEditorCallback editCallback;
+
+     /**
+      * Sets the location of the NiFi binary distribution file, from which the test instance
+      * will be uncompressed and built.
+      *
+      * @param nifiBinaryZip
+      *      the NiFi binary distribution file, from which the test instance will be built (never {@code null})
+      * @return {@code this} (for method chaining)
+      * @throws NullPointerException if {@code nifiBinaryZip} is {@code null}
+      * @throws IllegalArgumentException if the file does not exist or is a directory
+      */
+     public Builder setNiFiBinaryDistributionZip(File nifiBinaryZip) {
+         // fail with a descriptive message instead of a bare NullPointerException
+         java.util.Objects.requireNonNull(nifiBinaryZip, "nifiBinaryZip must not be null");
+
+         if (!nifiBinaryZip.exists()) {
+             throw new IllegalArgumentException("File not found: " + nifiBinaryZip);
+         }
+
+         if (nifiBinaryZip.isDirectory()) {
+             throw new IllegalArgumentException("A ZIP file is expected to be specified, not a directory: "
+                     + nifiBinaryZip);
+         }
+
+         this.nifiBinaryZip = nifiBinaryZip;
+         return this;
+     }
+
+     /**
+      * Sets the NiFi flow XML, which will be installed to the NiFi instance for testing.
+      *
+      * @param flowXml the NiFi flow file to install to the test instance for testing (never {@code null})
+      *
+      * @return {@code this} (for method chaining)
+      * @throws NullPointerException if {@code flowXml} is {@code null}
+      * @throws IllegalArgumentException if the file does not exist or is a directory
+      */
+     public Builder setFlowXmlToInstallForTesting(File flowXml) {
+         java.util.Objects.requireNonNull(flowXml, "flowXml must not be null");
+
+         if (!flowXml.exists()) {
+             throw new IllegalArgumentException("File not found: " + flowXml);
+         }
+
+         // same sanity check as for the ZIP: a directory can never be a flow definition
+         if (flowXml.isDirectory()) {
+             throw new IllegalArgumentException("A flow XML file is expected to be specified, not a directory: "
+                     + flowXml);
+         }
+
+         this.flowXml = flowXml;
+         return this;
+     }
+
+     /**
+      * <p>
+      * An <strong>optional</strong> callback to change the flow definition read from
+      * {@link #setFlowXmlToInstallForTesting(File)}, before it is actually installed for testing.
+      * (NOTE: The original file remains unchanged: changes are applied to a copy of it.)</p>
+      *
+      * <p>
+      * NOTE: {@link SimpleNiFiFlowDefinitionEditor} provides various common flow definition changes
+      * useful for testing.
+      * </p>
+      *
+      * @param callback an <strong>optional</strong> callback to change the flow definition
+      *
+      * @return {@code this} (for method chaining)
+      *
+      * @see SimpleNiFiFlowDefinitionEditor
+      */
+     public Builder modifyFlowXmlBeforeInstalling(FlowFileEditorCallback callback) {
+         this.editCallback = callback;
+         return this;
+     }
+
+     /**
+      * Creates the {@code TestNiFiInstance} from the values collected so far.
+      *
+      * @return the newly created test instance
+      * @throws IllegalStateException if this builder has already been used
+      */
+     public TestNiFiInstance build() {
+         if (isDisposed) {
+             throw new IllegalStateException("builder can only be used once");
+         }
+         isDisposed = true;
+
+         return new TestNiFiInstance(nifiBinaryZip, flowXml, editCallback);
+     }
+
+ }
+
+
+}
diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/TestNiFiInstanceAware.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/TestNiFiInstanceAware.java
new file mode 100644
index 0000000..31889ed
--- /dev/null
+++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/TestNiFiInstanceAware.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.nifi.testharness;
+
+/**
+ * Implemented by objects that want to be handed a {@link TestNiFiInstance}.
+ *
+ * NOTE(review): who calls the setter, and when, is not visible in this file - confirm
+ * against the callers before relying on a particular lifecycle.
+ */
+public interface TestNiFiInstanceAware {
+ /**
+ * Supplies the test NiFi instance to the implementing object.
+ *
+ * @param testNiFiInstance the instance to hand over
+ */
+ void setTestNiFiInstance(TestNiFiInstance testNiFiInstance);
+}
diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/api/FlowFileEditorCallback.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/api/FlowFileEditorCallback.java
new file mode 100644
index 0000000..dabf361
--- /dev/null
+++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/api/FlowFileEditorCallback.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.nifi.testharness.api;
+
+import org.w3c.dom.Document;
+
+/**
+ * <p>
+ * An interface that allows programmatic access to the contents of a NiFi Flow XML,
+ * allowing changes to be performed before it
+ * is actually installed to the NiFi instance.</p>
+ *
+ * <p>
+ * <strong>CAUTION: THIS IS AN EXPERIMENTAL API: EXPECT CHANGES!</strong>
+ * Efforts will be made to retain backwards API compatibility, but
+ * no guarantee is given.
+ * </p>
+ *
+ */
+public interface FlowFileEditorCallback {
+
+ /**
+ * Applies changes to the given flow definition document and returns the
+ * document that should be used from then on.
+ *
+ * @param document the document to change (never {@code null})
+ * @return the changed document (never {@code null})
+ * @throws Exception in case the editing fails
+ */
+ Document edit(Document document) throws Exception;
+}
diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/FileUtils.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/FileUtils.java
new file mode 100644
index 0000000..e207f08
--- /dev/null
+++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/FileUtils.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.nifi.testharness.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Arrays;
+
+public final class FileUtils {
+
+
+ private static final String MAC_DS_STORE_NAME = ".DS_Store";
+
+ private FileUtils() {
+ // no instances
+ }
+
+ public static void deleteDirectoryRecursive(Path directory) throws IOException {
+ Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+ Files.delete(file);
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override
+ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+ Files.delete(dir);
+ return FileVisitResult.CONTINUE;
+ }
+ });
+ }
+
+ public static void deleteDirectoryRecursive(File dir) {
+ try {
+ deleteDirectoryRecursive(dir.toPath());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void createLink(Path newLink, Path existingFile) {
+ try {
+ Files.createSymbolicLink(newLink, existingFile);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void createSymlinks(File newLinkDir, File existingDir) {
+ Arrays.stream(existingDir.list())
+ .filter(fileName -> !MAC_DS_STORE_NAME.equals(fileName))
+ .forEach(fileName -> {
+ Path newLink = Paths.get(newLinkDir.getAbsolutePath(), fileName);
+ Path existingFile = Paths.get(existingDir.getAbsolutePath(), fileName);
+
+ File symlinkFile = newLink.toFile();
+ if (symlinkFile.exists()) {
+ symlinkFile.delete();
+ }
+
+ createLink(newLink, existingFile);
+ });
+ }
+}
diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/NiFiCoreLibClassLoader.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/NiFiCoreLibClassLoader.java
new file mode 100644
index 0000000..a3af363
--- /dev/null
+++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/NiFiCoreLibClassLoader.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.nifi.testharness.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link URLClassLoader} whose class path consists of the {@code *.jar} files found
+ * directly inside {@code <nifiHome>/lib} and {@code <nifiHome>/lib/bootstrap}.
+ */
+public final class NiFiCoreLibClassLoader extends URLClassLoader {
+
+    /**
+     * @param nifiHomeDir the NiFi home directory containing the {@code lib} directory
+     * @param parent the parent class loader for delegation
+     * @throws RuntimeException wrapping the {@link IOException} raised while listing
+     *         either library directory
+     */
+    public NiFiCoreLibClassLoader(File nifiHomeDir, ClassLoader parent) {
+        super(getURls(nifiHomeDir), parent);
+    }
+
+    /**
+     * Collects the JAR URLs: entries of {@code lib} first, then those of {@code lib/bootstrap}.
+     */
+    private static URL[] getURls(File nifiHomeDir) {
+        File libDir = new File(nifiHomeDir, "lib");
+        File bootstrapLibDir = new File(libDir, "bootstrap");
+
+        try {
+            List<URL> urls = new LinkedList<>();
+            urls.addAll(listJarUrls(libDir));
+            urls.addAll(listJarUrls(bootstrapLibDir));
+
+            return urls.toArray(new URL[urls.size()]);
+        } catch (IOException ioEx) {
+            throw new RuntimeException(ioEx);
+        }
+    }
+
+    /**
+     * Lists the JAR files directly inside the given directory as URLs.
+     */
+    private static List<URL> listJarUrls(File directory) throws IOException {
+        // Files.list() holds an open directory handle: it has to be closed explicitly
+        try (java.util.stream.Stream<Path> entries = Files.list(directory.toPath())) {
+            return entries
+                    .filter(NiFiCoreLibClassLoader::isJarOrNarFile)
+                    .map(NiFiCoreLibClassLoader::toURL)
+                    .collect(Collectors.toList());
+        }
+    }
+
+    private static URL toURL(Path path) {
+        try {
+            return path.toUri().toURL();
+        } catch (MalformedURLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * NOTE(review): despite the name, only regular files ending in {@code .jar} are
+     * accepted; {@code .nar} files are not matched. Confirm whether that is intended
+     * before renaming the method or widening the filter.
+     */
+    private static boolean isJarOrNarFile(Path path) {
+        String fileName = path.getFileName().toString();
+
+        return path.toFile().isFile() && fileName.endsWith(".jar");
+    }
+
+}
diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/XmlUtils.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/XmlUtils.java
new file mode 100644
index 0000000..3e225e7
--- /dev/null
+++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/XmlUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.nifi.testharness.util;
+
+import org.apache.nifi.testharness.api.FlowFileEditorCallback;
+import org.w3c.dom.Document;
+import org.xml.sax.InputSource;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.File;
+import java.io.FileInputStream;
+
+/**
+ * XML helpers of the test harness: parsing a file into a DOM document and
+ * writing an edited copy of an XML file.
+ */
+public final class XmlUtils {
+
+    /**
+     * Parses {@code inputFile}, passes the document to {@code editCallback} and
+     * serializes whatever the callback returns into {@code outputFile}.
+     *
+     * @param inputFile the XML file to read
+     * @param outputFile the file receiving the edited document
+     * @param editCallback the callback performing the changes
+     * @throws RuntimeException wrapping any failure of parsing, editing or writing
+     */
+    public static void editXml(File inputFile, File outputFile, FlowFileEditorCallback editCallback) {
+        try {
+            final Document original = getFileAsDocument(inputFile);
+            final Document edited = editCallback.edit(original);
+
+            // identity transformation: writes the DOM out unchanged
+            final Transformer serializer = TransformerFactory.newInstance().newTransformer();
+            final DOMSource from = new DOMSource(edited);
+            final StreamResult to = new StreamResult(outputFile);
+            serializer.transform(from, to);
+        } catch (Exception failure) {
+            throw new RuntimeException("Failed to change XML document: " + failure.getMessage(), failure);
+        }
+    }
+
+    /**
+     * Parses the given XML file into a DOM document.
+     *
+     * @param xmlFile the file to parse
+     * @return the parsed document
+     * @throws RuntimeException wrapping any failure of opening or parsing the file
+     */
+    public static Document getFileAsDocument(File xmlFile) {
+        try (FileInputStream xmlStream = new FileInputStream(xmlFile)) {
+            final DocumentBuilder parser = DocumentBuilderFactory.newInstance().newDocumentBuilder();
+            final InputSource xmlSource = new InputSource(xmlStream);
+
+            return parser.parse(xmlSource);
+        } catch (Exception failure) {
+            throw new RuntimeException("Failed to parse XML file: " + xmlFile, failure);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/Zip.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/Zip.java
new file mode 100644
index 0000000..12ea403
--- /dev/null
+++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/Zip.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.nifi.testharness.util;
+
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.zip.GZIPOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+/**
+ * Compression helpers of the test harness: extracting ZIP archives and
+ * GZIP-compressing single files.
+ */
+public final class Zip {
+
+    private Zip() {
+        // no external instances allowed
+    }
+
+    /**
+     * Receives progress notifications while an archive is being extracted.
+     * Directory entries are not reported.
+     */
+    public interface StatusListener {
+        void onUncompressStarted(ZipEntry ze);
+
+        void onUncompressDone(ZipEntry ze);
+    }
+
+    /**
+     * A {@link StatusListener} whose methods do nothing; extend it to override
+     * only the notifications of interest.
+     */
+    public static class StatusListenerAdapter implements StatusListener {
+
+        @Override
+        public void onUncompressStarted(ZipEntry ze) {
+
+        }
+
+        @Override
+        public void onUncompressDone(ZipEntry ze) {
+
+        }
+    }
+
+    private static final StatusListener NO_OP_STATUS_LISTENER = new StatusListenerAdapter();
+
+    /**
+     * Extracts the archive into the target directory without progress reporting.
+     *
+     * @see #unzipFile(File, File, StatusListener)
+     */
+    public static void unzipFile(File zipFile, File targetDirectory) throws IOException {
+        unzipFile(zipFile, targetDirectory, NO_OP_STATUS_LISTENER);
+    }
+
+    /**
+     * Extracts every file entry of the archive into the target directory, creating
+     * the target directory and intermediate directories as needed. Directory entries
+     * themselves are skipped, so empty directories of the archive are not created.
+     *
+     * @param zipFile the archive to extract
+     * @param targetDirectory the directory receiving the extracted files
+     * @param statusListener notified before and after each file entry is extracted
+     * @throws IOException if the target directory cannot be created, an entry would be
+     *         written outside of the target directory, a file to extract already exists,
+     *         or reading/writing fails
+     * @throws IllegalStateException if an intermediate directory cannot be created
+     */
+    public static void unzipFile(File zipFile, File targetDirectory,
+                                 StatusListener statusListener) throws IOException {
+
+        if (!targetDirectory.exists()) {
+            boolean mkdirs = targetDirectory.mkdirs();
+            if (!mkdirs) {
+                throw new IOException("Failed to create directory: " + targetDirectory);
+            }
+        }
+
+        // Prefix every extracted file has to start with; used to reject entries such as
+        // "../../evil" that would otherwise escape the target directory ("Zip Slip").
+        String canonicalTargetPath = targetDirectory.getCanonicalPath();
+        final String requiredPrefix = canonicalTargetPath.endsWith(File.separator)
+                ? canonicalTargetPath
+                : canonicalTargetPath + File.separator;
+
+        try (ZipInputStream zipInputStream = new ZipInputStream(new FileInputStream(zipFile))) {
+
+            ZipEntry ze;
+            while ((ze = zipInputStream.getNextEntry()) != null) {
+
+                if (ze.isDirectory()) {
+                    continue;
+                }
+
+                File outputFile = new File(targetDirectory, ze.getName());
+                if (!outputFile.getCanonicalPath().startsWith(requiredPrefix)) {
+                    throw new IOException("ZIP entry is outside of the target directory: " + ze.getName());
+                }
+
+                statusListener.onUncompressStarted(ze);
+
+                File parentDir = outputFile.getParentFile();
+                if (!parentDir.exists()) {
+                    boolean couldCreateParentDir = parentDir.mkdirs();
+                    if (!couldCreateParentDir) {
+                        throw new IllegalStateException("Could not create: " + parentDir);
+                    }
+                }
+
+                // reads up to the end of the current entry only
+                Files.copy(zipInputStream, outputFile.toPath());
+
+                statusListener.onUncompressDone(ze);
+            }
+        }
+    }
+
+    /**
+     * Writes a GZIP-compressed copy of {@code inputFile} to {@code gzipFile}.
+     *
+     * @param inputFile the file to compress
+     * @param gzipFile the compressed file to create or overwrite
+     * @throws IOException if reading or writing fails
+     */
+    public static void gzipFile(File inputFile, File gzipFile) throws IOException {
+
+        try (GZIPOutputStream gzos =
+                     new GZIPOutputStream(new FileOutputStream(gzipFile))) {
+
+            Files.copy(inputFile.toPath(), gzos);
+
+            gzos.finish();
+        }
+    }
+
+}
diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/Constants.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/Constants.java
new file mode 100644
index 0000000..83653f7
--- /dev/null
+++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/Constants.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.nifi.testharness.samples;
+
+import java.io.File;
+
+public final class Constants {
+
+ static final File OUTPUT_DIR = new File("./NiFiTest/NiFiReadTest");
+
+ // NOTE: you will have to have the NiFi distribution ZIP placed into this directory.
+ // Its version must be the same as the one referenced in the flow.xml, otherwise it will not work!
+ static final File NIFI_ZIP_DIR = new File("../../nifi-assembly/target");
+
+ static final File FLOW_XML_FILE = new File(NiFiMockFlowTest.class.getResource("/flow.xml").getFile());
+}
diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiFlowTest.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiFlowTest.java
new file mode 100644
index 0000000..9e13db7
--- /dev/null
+++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiFlowTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.nifi.testharness.samples;
+
+
+
+import org.apache.nifi.testharness.SimpleNiFiFlowDefinitionEditor;
+import org.apache.nifi.testharness.TestNiFiInstance;
+import org.apache.nifi.testharness.util.FileUtils;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * This test demonstrates how to mock the source data by starting a mock HTTP server (using Jetty)
+ * and rewriting the URL in flow definition.
+ */
+public class NiFiFlowTest {
+
+    private static final SimpleNiFiFlowDefinitionEditor CONFIGURE_MOCKS_IN_NIFI_FLOW = SimpleNiFiFlowDefinitionEditor.builder()
+            .updateFlowFileBuiltInNiFiProcessorVersionsToNiFiVersion()
+            .setSingleProcessorProperty("GetHTTP", "URL", "http://localhost:12345")
+            .build();
+
+    // used by mocked GetHTTP; serves test data
+    private static Server testJettyServer;
+
+    private TestNiFiInstance testNiFiInstance;
+
+    /**
+     * Starts the mock HTTP server the rewritten GetHTTP URL points to.
+     */
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        NiFiFlowTest.testJettyServer = new Server(12345);
+
+        Handler handler = new TestHandler();
+        NiFiFlowTest.testJettyServer.setHandler(handler);
+        NiFiFlowTest.testJettyServer.start();
+    }
+
+    /**
+     * Removes the output of earlier runs, then installs and starts a NiFi instance
+     * running the modified sample flow.
+     */
+    @Before
+    public void bootstrapNiFi() throws Exception {
+
+        if (Constants.OUTPUT_DIR.exists()) {
+            FileUtils.deleteDirectoryRecursive(Constants.OUTPUT_DIR.toPath());
+        }
+
+        File nifiZipFile = TestUtils.getBinaryDistributionZipFile(Constants.NIFI_ZIP_DIR);
+
+        TestNiFiInstance testNiFi = TestNiFiInstance.builder()
+                .setNiFiBinaryDistributionZip(nifiZipFile)
+                .setFlowXmlToInstallForTesting(Constants.FLOW_XML_FILE)
+                .modifyFlowXmlBeforeInstalling(CONFIGURE_MOCKS_IN_NIFI_FLOW)
+                .build();
+
+        testNiFi.install();
+        testNiFi.start();
+
+        // only assign testNiFi to the field in case it was started successfully
+        testNiFiInstance = testNiFi;
+    }
+
+    @Test
+    public void testFlowCreatesFilesInCorrectLocation() throws IOException {
+
+        // We deleted the output directory: our NiFi flow should create it
+
+        assertTrue("Output directory not found: " + Constants.OUTPUT_DIR, Constants.OUTPUT_DIR.exists());
+
+        File outputFile = new File(Constants.OUTPUT_DIR, "bbc-world.rss.xml");
+
+        assertTrue("Output file not found: " + outputFile, outputFile.exists());
+
+        List<String> strings = Files.readAllLines(outputFile.toPath());
+
+        boolean atLeastOneLineContainsNiFi = strings.stream().anyMatch(line -> line.toLowerCase().contains("nifi"));
+
+        assertTrue("There was no line containing NiFi", atLeastOneLineContainsNiFi);
+
+        boolean atLeastOneLineContainsNiFiVersion = strings.stream().anyMatch(line -> line.toLowerCase().contains("latest nifi version"));
+
+        assertTrue("There was no line containing 'latest NiFi version'", atLeastOneLineContainsNiFiVersion);
+
+    }
+
+    @After
+    public void shutdownNiFi() {
+
+        if (testNiFiInstance != null) {
+            testNiFiInstance.stopAndCleanup();
+        }
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        // the field stays null when creating the server failed in beforeClass()
+        if (NiFiFlowTest.testJettyServer != null) {
+            NiFiFlowTest.testJettyServer.stop();
+        }
+    }
+
+    /**
+     * Answers every request with the content of the class path resource
+     * {@code /sample_technology_rss.xml}.
+     */
+    private static class TestHandler extends org.eclipse.jetty.server.handler.AbstractHandler {
+        @Override
+        public void handle(
+                String target,
+                Request baseRequest,
+                HttpServletRequest httpServletRequest,
+                HttpServletResponse response) throws IOException, ServletException {
+
+            response.setContentType("text/html;charset=utf-8");
+            response.setStatus(HttpServletResponse.SC_OK);
+            baseRequest.setHandled(true);
+
+            // a fresh stream per request, closed once the payload has been written
+            try (InputStream resource = TestHandler.class.getResourceAsStream("/sample_technology_rss.xml")) {
+                if (resource == null) {
+                    throw new IOException("Test resource not found on the class path: /sample_technology_rss.xml");
+                }
+
+                ServletOutputStream outputStream = response.getOutputStream();
+
+                byte[] buffer = new byte[1024];
+                int len;
+                while ((len = resource.read(buffer)) != -1) {
+                    outputStream.write(buffer, 0, len);
+                }
+            }
+        }
+    }
+}
diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiMockFlowTest.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiMockFlowTest.java
new file mode 100644
index 0000000..c5a7139
--- /dev/null
+++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiMockFlowTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.nifi.testharness.samples;
+
+
+import org.apache.nifi.testharness.SimpleNiFiFlowDefinitionEditor;
+import org.apache.nifi.testharness.TestNiFiInstance;
+import org.apache.nifi.testharness.samples.mock.GetHTTPMock;
+import org.apache.nifi.testharness.util.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * This test demonstrates how to mock the source data by mocking the processor
+ * itself in the flow definition.
+ */
+public class NiFiMockFlowTest {
+
+    private static final String DEMO_DATA_RESOURCE = "/sample_technology_rss.xml";
+
+    /**
+     * Opens a new stream over the demo data. A stream can be consumed only once, so
+     * every request for the data gets its own instead of sharing one static stream
+     * that is exhausted after the first read.
+     */
+    private static InputStream openDemoData() {
+        return NiFiMockFlowTest.class.getResourceAsStream(DEMO_DATA_RESOURCE);
+    }
+
+    // We have a dedicated class. It has to be public static
+    // so that NiFi engine can instantiate it.
+    public static class MockedGetHTTP extends GetHTTPMock {
+
+        public MockedGetHTTP() {
+            super("text/xml; charset=utf-8", () -> openDemoData());
+        }
+    }
+
+    private static final SimpleNiFiFlowDefinitionEditor CONFIGURE_MOCKS_IN_NIFI_FLOW = SimpleNiFiFlowDefinitionEditor.builder()
+            .updateFlowFileBuiltInNiFiProcessorVersionsToNiFiVersion()
+            .setClassOfSingleProcessor("GetHTTP", MockedGetHTTP.class)
+            .build();
+
+    private TestNiFiInstance testNiFiInstance;
+
+    /**
+     * Removes the output of earlier runs, then installs and starts a NiFi instance
+     * running the sample flow with the GetHTTP processor replaced by the mock.
+     */
+    @Before
+    public void bootstrapNiFi() throws Exception {
+
+        if (Constants.OUTPUT_DIR.exists()) {
+            FileUtils.deleteDirectoryRecursive(Constants.OUTPUT_DIR.toPath());
+        }
+
+        File nifiZipFile = TestUtils.getBinaryDistributionZipFile(Constants.NIFI_ZIP_DIR);
+
+        TestNiFiInstance testNiFi = TestNiFiInstance.builder()
+                .setNiFiBinaryDistributionZip(nifiZipFile)
+                .setFlowXmlToInstallForTesting(Constants.FLOW_XML_FILE)
+                .modifyFlowXmlBeforeInstalling(CONFIGURE_MOCKS_IN_NIFI_FLOW)
+                .build();
+
+        testNiFi.install();
+        testNiFi.start();
+
+        // only assign testNiFi to the field in case it was started successfully
+        testNiFiInstance = testNiFi;
+    }
+
+    @Test
+    public void testFlowCreatesFilesInCorrectLocation() throws IOException {
+
+        // We deleted the output directory: our NiFi flow should create it
+
+        assertTrue("Output directory not found: " + Constants.OUTPUT_DIR, Constants.OUTPUT_DIR.exists());
+
+        File outputFile = new File(Constants.OUTPUT_DIR, "bbc-world.rss.xml");
+
+        assertTrue("Output file not found: " + outputFile, outputFile.exists());
+
+        List<String> strings = Files.readAllLines(outputFile.toPath());
+
+        boolean atLeastOneLineContainsNiFi = strings.stream().anyMatch(line -> line.toLowerCase().contains("nifi"));
+
+        assertTrue("There was no line containing NiFi", atLeastOneLineContainsNiFi);
+
+        boolean atLeastOneLineContainsNiFiVersion = strings.stream().anyMatch(line -> line.toLowerCase().contains("latest nifi version"));
+
+        assertTrue("There was no line containing 'latest NiFi version'", atLeastOneLineContainsNiFiVersion);
+
+    }
+
+    @After
+    public void shutdownNiFi() {
+
+        if (testNiFiInstance != null) {
+            testNiFiInstance.stopAndCleanup();
+        }
+    }
+}
diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/TestUtils.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/TestUtils.java
new file mode 100644
index 0000000..7d0b633
--- /dev/null
+++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/TestUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.nifi.testharness.samples;
+
+import java.io.File;
+
+final class TestUtils {
+
+ private TestUtils() {
+ // no instances allowed
+ }
+
+ static File getBinaryDistributionZipFile(File binaryDistributionZipDir) {
+
+ if (!binaryDistributionZipDir.exists()) {
+ throw new IllegalStateException("NiFi distribution ZIP file not found at the expected location: "
+ + binaryDistributionZipDir.getAbsolutePath());
+ }
+
+ File[] files = binaryDistributionZipDir.listFiles((dir, name) ->
+ name.startsWith("nifi-") && name.endsWith("-bin.zip"));
+
+ if (files == null) {
+ throw new IllegalStateException(
+ "Not a directory or I/O error reading: " + binaryDistributionZipDir.getAbsolutePath());
+ }
+
+ if (files.length == 0) {
+ throw new IllegalStateException(
+ "No NiFi distribution ZIP file is found in: " + binaryDistributionZipDir.getAbsolutePath());
+ }
+
+ if (files.length > 1) {
+ throw new IllegalStateException(
+ "Multiple NiFi distribution ZIP files are found in: " + binaryDistributionZipDir.getAbsolutePath());
+ }
+
+ return files[0];
+ }
+}
diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/mock/GetHTTPMock.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/mock/GetHTTPMock.java
new file mode 100644
index 0000000..67055e4
--- /dev/null
+++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/mock/GetHTTPMock.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.nifi.testharness.samples.mock;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * Mock stand-in for the standard {@code GetHTTP} processor.
+ *
+ * <p>Instead of issuing an HTTP request, every trigger creates one FlowFile whose content is read from
+ * the {@link InputStream} handed out by the configured supplier. Everything other than
+ * {@code onTrigger} is inherited from {@link MockProcessor}, which is given the real GetHTTP class
+ * name as its delegate.</p>
+ */
+public class GetHTTPMock extends MockProcessor {
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All files are transferred to the success relationship")
+            .build();
+
+    private final String contentType;
+    private final Supplier<InputStream> inputStreamSupplier;
+
+    /**
+     * @param contentType         value written to the MIME type attribute of every emitted FlowFile
+     * @param inputStreamSupplier called once per trigger; the returned stream becomes the FlowFile
+     *                            content and is closed by this mock after it has been imported
+     */
+    public GetHTTPMock(String contentType, Supplier<InputStream> inputStreamSupplier) {
+        super("org.apache.nifi.processors.standard.GetHTTP");
+
+        this.contentType = contentType;
+        this.inputStreamSupplier = inputStreamSupplier;
+    }
+
+    /**
+     * Emits one FlowFile to {@link #REL_SUCCESS}, named after the "Filename" property and filled with
+     * the content of the supplied stream, then commits the session.
+     *
+     * @param context               used to read the "URL" and "Filename" properties
+     * @param processSessionFactory used to create the session the FlowFile is created in
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSessionFactory processSessionFactory) {
+
+        final ComponentLog logger = getLogger();
+
+        final StopWatch stopWatch = new StopWatch(true);
+
+        final ProcessSession session = processSessionFactory.createSession();
+
+        final String url = context.getProperty("URL").evaluateAttributeExpressions().getValue();
+        String source = url;
+        try {
+            source = new URI(url).getHost();
+        } catch (final URISyntaxException swallow) {
+            // not expected, as the url has already been validated; the raw url stays as the source
+        }
+
+        FlowFile flowFile = session.create();
+
+        flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), context.getProperty("Filename").evaluateAttributeExpressions().getValue());
+        flowFile = session.putAttribute(flowFile, this.getClass().getSimpleName().toLowerCase() + ".remote.source", source);
+
+        // The stream obtained from the supplier belongs to this mock, so it is closed here once the
+        // content has been imported (or the import has failed) instead of being left open.
+        try (InputStream content = inputStreamSupplier.get()) {
+            flowFile = session.importFrom(content, flowFile);
+        } catch (final IOException closeFailure) {
+            // only close() can raise IOException here; the content has already been imported
+            logger.warn("Failed to close the mock content stream for " + url, closeFailure);
+        }
+
+        flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), contentType);
+
+        final long flowFileSize = flowFile.getSize();
+        stopWatch.stop();
+        session.getProvenanceReporter().receive(flowFile, url, stopWatch.getDuration(TimeUnit.MILLISECONDS));
+        session.transfer(flowFile, REL_SUCCESS);
+
+        final String dataRate = stopWatch.calculateDataRate(flowFileSize);
+        logger.info("Successfully received {} from {} at a rate of {}; transferred to success", new Object[]{flowFile, url, dataRate});
+        session.commit();
+
+    }
+}
diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/mock/MockProcessor.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/mock/MockProcessor.java
new file mode 100644
index 0000000..cd62b2d
--- /dev/null
+++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/mock/MockProcessor.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.testharness.samples.mock;
+
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Base class for mock processors that replace a real processor in a flow under test.
+ *
+ * <p>An instance of the real processor class is created reflectively and every {@link Processor}
+ * method except {@code onTrigger} is forwarded to it; subclasses supply the mocked
+ * {@code onTrigger} behaviour.</p>
+ */
+public abstract class MockProcessor implements Processor {
+
+    private final Processor delegate;
+    private ComponentLog logger;
+
+    /**
+     * Creates the delegate processor.
+     *
+     * @param delegateClassName fully qualified name of the {@link Processor} implementation to forward to;
+     *                          it is loaded through the current thread's context class loader and
+     *                          instantiated through its no-argument constructor
+     * @throws RuntimeException if the class cannot be loaded or instantiated
+     */
+    protected MockProcessor(String delegateClassName) {
+        try {
+
+            ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+            final Class<?> delegateClass = Class.forName(delegateClassName, true, contextClassLoader);
+
+            // Class.newInstance() is deprecated: it rethrows whatever the constructor throws, checked
+            // exceptions included, without declaring them. Going through the Constructor object wraps
+            // such failures in InvocationTargetException, which is handled below.
+            delegate = (Processor) delegateClass.getDeclaredConstructor().newInstance();
+        } catch (ReflectiveOperationException e) {
+            throw new RuntimeException("Could not instantiate delegate processor: " + delegateClassName, e);
+        }
+    }
+
+    /** @return the real processor instance that calls are forwarded to */
+    protected Processor getDelegate() {
+        return delegate;
+    }
+
+    /** @return the logger captured in {@link #initialize}; {@code null} before initialization */
+    protected final ComponentLog getLogger() {
+        return logger;
+    }
+
+    /** Initializes the delegate and keeps the logger of the initialization context for subclasses. */
+    @Override
+    public void initialize(ProcessorInitializationContext processorInitializationContext) {
+        getDelegate().initialize(processorInitializationContext);
+        logger = processorInitializationContext.getLogger();
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return getDelegate().getRelationships();
+    }
+
+    /** The only method that is not forwarded: subclasses implement the mocked behaviour here. */
+    @Override
+    public abstract void onTrigger(ProcessContext processContext, ProcessSessionFactory processSessionFactory);
+
+    @Override
+    public Collection<ValidationResult> validate(ValidationContext validationContext) {
+        return getDelegate().validate(validationContext);
+    }
+
+    @Override
+    public PropertyDescriptor getPropertyDescriptor(String s) {
+        return getDelegate().getPropertyDescriptor(s);
+    }
+
+    @Override
+    public void onPropertyModified(PropertyDescriptor propertyDescriptor, String s, String s1) {
+        getDelegate().onPropertyModified(propertyDescriptor, s, s1);
+    }
+
+    @Override
+    public List<PropertyDescriptor> getPropertyDescriptors() {
+        return getDelegate().getPropertyDescriptors();
+    }
+
+    @Override
+    public String getIdentifier() {
+        return getDelegate().getIdentifier();
+    }
+}
diff --git a/nifi-testharness/src/test/resources/flow.xml b/nifi-testharness/src/test/resources/flow.xml
new file mode 100644
index 0000000..66a1cf2
--- /dev/null
+++ b/nifi-testharness/src/test/resources/flow.xml
@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<flowController encoding-version="1.3">
+ <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
+ <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
+ <registries/>
+ <rootGroup>
+ <id>92b74849-0166-1000-28d3-4da912e34551</id>
+ <name>NiFi Flow</name>
+ <position x="0.0" y="0.0"/>
+ <comment/>
+ <!-- Sink of the sample flow: a PutFile processor targeting ./NiFiTest/NiFiReadTest with
+ "Create Missing Directories" enabled. Both of its relationships are auto-terminated.
+ NOTE(review): the sample test presumably expects this directory to match its OUTPUT_DIR
+ constant; confirm. -->
+ <processor>
+ <id>92b9139c-0166-1000-04d5-1184adc0977a</id>
+ <name>PutFile</name>
+ <position x="632.0" y="98.0"/>
+ <styles/>
+ <comment/>
+ <class>org.apache.nifi.processors.standard.PutFile</class>
+ <bundle>
+ <group>org.apache.nifi</group>
+ <artifact>nifi-standard-nar</artifact>
+ <version>1.7.1</version>
+ </bundle>
+ <maxConcurrentTasks>1</maxConcurrentTasks>
+ <schedulingPeriod>0 sec</schedulingPeriod>
+ <penalizationPeriod>30 sec</penalizationPeriod>
+ <yieldPeriod>1 sec</yieldPeriod>
+ <bulletinLevel>WARN</bulletinLevel>
+ <lossTolerant>false</lossTolerant>
+ <scheduledState>RUNNING</scheduledState>
+ <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+ <executionNode>ALL</executionNode>
+ <runDurationNanos>0</runDurationNanos>
+ <property>
+ <name>Directory</name>
+ <value>./NiFiTest/NiFiReadTest</value>
+ </property>
+ <property>
+ <name>Conflict Resolution Strategy</name>
+ <value>ignore</value>
+ </property>
+ <property>
+ <name>Create Missing Directories</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>Maximum File Count</name>
+ </property>
+ <property>
+ <name>Last Modified Time</name>
+ </property>
+ <property>
+ <name>Permissions</name>
+ </property>
+ <property>
+ <name>Owner</name>
+ </property>
+ <property>
+ <name>Group</name>
+ </property>
+ <autoTerminatedRelationship>success</autoTerminatedRelationship>
+ <autoTerminatedRelationship>failure</autoTerminatedRelationship>
+ </processor>
+ <!-- Source of the sample flow: a GetHTTP processor. The URL property points at a live feed and
+ the Filename property is the file name the sample test looks for in the output directory.
+ NOTE(review): the sample test presumably swaps this processor class for GetHTTPMock before
+ the flow is installed, so no real HTTP request is made; confirm. -->
+ <processor>
+ <id>92b87553-0166-1000-527e-7ecdc888d91a</id>
+ <name>GetHTTP</name>
+ <position x="238.0" y="98.0"/>
+ <styles/>
+ <comment/>
+ <class>org.apache.nifi.processors.standard.GetHTTP</class>
+ <bundle>
+ <group>org.apache.nifi</group>
+ <artifact>nifi-standard-nar</artifact>
+ <version>1.7.1</version>
+ </bundle>
+ <maxConcurrentTasks>1</maxConcurrentTasks>
+ <schedulingPeriod>0 sec</schedulingPeriod>
+ <penalizationPeriod>30 sec</penalizationPeriod>
+ <yieldPeriod>1 sec</yieldPeriod>
+ <bulletinLevel>WARN</bulletinLevel>
+ <lossTolerant>false</lossTolerant>
+ <scheduledState>RUNNING</scheduledState>
+ <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+ <executionNode>ALL</executionNode>
+ <runDurationNanos>0</runDurationNanos>
+ <property>
+ <name>URL</name>
+ <value>http://feeds.bbci.co.uk/news/technology/rss.xml?edition=uk#</value>
+ </property>
+ <property>
+ <name>Filename</name>
+ <value>bbc-world.rss.xml</value>
+ </property>
+ <property>
+ <name>SSL Context Service</name>
+ </property>
+ <property>
+ <name>Username</name>
+ </property>
+ <property>
+ <name>Password</name>
+ </property>
+ <property>
+ <name>Connection Timeout</name>
+ <value>30 sec</value>
+ </property>
+ <property>
+ <name>Data Timeout</name>
+ <value>30 sec</value>
+ </property>
+ <property>
+ <name>User Agent</name>
+ </property>
+ <property>
+ <name>Accept Content-Type</name>
+ </property>
+ <property>
+ <name>Follow Redirects</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>redirect-cookie-policy</name>
+ <value>default</value>
+ </property>
+ <property>
+ <name>proxy-configuration-service</name>
+ </property>
+ <property>
+ <name>Proxy Host</name>
+ </property>
+ <property>
+ <name>Proxy Port</name>
+ </property>
+ </processor>
+ <!-- Connects the "success" relationship of the GetHTTP processor (sourceId 92b87553...)
+ to the PutFile processor (destinationId 92b9139c...); both live in the root group. -->
+ <connection>
+ <id>92b9380b-0166-1000-981d-c9e319f135e3</id>
+ <name/>
+ <bendPoints/>
+ <labelIndex>1</labelIndex>
+ <zIndex>0</zIndex>
+ <sourceId>92b87553-0166-1000-527e-7ecdc888d91a</sourceId>
+ <sourceGroupId>92b74849-0166-1000-28d3-4da912e34551</sourceGroupId>
+ <sourceType>PROCESSOR</sourceType>
+ <destinationId>92b9139c-0166-1000-04d5-1184adc0977a</destinationId>
+ <destinationGroupId>92b74849-0166-1000-28d3-4da912e34551</destinationGroupId>
+ <destinationType>PROCESSOR</destinationType>
+ <relationship>success</relationship>
+ <maxWorkQueueSize>10000</maxWorkQueueSize>
+ <maxWorkQueueDataSize>1 GB</maxWorkQueueDataSize>
+ <flowFileExpiration>0 sec</flowFileExpiration>
+ </connection>
+ </rootGroup>
+ <controllerServices/>
+ <reportingTasks/>
+</flowController>
diff --git a/nifi-testharness/src/test/resources/logback-test.xml b/nifi-testharness/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..ab903af
--- /dev/null
+++ b/nifi-testharness/src/test/resources/logback-test.xml
@@ -0,0 +1,15 @@
+<!-- Logback configuration for the test harness tests: a single console appender attached to
+     the root logger at INFO level. debug="true" additionally makes logback print its own
+     status messages while it processes this file. -->
+<configuration debug="true">
+
+ <!-- Console appender; each line shows time, thread, level, abbreviated logger name and message. -->
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <!-- encoders are by default assigned the type
+ ch.qos.logback.classic.encoder.PatternLayoutEncoder -->
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <!-- Everything at INFO and above goes to the console appender. -->
+ <root level="INFO">
+ <appender-ref ref="STDOUT" />
+ </root>
+
+</configuration>
\ No newline at end of file
diff --git a/nifi-testharness/src/test/resources/sample_technology_rss.xml b/nifi-testharness/src/test/resources/sample_technology_rss.xml
new file mode 100644
index 0000000..a95ba96
--- /dev/null
+++ b/nifi-testharness/src/test/resources/sample_technology_rss.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet title="XSL_formatting" type="text/xsl" href="/shared/bsp/xsl/rss/nolsol.xsl"?>
+<rss xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:atom="http://www.w3.org/2005/Atom" version="2.0" xmlns:media="http://search.yahoo.com/mrss/">
+ <channel>
+ <title>Sample Technology feed</title>
+ <description>Sample Technology feed</description>
+ <image>
+ <url>https://nifi.apache.org/assets/images/apache-nifi-logo.svg</url>
+ <title>NiFi sample</title>
+ <link>https://nifi.apache.org/</link>
+ </image>
+ <language>en-gb</language>
+ <ttl>15</ttl>
+ <item>
+ <title>The latest NiFi version is out</title>
+ <description>The latest version of NiFi is released</description>
+ <link>https://nifi.apache.org/</link>
+ <guid isPermaLink="true">https://nifi.apache.org/</guid>
+ <pubDate>Mon, 24 Sep 2018 17:10:10 GMT</pubDate>
+ <media:thumbnail width="1024" height="576" url="https://nifi.apache.org/assets/images/apache-nifi-logo.svg"/>
+ </item>
+ </channel>
+</rss>
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index b6cf513..8037976 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,6 +30,7 @@
<module>nifi-mock</module>
<module>nifi-nar-bundles</module>
<module>nifi-assembly</module>
+ <module>nifi-testharness</module>
<module>nifi-docs</module>
<module>nifi-maven-archetypes</module>
<module>nifi-external</module>