You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2021/03/18 03:08:04 UTC

[nifi] branch support/nifi-1.13 updated (6d8c3e9 -> d610b1c)

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a change to branch support/nifi-1.13
in repository https://gitbox.apache.org/repos/asf/nifi.git.


 discard 6d8c3e9  NIFI-8337: This closes #4910. Fixed bug in StandardProcessSession where the session didn't account for FlowFile's contentClaimOffset when seeking to the appropriate location in the stream.
     new d610b1c  NIFI-8337: This closes #4910. Fixed bug in StandardProcessSession where the session didn't account for FlowFile's contentClaimOffset when seeking to the appropriate location in the stream.

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (6d8c3e9)
            \
             N -- N -- N   refs/heads/support/nifi-1.13 (d610b1c)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../nifi/tests/system/processor/RunOnceIT.java     | 50 ----------------------
 1 file changed, 50 deletions(-)
 delete mode 100644 nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RunOnceIT.java


[nifi] 01/01: NIFI-8337: This closes #4910. Fixed bug in StandardProcessSession where the session didn't account for FlowFile's contentClaimOffset when seeking to the appropriate location in the stream.

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.13
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit d610b1ccea4ec4282d10aa9d3f65ed6f3d263f95
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Wed Mar 17 13:30:38 2021 -0400

    NIFI-8337: This closes #4910. Fixed bug in StandardProcessSession where the session didn't account for FlowFile's contentClaimOffset when seeking to the appropriate location in the stream.
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../repository/StandardProcessSession.java         |   4 +-
 .../processors/tests/system/ReverseContents.java   |   2 +
 .../nifi/processors/tests/system/SplitByLine.java  | 126 +++++++++++++++++
 .../processors/tests/system/VerifyContents.java    | 102 ++++++++++++++
 .../services/org.apache.nifi.processor.Processor   |   2 +
 .../apache/nifi/tests/system/NiFiClientUtil.java   |  23 ++++
 .../tests/system/repositories/ContentAccessIT.java | 150 +++++++++++++++++++++
 7 files changed, 407 insertions(+), 2 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 1d24f33..b2b1bde 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -2300,9 +2300,9 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
 
                 currentReadClaim = claim.getResourceClaim();
                 final InputStream contentRepoStream = context.getContentRepository().read(claim.getResourceClaim());
-                StreamUtils.skip(contentRepoStream, claim.getOffset());
+                StreamUtils.skip(contentRepoStream, claim.getOffset() + contentClaimOffset);
                 final InputStream bufferedContentStream = new BufferedInputStream(contentRepoStream);
-                final ByteCountingInputStream byteCountingInputStream = new ByteCountingInputStream(bufferedContentStream, claim.getOffset());
+                final ByteCountingInputStream byteCountingInputStream = new ByteCountingInputStream(bufferedContentStream, claim.getOffset() + contentClaimOffset);
                 currentReadClaimStream = byteCountingInputStream;
 
                 // Use a non-closeable stream (DisableOnCloseInputStream) because we want to keep it open after the callback has finished so that we can
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ReverseContents.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ReverseContents.java
index cfb1124..62c8dc2 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ReverseContents.java
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ReverseContents.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.processors.tests.system;
 
+import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -31,6 +32,7 @@ import java.io.OutputStream;
 import java.util.Collections;
 import java.util.Set;
 
+@SupportsBatching
 public class ReverseContents extends AbstractProcessor {
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/SplitByLine.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/SplitByLine.java
new file mode 100644
index 0000000..93b8e14
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/SplitByLine.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.stream.io.util.TextLineDemarcator;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class SplitByLine extends AbstractProcessor {
+
+    static final PropertyDescriptor USE_CLONE = new PropertyDescriptor.Builder()
+        .name("Use Clone")
+        .description("Whether or not to use session.clone for generating children FlowFiles")
+        .required(true)
+        .defaultValue("true")
+        .allowableValues("true", "false")
+        .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .build();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return Collections.singleton(REL_SUCCESS);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return Collections.singletonList(USE_CLONE);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final boolean clone = context.getProperty(USE_CLONE).asBoolean();
+        if (clone) {
+            splitByClone(session, flowFile);
+        } else {
+            splitByWrite(session, flowFile);
+        }
+
+        session.remove(flowFile);
+    }
+
+    private void splitByClone(ProcessSession session, FlowFile flowFile) {
+        final List<TextLineDemarcator.OffsetInfo> offsetInfos = new ArrayList<>();
+
+        try (final InputStream in = session.read(flowFile);
+             final TextLineDemarcator demarcator = new TextLineDemarcator(in)) {
+
+            TextLineDemarcator.OffsetInfo offsetInfo;
+            while ((offsetInfo = demarcator.nextOffsetInfo()) != null) {
+                offsetInfos.add(offsetInfo);
+            }
+        } catch (final Exception e) {
+            throw new ProcessException(e);
+        }
+
+        for (final TextLineDemarcator.OffsetInfo offsetInfo : offsetInfos) {
+            FlowFile child = session.clone(flowFile, offsetInfo.getStartOffset(), offsetInfo.getLength() - offsetInfo.getCrlfLength());
+            session.putAttribute(child, "num.lines", String.valueOf(offsetInfos.size()));
+            session.transfer(child, REL_SUCCESS);
+        }
+    }
+
+    private void splitByWrite(ProcessSession session, FlowFile flowFile) {
+        final List<FlowFile> children = new ArrayList<>();
+        try (final InputStream in = session.read(flowFile);
+             final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
+
+            String line;
+            while ((line = reader.readLine()) != null) {
+                FlowFile child = session.create(flowFile);
+                children.add(child);
+
+                try (final OutputStream out = session.write(child)) {
+                    final byte[] lineBytes = line.getBytes(StandardCharsets.UTF_8);
+                    out.write(lineBytes);
+                }
+            }
+        } catch (final Exception e) {
+            throw new ProcessException(e);
+        }
+
+        for (FlowFile child : children) {
+            session.putAttribute(child, "num.lines", String.valueOf(children.size()));
+        }
+
+        session.transfer(children, REL_SUCCESS);
+    }
+}
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/VerifyContents.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/VerifyContents.java
new file mode 100644
index 0000000..9454e96
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/VerifyContents.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class VerifyContents extends AbstractProcessor {
+    private static final Relationship REL_UNMATCHED = new Relationship.Builder()
+        .name("unmatched")
+        .build();
+    // NOTE(review): updated in onPropertyModified, but nothing visible in this class returns it (no getRelationships override) - confirm intent.
+    private final AtomicReference<Set<Relationship>> relationshipsRef = new AtomicReference<>(Collections.singleton(REL_UNMATCHED));
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        // Every requested name is accepted as a dynamic property, validated with Validator.VALID.
+        final PropertyDescriptor.Builder dynamicProperty = new PropertyDescriptor.Builder()
+            .name(propertyDescriptorName)
+            .dynamic(true).addValidator(Validator.VALID);
+        return dynamicProperty.build();
+    }
+
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+        // A property corresponds to a relationship of the same name: a null new value drops it, anything else adds it.
+        final Relationship propertyRelationship = new Relationship.Builder().name(descriptor.getName()).build();
+        final boolean propertyRemoved = newValue == null;
+
+        // Work on a copy and publish it with set(...) rather than mutating the currently held set.
+        final Set<Relationship> nextRelationships = new HashSet<>(relationshipsRef.get());
+        if (propertyRemoved) {
+            nextRelationships.remove(propertyRelationship);
+        } else {
+            nextRelationships.add(propertyRelationship);
+        }
+
+        nextRelationships.add(REL_UNMATCHED); // Ensure that the unmatched relationship is always available
+        relationshipsRef.set(nextRelationships);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        // Load the entire FlowFile content into memory and decode it as UTF-8.
+        final String contents;
+        try (final InputStream contentStream = session.read(flowFile);
+             final ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
+            StreamUtils.copy(contentStream, buffer);
+            contents = new String(buffer.toByteArray(), StandardCharsets.UTF_8);
+        } catch (final Exception e) {
+            throw new ProcessException(e);
+        }
+
+        // The first property whose value equals the content wins; its name is used as the relationship name.
+        String matchedProperty = null;
+        for (final Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
+            if (contents.equals(property.getValue())) {
+                matchedProperty = property.getKey().getName();
+                break;
+            }
+        }
+        final Relationship destination = matchedProperty == null ? REL_UNMATCHED : new Relationship.Builder().name(matchedProperty).build();
+        getLogger().info("Routing {} to {}", flowFile, matchedProperty == null ? REL_UNMATCHED : matchedProperty);
+        session.transfer(flowFile, destination);
+    }
+}
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 169a9c6..6f8e373 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -24,7 +24,9 @@ org.apache.nifi.processors.tests.system.GenerateFlowFile
 org.apache.nifi.processors.tests.system.ReverseContents
 org.apache.nifi.processors.tests.system.SetAttribute
 org.apache.nifi.processors.tests.system.Sleep
+org.apache.nifi.processors.tests.system.SplitByLine
 org.apache.nifi.processors.tests.system.TerminateFlowFile
 org.apache.nifi.processors.tests.system.ThrowProcessException
 org.apache.nifi.processors.tests.system.ValidateFileExists
+org.apache.nifi.processors.tests.system.VerifyContents
 org.apache.nifi.processors.tests.system.WriteToFile
\ No newline at end of file
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 7616a29..feb7bda 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -24,6 +24,7 @@ import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.provenance.search.SearchableField;
 import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.scheduling.ExecutionNode;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.ConnectionClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
@@ -82,8 +83,10 @@ import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -261,6 +264,12 @@ public class NiFiClientUtil {
         return updateProcessorConfig(currentEntity, config);
     }
 
+    public ProcessorEntity updateProcessorRunDuration(final ProcessorEntity currentEntity, final int runDuration) throws NiFiClientException, IOException {
+        final ProcessorConfigDTO runDurationOnly = new ProcessorConfigDTO(); // no other field of the DTO is populated here
+        runDurationOnly.setRunDurationMillis(Long.valueOf(runDuration));
+        return updateProcessorConfig(currentEntity, runDurationOnly);
+    }
+
     public ProcessorEntity updateProcessorSchedulingPeriod(final ProcessorEntity currentEntity, final String schedulingPeriod) throws NiFiClientException, IOException {
         final ProcessorConfigDTO config = new ProcessorConfigDTO();
         config.setSchedulingPeriod(schedulingPeriod);
@@ -776,6 +785,20 @@ public class NiFiClientUtil {
         return flowFileEntity;
     }
 
+    public String getFlowFileContentAsUtf8(final String connectionId, final int flowFileIndex) throws NiFiClientException, IOException {
+        // Fetch the raw content bytes and decode them as UTF-8 text.
+        return new String(getFlowFileContentAsByteArray(connectionId, flowFileIndex), StandardCharsets.UTF_8);
+    }
+
+    public byte[] getFlowFileContentAsByteArray(final String connectionId, final int flowFileIndex) throws NiFiClientException, IOException {
+        // Drain the content stream into an in-memory buffer; the stream is closed before the bytes are handed back.
+        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        try (final InputStream content = getFlowFileContent(connectionId, flowFileIndex)) {
+            StreamUtils.copy(content, buffer);
+        }
+        return buffer.toByteArray();
+    }
+
     public InputStream getFlowFileContent(final String connectionId, final int flowFileIndex) throws NiFiClientException, IOException {
         final ListingRequestEntity listing = performQueueListing(connectionId);
         final List<FlowFileSummaryDTO> flowFileSummaries = listing.getListingRequest().getFlowFileSummaries();
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentAccessIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentAccessIT.java
new file mode 100644
index 0000000..1a3790f
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentAccessIT.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.repositories;
+
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This test is intended to verify that Processors are able to access the content that their FlowFiles represent in several different situations.
+ * We test for things like splitting a FlowFile by creating multiple children and writing to them, as well as creating children via processSession.clone(FlowFile flowFile, long offset, long length);
+ * We also test against Run Duration of 0 ms vs. 25 milliseconds in order to test when a processor writes the contents to multiple FlowFiles in the same session (which will result in writing to the
+ * same Content Claim) as well as writing to multiple FlowFiles in multiple sessions (which may result in writing to multiple Content Claims).
+ */
+public class ContentAccessIT extends NiFiSystemIT {
+
+    @Test
+    public void testCorrectContentReadWhenMultipleFlowFilesInClaimWithBatchAndWrite() throws NiFiClientException, IOException, InterruptedException {
+        testCorrectContentReadWhenMultipleFlowFilesInClaim(true, false);
+    }
+
+    @Test
+    public void testCorrectContentReadWhenMultipleFlowFilesInClaimWithoutBatchAndWrite() throws NiFiClientException, IOException, InterruptedException {
+        testCorrectContentReadWhenMultipleFlowFilesInClaim(false, false);
+    }
+
+    @Test
+    public void testCorrectContentReadWhenMultipleFlowFilesInClaimWithBatchAndClone() throws NiFiClientException, IOException, InterruptedException {
+        testCorrectContentReadWhenMultipleFlowFilesInClaim(true, true);
+    }
+
+    @Test
+    public void testCorrectContentReadWhenMultipleFlowFilesInClaimWithoutBatchAndClone() throws NiFiClientException, IOException, InterruptedException {
+        testCorrectContentReadWhenMultipleFlowFilesInClaim(false, true);
+    }
+
+    public void testCorrectContentReadWhenMultipleFlowFilesInClaim(final boolean useBatch, final boolean clone) throws NiFiClientException, IOException, InterruptedException {
+        final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
+        final ProcessorEntity split = getClientUtil().createProcessor("SplitByLine");
+        final ProcessorEntity reverse = getClientUtil().createProcessor("ReverseContents");
+        final ProcessorEntity verify = getClientUtil().createProcessor("VerifyContents");
+        final ProcessorEntity terminateAa = getClientUtil().createProcessor("TerminateFlowFile");
+        final ProcessorEntity terminateBa = getClientUtil().createProcessor("TerminateFlowFile");
+        final ProcessorEntity terminateCa = getClientUtil().createProcessor("TerminateFlowFile");
+        final ProcessorEntity terminateUnmatched = getClientUtil().createProcessor("TerminateFlowFile");
+
+        // Configure Generate
+        getClientUtil().updateProcessorSchedulingPeriod(generate, "10 mins");
+        getClientUtil().updateProcessorProperties(generate, Collections.singletonMap("Text", "{ a : a }\n{ a : b }\n{ a : c }"));
+
+        // Configure split
+        getClientUtil().updateProcessorProperties(split, Collections.singletonMap("Use Clone", String.valueOf(clone)));
+
+        // Configure Verify
+        final Map<String, String> verifyProperties = new HashMap<>();
+        verifyProperties.put("aa", "} a : a {");
+        verifyProperties.put("ba", "} b : a {");
+        verifyProperties.put("ca", "} c : a {");
+        getClientUtil().updateProcessorProperties(verify, verifyProperties);
+
+        // Configure batching for reverse
+        final int runDuration = useBatch ? 25 : 0;
+        getClientUtil().updateProcessorRunDuration(reverse, runDuration);
+
+        final ConnectionEntity generateToSplit = getClientUtil().createConnection(generate, split, "success");
+        final ConnectionEntity splitToReverse = getClientUtil().createConnection(split, reverse, "success");
+        final ConnectionEntity reverseToVerify = getClientUtil().createConnection(reverse, verify, "success");
+        final ConnectionEntity verifyToTerminateAa = getClientUtil().createConnection(verify, terminateAa, "aa");
+        final ConnectionEntity verifyToTerminateBa = getClientUtil().createConnection(verify, terminateBa, "ba");
+        final ConnectionEntity verifyToTerminateCa = getClientUtil().createConnection(verify, terminateCa, "ca");
+        final ConnectionEntity verifyToTerminateUnmatched = getClientUtil().createConnection(verify, terminateAa, "unmatched");
+
+        // Run Generate processor, wait for its output
+        getNifiClient().getProcessorClient().startProcessor(generate);
+        waitForQueueCount(generateToSplit.getId(), 1);
+
+        // Run split processor, wait for its output
+        getNifiClient().getProcessorClient().startProcessor(split);
+        waitForQueueCount(splitToReverse.getId(), 3);
+
+        // Verify output of the Split processor
+        final String firstSplitContents = getClientUtil().getFlowFileContentAsUtf8(splitToReverse.getId(), 0);
+        final String secondSplitContents = getClientUtil().getFlowFileContentAsUtf8(splitToReverse.getId(), 1);
+        final String thirdSplitContents = getClientUtil().getFlowFileContentAsUtf8(splitToReverse.getId(), 2);
+
+        // Verify that we get both expected outputs. We put them in a set and ensure that the set contains both because we don't know the order
+        // that they will be in. The reason we don't know the order is because if we are using batching, the contents will be in the same output
+        // Content Claim, otherwise they won't be. If they are not, the order can change.
+        final Set<String> splitContents = new HashSet<>();
+        splitContents.add(firstSplitContents);
+        splitContents.add(secondSplitContents);
+        splitContents.add(thirdSplitContents);
+
+        assertTrue(splitContents.contains("{ a : a }"));
+        assertTrue(splitContents.contains("{ a : b }"));
+        assertTrue(splitContents.contains("{ a : c }"));
+
+        // Start the reverse processor, wait for its output
+        getNifiClient().getProcessorClient().startProcessor(reverse);
+        waitForQueueCount(reverseToVerify.getId(), 3);
+
+        final String firstReversedContents = getClientUtil().getFlowFileContentAsUtf8(reverseToVerify.getId(), 0);
+        final String secondReversedContents = getClientUtil().getFlowFileContentAsUtf8(reverseToVerify.getId(), 1);
+        final String thirdReversedContents = getClientUtil().getFlowFileContentAsUtf8(reverseToVerify.getId(), 2);
+
+        final Set<String> reversedContents = new HashSet<>();
+        reversedContents.add(firstReversedContents);
+        reversedContents.add(secondReversedContents);
+        reversedContents.add(thirdReversedContents);
+
+        assertTrue(reversedContents.contains("} a : a {"));
+        assertTrue(reversedContents.contains("} b : a {"));
+        assertTrue(reversedContents.contains("} c : a {"));
+
+        // Start verify processor. This is different than verify the contents above because doing so above is handled by making a REST call, which does not make use
+        // of the ProcessSession. Using the VerifyContents processor ensures that the Processors see the same contents.
+        getNifiClient().getProcessorClient().startProcessor(verify);
+
+        waitForQueueCount(verifyToTerminateAa.getId(), 1);
+        waitForQueueCount(verifyToTerminateBa.getId(), 1);
+        waitForQueueCount(verifyToTerminateCa.getId(), 1);
+        waitForQueueCount(verifyToTerminateUnmatched.getId(), 0);
+    }
+}