You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2021/03/16 22:40:39 UTC

[nifi] branch main updated: NIFI-8304 This closes #4900. Improved Socket test reliability for several Processors

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2ad88bb  NIFI-8304 This closes #4900. Improved Socket test reliability for several Processors
2ad88bb is described below

commit 2ad88bbffff3c90bca7404d10ebc40953c1b3c63
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Mon Mar 15 14:04:30 2021 -0500

    NIFI-8304 This closes #4900. Improved Socket test reliability for several Processors
    
    - Refactored TestPutTCP to single class
    - Improved TestListenRELP
    - Improved TestListenTCP
    - Improved TestListenUDP
    - Improved TestListenTCPRecord
    - Changed OnUnscheduled to OnStopped in AbstractListenEventProcessor
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../util/listen/AbstractListenEventProcessor.java  |   6 +-
 .../nifi/processors/standard/ListenTCPRecord.java  |  11 +-
 .../nifi/processors/standard/TestListenRELP.java   | 201 ++++++------
 .../nifi/processors/standard/TestListenTCP.java    | 114 +++----
 .../processors/standard/TestListenTCPRecord.java   | 164 +++-------
 .../nifi/processors/standard/TestListenUDP.java    | 108 ++-----
 .../nifi/processors/standard/TestPutTCP.java       | 293 +++++++++++++++++-
 .../nifi/processors/standard/TestPutTcpSSL.java    |  82 -----
 .../processors/standard/util/TestPutTCPCommon.java | 339 ---------------------
 9 files changed, 483 insertions(+), 835 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
index edba61a..7898319 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
@@ -18,7 +18,7 @@ package org.apache.nifi.processor.util.listen;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.processor.AbstractProcessor;
@@ -220,8 +220,8 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
         return events == null ? 0 : events.size();
     }
 
-    @OnUnscheduled
-    public void onUnscheduled() {
+    @OnStopped
+    public void closeDispatcher() {
         if (dispatcher != null) {
             dispatcher.close();
         }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
index 0187961..acc936a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
@@ -46,7 +46,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
@@ -299,8 +299,8 @@ public class ListenTCPRecord extends AbstractProcessor {
         readerThread.start();
     }
 
-    @OnUnscheduled
-    public void onUnscheduled() {
+    @OnStopped
+    public void onStopped() {
         if (dispatcher != null) {
             dispatcher.close();
             dispatcher = null;
@@ -460,9 +460,4 @@ public class ListenTCPRecord extends AbstractProcessor {
     private String getRemoteAddress(final SocketChannelRecordReader socketChannelRecordReader) {
         return socketChannelRecordReader.getRemoteAddress() == null ? "null" : socketChannelRecordReader.getRemoteAddress().toString();
     }
-
-    public final int getDispatcherPort() {
-        return dispatcher == null ? 0 : dispatcher.getPort();
-    }
-
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
index e48fe76..7e95433 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.standard;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.Socket;
 import java.nio.channels.SocketChannel;
 import java.nio.charset.StandardCharsets;
@@ -24,21 +25,21 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import javax.net.ssl.SSLContext;
-import org.apache.commons.io.IOUtils;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
 import org.apache.nifi.processor.util.listen.response.ChannelResponder;
 import org.apache.nifi.processors.standard.relp.event.RELPEvent;
 import org.apache.nifi.processors.standard.relp.frame.RELPEncoder;
 import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
-import org.apache.nifi.processors.standard.relp.response.RELPResponse;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.security.util.TlsException;
+import org.apache.nifi.ssl.RestrictedSSLContextService;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -47,8 +48,12 @@ import org.apache.nifi.web.util.ssl.SslContextUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
 
+@RunWith(MockitoJUnitRunner.class)
 public class TestListenRELP {
 
     public static final String OPEN_FRAME_DATA = "relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog";
@@ -75,82 +80,83 @@ public class TestListenRELP {
             .data(new byte[0])
             .build();
 
+    private static final String LOCALHOST = "localhost";
+
+    @Mock
+    private ChannelResponder<SocketChannel> responder;
+
+    @Mock
+    private ChannelDispatcher channelDispatcher;
+
+    @Mock
+    private RestrictedSSLContextService sslContextService;
+
     private RELPEncoder encoder;
-    private ResponseCapturingListenRELP proc;
+
     private TestRunner runner;
 
     @Before
     public void setup() {
         encoder = new RELPEncoder(StandardCharsets.UTF_8);
-        proc = new ResponseCapturingListenRELP();
-        runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(ListenRELP.PORT, "0");
+        runner = TestRunners.newTestRunner(ListenRELP.class);
     }
 
     @Test
-    public void testListenRELP() throws IOException, InterruptedException {
-        final List<RELPFrame> frames = new ArrayList<>();
-        frames.add(OPEN_FRAME);
-        frames.add(SYSLOG_FRAME);
-        frames.add(SYSLOG_FRAME);
-        frames.add(SYSLOG_FRAME);
-        frames.add(CLOSE_FRAME);
+    public void testRun() throws IOException {
+        final int syslogFrames = 5;
+        final List<RELPFrame> frames = getFrames(syslogFrames);
 
         // every syslog frame should be transferred and a response should be sent for each one
-        run(frames, 3, 3, null);
+        run(frames, syslogFrames, syslogFrames, null);
 
         final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
         Assert.assertNotNull(events);
-        Assert.assertEquals(3, events.size());
+        Assert.assertEquals(syslogFrames, events.size());
 
         final ProvenanceEventRecord event = events.get(0);
         Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
         Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("relp"));
 
         final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenRELP.REL_SUCCESS);
-        Assert.assertEquals(3, mockFlowFiles.size());
+        Assert.assertEquals(syslogFrames, mockFlowFiles.size());
 
         final MockFlowFile mockFlowFile = mockFlowFiles.get(0);
         Assert.assertEquals(String.valueOf(SYSLOG_FRAME.getTxnr()), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.TXNR.key()));
         Assert.assertEquals(SYSLOG_FRAME.getCommand(), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key()));
-        Assert.assertTrue(!StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key())));
-        Assert.assertTrue(!StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key())));
+        Assert.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key())));
+        Assert.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key())));
     }
 
     @Test
-    public void testBatching() throws IOException, InterruptedException {
+    public void testRunBatching() throws IOException {
         runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "5");
 
-        final List<RELPFrame> frames = new ArrayList<>();
-        frames.add(OPEN_FRAME);
-        frames.add(SYSLOG_FRAME);
-        frames.add(SYSLOG_FRAME);
-        frames.add(SYSLOG_FRAME);
-        frames.add(CLOSE_FRAME);
+        final int syslogFrames = 3;
+        final List<RELPFrame> frames = getFrames(syslogFrames);
 
         // one syslog frame should be transferred since we are batching, but three responses should be sent
-        run(frames, 1, 3, null);
+        final int expectedFlowFiles = 1;
+        run(frames, expectedFlowFiles, syslogFrames, null);
 
         final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
         Assert.assertNotNull(events);
-        Assert.assertEquals(1, events.size());
+        Assert.assertEquals(expectedFlowFiles, events.size());
 
         final ProvenanceEventRecord event = events.get(0);
         Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
         Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("relp"));
 
         final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenRELP.REL_SUCCESS);
-        Assert.assertEquals(1, mockFlowFiles.size());
+        Assert.assertEquals(expectedFlowFiles, mockFlowFiles.size());
 
         final MockFlowFile mockFlowFile = mockFlowFiles.get(0);
         Assert.assertEquals(SYSLOG_FRAME.getCommand(), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key()));
-        Assert.assertTrue(!StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key())));
-        Assert.assertTrue(!StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key())));
+        Assert.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key())));
+        Assert.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key())));
     }
 
     @Test
-    public void testMutualTls() throws IOException, InterruptedException, TlsException, InitializationException {
-        final SSLContextService sslContextService = Mockito.mock(SSLContextService.class);
+    public void testRunMutualTls() throws IOException, TlsException, InitializationException {
         final String serviceIdentifier = SSLContextService.class.getName();
         Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
         final SSLContext sslContext = SslContextUtils.createKeyStoreSslContext();
@@ -160,33 +166,26 @@ public class TestListenRELP {
 
         runner.setProperty(ListenRELP.SSL_CONTEXT_SERVICE, serviceIdentifier);
 
-        final List<RELPFrame> frames = new ArrayList<>();
-        frames.add(OPEN_FRAME);
-        frames.add(SYSLOG_FRAME);
-        frames.add(SYSLOG_FRAME);
-        frames.add(SYSLOG_FRAME);
-        frames.add(SYSLOG_FRAME);
-        frames.add(SYSLOG_FRAME);
-        frames.add(CLOSE_FRAME);
-
-        run(frames, 5, 5, sslContext);
+        final int syslogFrames = 3;
+        final List<RELPFrame> frames = getFrames(syslogFrames);
+        run(frames, syslogFrames, syslogFrames, sslContext);
     }
 
     @Test
-    public void testNoEventsAvailable() throws IOException, InterruptedException {
-        MockListenRELP mockListenRELP = new MockListenRELP(new ArrayList<RELPEvent>());
+    public void testRunNoEventsAvailable() {
+        MockListenRELP mockListenRELP = new MockListenRELP(new ArrayList<>());
         runner = TestRunners.newTestRunner(mockListenRELP);
-        runner.setProperty(ListenRELP.PORT, "1");
+        runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort()));
 
         runner.run();
         runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0);
+        runner.shutdown();
     }
 
     @Test
-    public void testBatchingWithDifferentSenders() throws IOException, InterruptedException {
+    public void testBatchingWithDifferentSenders() {
         final String sender1 = "sender1";
         final String sender2 = "sender2";
-        final ChannelResponder<SocketChannel> responder = Mockito.mock(ChannelResponder.class);
 
         final List<RELPEvent> mockEvents = new ArrayList<>();
         mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
@@ -196,96 +195,68 @@ public class TestListenRELP {
 
         MockListenRELP mockListenRELP = new MockListenRELP(mockEvents);
         runner = TestRunners.newTestRunner(mockListenRELP);
-        runner.setProperty(ListenRELP.PORT, "1");
+        runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort()));
         runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
 
         runner.run();
         runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
+        runner.shutdown();
     }
 
+    private void run(final List<RELPFrame> frames, final int flowFiles, final int responses, final SSLContext sslContext)
+            throws IOException {
 
-    protected void run(final List<RELPFrame> frames, final int expectedTransferred, final int expectedResponses, final SSLContext sslContext)
-            throws IOException, InterruptedException {
-
-        Socket socket = null;
-        try {
-            // schedule to start listening on a random port
-            final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
-            final ProcessContext context = runner.getProcessContext();
-            proc.onScheduled(context);
-
-            // create a client connection to the port the dispatcher is listening on
-            final int realPort = proc.getDispatcherPort();
-
-            // create either a regular socket or ssl socket based on context being passed in
-            if (sslContext == null) {
-                socket = new Socket("localhost", realPort);
-            } else {
-                socket = sslContext.getSocketFactory().createSocket("localhost", realPort);
-            }
-            Thread.sleep(100);
-
-            // send the frames to the port the processors is listening on
-            sendFrames(frames, socket);
-
-            long responseTimeout = 30000;
-
-            // this first loop waits until the internal queue of the processor has the expected
-            // number of messages ready before proceeding, we want to guarantee they are all there
-            // before onTrigger gets a chance to run
-            long startTimeQueueSizeCheck = System.currentTimeMillis();
-            while (proc.getQueueSize() < expectedResponses
-                    && (System.currentTimeMillis() - startTimeQueueSizeCheck < responseTimeout)) {
-                Thread.sleep(100);
-            }
-
-            // want to fail here if the queue size isn't what we expect
-            Assert.assertEquals(expectedResponses, proc.getQueueSize());
+        final int port = NetworkUtils.availablePort();
+        runner.setProperty(ListenRELP.PORT, Integer.toString(port));
 
-            // call onTrigger until we got a respond for all the frames, or a certain amount of time passes
-            long startTimeProcessing = System.currentTimeMillis();
-            while (proc.responses.size() < expectedResponses
-                    && (System.currentTimeMillis() - startTimeProcessing < responseTimeout)) {
-                proc.onTrigger(context, processSessionFactory);
-                Thread.sleep(100);
-            }
+        // Run Processor and start Dispatcher without shutting down
+        runner.run(1, false, true);
 
-            // should have gotten a response for each frame
-            Assert.assertEquals(expectedResponses, proc.responses.size());
+        try (final Socket socket = getSocket(port, sslContext)) {
+            final OutputStream outputStream = socket.getOutputStream();
+            sendFrames(frames, outputStream);
 
-            // should have transferred the expected events
-            runner.assertTransferCount(ListenRELP.REL_SUCCESS, expectedTransferred);
+            // Run Processor for number of responses
+            runner.run(responses, false, false);
 
+            runner.assertTransferCount(ListenRELP.REL_SUCCESS, flowFiles);
         } finally {
-            // unschedule to close connections
-            proc.onUnscheduled();
-            IOUtils.closeQuietly(socket);
+            runner.shutdown();
         }
     }
 
-    private void sendFrames(final List<RELPFrame> frames, final Socket socket) throws IOException, InterruptedException {
-        // send the provided messages
+    private void sendFrames(final List<RELPFrame> frames, final OutputStream outputStream) throws IOException {
         for (final RELPFrame frame : frames) {
-            byte[] encodedFrame = encoder.encode(frame);
-            socket.getOutputStream().write(encodedFrame);
+            final byte[] encodedFrame = encoder.encode(frame);
+            outputStream.write(encodedFrame);
+            outputStream.flush();
         }
-        socket.getOutputStream().flush();
     }
 
-    // Extend ListenRELP so we can use the CapturingSocketChannelResponseDispatcher
-    private static class ResponseCapturingListenRELP extends ListenRELP {
+    private Socket getSocket(final int port, final SSLContext sslContext) throws IOException {
+        final Socket socket;
+        if (sslContext == null) {
+            socket = new Socket(LOCALHOST, port);
+        } else {
+            socket = sslContext.getSocketFactory().createSocket(LOCALHOST, port);
+        }
+        return socket;
+    }
 
-        private final List<RELPResponse> responses = new ArrayList<>();
+    private List<RELPFrame> getFrames(final int syslogFrames) {
+        final List<RELPFrame> frames = new ArrayList<>();
+        frames.add(OPEN_FRAME);
 
-        @Override
-        protected void respond(RELPEvent event, RELPResponse relpResponse) {
-            this.responses.add(relpResponse);
-            super.respond(event, relpResponse);
+        for (int i = 0; i < syslogFrames; i++) {
+            frames.add(SYSLOG_FRAME);
         }
+
+        frames.add(CLOSE_FRAME);
+        return frames;
     }
 
     // Extend ListenRELP to mock the ChannelDispatcher and allow us to return staged events
-    private static class MockListenRELP extends ListenRELP {
+    private class MockListenRELP extends ListenRELP {
 
         private final List<RELPEvent> mockEvents;
 
@@ -301,8 +272,8 @@ public class TestListenRELP {
         }
 
         @Override
-        protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<RELPEvent> events) throws IOException {
-            return Mockito.mock(ChannelDispatcher.class);
+        protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<RELPEvent> events) {
+            return channelDispatcher;
         }
 
     }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java
index 4091516..1745002 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java
@@ -16,9 +16,7 @@
  */
 package org.apache.nifi.processors.standard;
 
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.security.util.ClientAuth;
 import org.apache.nifi.security.util.TlsException;
@@ -34,24 +32,23 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import javax.net.SocketFactory;
 import javax.net.ssl.SSLContext;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.Socket;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 
 public class TestListenTCP {
-    private static final long RESPONSE_TIMEOUT = 10000;
-
     private static final String SSL_CONTEXT_IDENTIFIER = SSLContextService.class.getName();
 
+    private static final String LOCALHOST = "localhost";
+
     private static SSLContext keyStoreSslContext;
 
     private static SSLContext trustStoreSslContext;
 
-    private ListenTCP proc;
     private TestRunner runner;
 
     @BeforeClass
@@ -62,9 +59,7 @@ public class TestListenTCP {
 
     @Before
     public void setup() {
-        proc = new ListenTCP();
-        runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(ListenTCP.PORT, "0");
+        runner = TestRunners.newTestRunner(ListenTCP.class);
     }
 
     @Test
@@ -81,7 +76,7 @@ public class TestListenTCP {
     }
 
     @Test
-    public void testListenTCP() throws IOException, InterruptedException {
+    public void testListenTCP() throws IOException {
         final List<String> messages = new ArrayList<>();
         messages.add("This is message 1\n");
         messages.add("This is message 2\n");
@@ -89,7 +84,7 @@ public class TestListenTCP {
         messages.add("This is message 4\n");
         messages.add("This is message 5\n");
 
-        runTCP(messages, messages.size(), null);
+        run(messages, messages.size(), null);
 
         List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS);
         for (int i = 0; i < mockFlowFiles.size(); i++) {
@@ -98,7 +93,7 @@ public class TestListenTCP {
     }
 
     @Test
-    public void testListenTCPBatching() throws IOException, InterruptedException {
+    public void testListenTCPBatching() throws IOException {
         runner.setProperty(ListenTCP.MAX_BATCH_SIZE, "3");
 
         final List<String> messages = new ArrayList<>();
@@ -108,7 +103,7 @@ public class TestListenTCP {
         messages.add("This is message 4\n");
         messages.add("This is message 5\n");
 
-        runTCP(messages, 2, null);
+        run(messages, 2, null);
 
         List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS);
 
@@ -120,9 +115,7 @@ public class TestListenTCP {
     }
 
     @Test
-    public void testTLSClientAuthRequiredAndClientCertProvided() throws IOException, InterruptedException,
-            InitializationException {
-
+    public void testTLSClientAuthRequiredAndClientCertProvided() throws IOException, InitializationException {
         runner.setProperty(ListenTCP.CLIENT_AUTH, ClientAuth.REQUIRED.name());
         enableSslContextService(keyStoreSslContext);
 
@@ -133,8 +126,7 @@ public class TestListenTCP {
         messages.add("This is message 4\n");
         messages.add("This is message 5\n");
 
-        // Make an SSLContext with a key and trust store to send the test messages
-        runTCP(messages, messages.size(), keyStoreSslContext);
+        run(messages, messages.size(), keyStoreSslContext);
 
         List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS);
         for (int i = 0; i < mockFlowFiles.size(); i++) {
@@ -154,14 +146,13 @@ public class TestListenTCP {
         messages.add("This is message 4\n");
         messages.add("This is message 5\n");
 
-        // Make an SSLContext that only has the trust store, this should not work since the processor has client auth REQUIRED
         Assert.assertThrows(IOException.class, () ->
-            runTCP(messages, messages.size(), trustStoreSslContext)
+            run(messages, messages.size(), trustStoreSslContext)
         );
     }
 
     @Test
-    public void testTLSClientAuthNoneAndClientCertNotProvided() throws IOException, InterruptedException, InitializationException {
+    public void testTLSClientAuthNoneAndClientCertNotProvided() throws IOException, InitializationException {
         runner.setProperty(ListenTCP.CLIENT_AUTH, ClientAuth.NONE.name());
         enableSslContextService(keyStoreSslContext);
 
@@ -172,8 +163,7 @@ public class TestListenTCP {
         messages.add("This is message 4\n");
         messages.add("This is message 5\n");
 
-        // Make an SSLContext that only has the trust store, this should not work since the processor has client auth REQUIRED
-        runTCP(messages, messages.size(), trustStoreSslContext);
+        run(messages, messages.size(), trustStoreSslContext);
 
         List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS);
         for (int i = 0; i < mockFlowFiles.size(); i++) {
@@ -181,63 +171,39 @@ public class TestListenTCP {
         }
     }
 
-    protected void runTCP(final List<String> messages, final int expectedTransferred, final SSLContext sslContext)
-            throws IOException, InterruptedException {
-
-        Socket socket = null;
-        try {
-            // schedule to start listening on a random port
-            final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
-            final ProcessContext context = runner.getProcessContext();
-            proc.onScheduled(context);
-
-            // create a client connection to the port the dispatcher is listening on
-            final int realPort = proc.getDispatcherPort();
-
-            // create either a regular socket or ssl socket based on context being passed in
-            if (sslContext == null) {
-                socket = new Socket("localhost", realPort);
-            } else {
-                final SocketFactory socketFactory = sslContext.getSocketFactory();
-                socket = socketFactory.createSocket("localhost", realPort);
-            }
-            Thread.sleep(100);
+    protected void run(final List<String> messages, final int flowFiles, final SSLContext sslContext)
+            throws IOException {
 
-            // send the frames to the port the processors is listening on
-            for (final String message : messages) {
-                socket.getOutputStream().write(message.getBytes(StandardCharsets.UTF_8));
-                Thread.sleep(1);
-            }
-            socket.getOutputStream().flush();
-
-            // this first loop waits until the internal queue of the processor has the expected
-            // number of messages ready before proceeding, we want to guarantee they are all there
-            // before onTrigger gets a chance to run
-            long startTimeQueueSizeCheck = System.currentTimeMillis();
-            while (proc.getQueueSize() < messages.size()
-                    && (System.currentTimeMillis() - startTimeQueueSizeCheck < RESPONSE_TIMEOUT)) {
-                Thread.sleep(100);
-            }
+        final int port = NetworkUtils.availablePort();
+        runner.setProperty(ListenTCP.PORT, Integer.toString(port));
 
-            // want to fail here if the queue size isn't what we expect
-            Assert.assertEquals(messages.size(), proc.getQueueSize());
+        // Run Processor and start Dispatcher without shutting down
+        runner.run(1, false, true);
 
-            // call onTrigger until we processed all the frames, or a certain amount of time passes
-            int numTransferred = 0;
-            long startTime = System.currentTimeMillis();
-            while (numTransferred < expectedTransferred && (System.currentTimeMillis() - startTime < RESPONSE_TIMEOUT)) {
-                proc.onTrigger(context, processSessionFactory);
-                numTransferred = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS).size();
-                Thread.sleep(100);
+        try (final Socket socket = getSocket(port, sslContext)) {
+            final OutputStream outputStream = socket.getOutputStream();
+            for (final String message : messages) {
+                outputStream.write(message.getBytes(StandardCharsets.UTF_8));
             }
+            outputStream.flush();
+
+            // Run Processor once for each expected FlowFile
+            runner.run(flowFiles, false, false);
 
-            // should have transferred the expected events
-            runner.assertTransferCount(ListenTCP.REL_SUCCESS, expectedTransferred);
+            runner.assertTransferCount(ListenTCP.REL_SUCCESS, flowFiles);
         } finally {
-            // unschedule to close connections
-            proc.onUnscheduled();
-            IOUtils.closeQuietly(socket);
+            runner.shutdown();
+        }
+    }
+
+    private Socket getSocket(final int port, final SSLContext sslContext) throws IOException {
+        final Socket socket;
+        if (sslContext == null) {
+            socket = new Socket(LOCALHOST, port);
+        } else {
+            socket = sslContext.getSocketFactory().createSocket(LOCALHOST, port);
         }
+        return socket;
     }
 
     private void enableSslContextService(final SSLContext sslContext) throws InitializationException {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java
index e0d1a15..e2e5c74 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java
@@ -16,18 +16,17 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.Closeable;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
 import java.net.Socket;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.net.ssl.SSLContext;
-import org.apache.commons.io.IOUtils;
+
 import org.apache.nifi.json.JsonTreeReader;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.security.util.ClientAuth;
@@ -46,12 +45,8 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TestListenTCPRecord {
-    static final Logger LOGGER = LoggerFactory.getLogger(TestListenTCPRecord.class);
-
     static final String SCHEMA_TEXT = "{\n" +
             "  \"name\": \"syslogRecord\",\n" +
             "  \"namespace\": \"nifi\",\n" +
@@ -63,17 +58,13 @@ public class TestListenTCPRecord {
             "  ]\n" +
             "}";
 
-    static final List<String> DATA;
+    static final String DATA = "[" +
+            "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 1\"}," +
+            "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 2\"}," +
+            "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 3\"}" +
+            "]";
 
-    static {
-        final List<String> data = new ArrayList<>();
-        data.add("[");
-        data.add("{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 1\"},");
-        data.add("{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 2\"},");
-        data.add("{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 3\"}");
-        data.add("]");
-        DATA = Collections.unmodifiableList(data);
-    }
+    private static final String LOCALHOST = "localhost";
 
     private static final String SSL_CONTEXT_IDENTIFIER = SSLContextService.class.getName();
 
@@ -81,7 +72,6 @@ public class TestListenTCPRecord {
 
     private static SSLContext trustStoreSslContext;
 
-    private ListenTCPRecord proc;
     private TestRunner runner;
 
     @BeforeClass
@@ -92,9 +82,7 @@ public class TestListenTCPRecord {
 
     @Before
     public void setup() throws InitializationException {
-        proc = new ListenTCPRecord();
-        runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(ListenTCPRecord.PORT, "0");
+        runner = TestRunners.newTestRunner(ListenTCPRecord.class);
 
         final String readerId = "record-reader";
         final RecordReaderFactory readerFactory = new JsonTreeReader();
@@ -110,7 +98,6 @@ public class TestListenTCPRecord {
 
         runner.setProperty(ListenTCPRecord.RECORD_READER, readerId);
         runner.setProperty(ListenTCPRecord.RECORD_WRITER, writerId);
-
     }
 
     @Test
@@ -130,7 +117,7 @@ public class TestListenTCPRecord {
     public void testOneRecordPerFlowFile() throws IOException, InterruptedException {
         runner.setProperty(ListenTCPRecord.RECORD_BATCH_SIZE, "1");
 
-        runTCP(DATA, 3, null);
+        run(3, null);
 
         List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
         for (int i = 0; i < mockFlowFiles.size(); i++) {
@@ -147,7 +134,7 @@ public class TestListenTCPRecord {
     public void testMultipleRecordsPerFlowFileLessThanBatchSize() throws IOException, InterruptedException {
         runner.setProperty(ListenTCPRecord.RECORD_BATCH_SIZE, "5");
 
-        runTCP(DATA, 1, null);
+        run(1, null);
 
         final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
         Assert.assertEquals(1, mockFlowFiles.size());
@@ -167,7 +154,7 @@ public class TestListenTCPRecord {
         runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.REQUIRED.name());
         enableSslContextService(keyStoreSslContext);
 
-        runTCP(DATA, 1, keyStoreSslContext);
+        run(1, keyStoreSslContext);
 
         final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
         Assert.assertEquals(1, mockFlowFiles.size());
@@ -180,21 +167,11 @@ public class TestListenTCPRecord {
     }
 
     @Test
-    public void testTLSClientAuthRequiredAndClientCertNotProvided() throws InitializationException, IOException, InterruptedException {
-        runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.REQUIRED.name());
-        runner.setProperty(ListenTCPRecord.READ_TIMEOUT, "5 seconds");
-        enableSslContextService(keyStoreSslContext);
-
-        runTCP(DATA, 0, trustStoreSslContext);
-    }
-
-    @Test
     public void testTLSClientAuthNoneAndClientCertNotProvided() throws InitializationException, IOException, InterruptedException {
-
         runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.NONE.name());
         enableSslContextService(keyStoreSslContext);
 
-        runTCP(DATA, 1, trustStoreSslContext);
+        run(1, trustStoreSslContext);
 
         final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
         Assert.assertEquals(1, mockFlowFiles.size());
@@ -206,88 +183,43 @@ public class TestListenTCPRecord {
         Assert.assertTrue(content.contains("This is a test " + 3));
     }
 
-    protected void runTCP(final List<String> messages, final int expectedTransferred, final SSLContext sslContext)
-            throws IOException, InterruptedException {
-
-        SocketSender sender = null;
-        try {
-            // schedule to start listening on a random port
-            final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
-            final ProcessContext context = runner.getProcessContext();
-            proc.onScheduled(context);
-            Thread.sleep(100);
-
-            sender = new SocketSender(proc.getDispatcherPort(), "localhost", sslContext, messages, 0);
-
-            final Thread senderThread = new Thread(sender);
-            senderThread.setDaemon(true);
-            senderThread.start();
-
-            long timeout = 10000;
-
-            // call onTrigger until we processed all the records, or a certain amount of time passes
-            int numTransferred = 0;
-            long startTime = System.currentTimeMillis();
-            while (numTransferred < expectedTransferred && (System.currentTimeMillis() - startTime < timeout)) {
-                proc.onTrigger(context, processSessionFactory);
-                numTransferred = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS).size();
-                Thread.sleep(100);
+    protected void run(final int expectedTransferred, final SSLContext sslContext) throws IOException, InterruptedException {
+        final int port = NetworkUtils.availablePort();
+        runner.setProperty(ListenTCPRecord.PORT, Integer.toString(port));
+
+        // Run Processor and start listener without shutting down
+        runner.run(1, false, true);
+
+        final AtomicBoolean completed = new AtomicBoolean(false);
+        final Thread thread = new Thread(() -> {
+            try (final Socket socket = getSocket(port, sslContext)) {
+                final OutputStream outputStream = socket.getOutputStream();
+                outputStream.write(DATA.getBytes(StandardCharsets.UTF_8));
+                outputStream.flush();
+                completed.set(true);
+            } catch (final IOException e) {
+                throw new UncheckedIOException(e);
             }
+        });
+        thread.start();
 
-            // should have transferred the expected events
-            runner.assertTransferCount(ListenTCPRecord.REL_SUCCESS, expectedTransferred);
-        } finally {
-            // unschedule to close connections
-            proc.onUnscheduled();
-            IOUtils.closeQuietly(sender);
-        }
-    }
-
-    private static class SocketSender implements Runnable, Closeable {
-
-        private final int port;
-        private final String host;
-        private final SSLContext sslContext;
-        private final List<String> data;
-        private final long delay;
+        // Reset the completion flag if the sender already finished; compareAndSet returns immediately and does not wait
+        completed.compareAndSet(true, false);
 
-        private Socket socket;
-
-        public SocketSender(final int port, final String host, final SSLContext sslContext, final List<String> data, final long delay) {
-            this.port = port;
-            this.host = host;
-            this.sslContext = sslContext;
-            this.data = data;
-            this.delay = delay;
-        }
-
-        @Override
-        public void run() {
-            try {
-                if (sslContext != null) {
-                    socket = sslContext.getSocketFactory().createSocket(host, port);
-                } else {
-                    socket = new Socket(host, port);
-                }
-
-                for (final String message : data) {
-                    socket.getOutputStream().write(message.getBytes(StandardCharsets.UTF_8));
-                    if (delay > 0) {
-                        Thread.sleep(delay);
-                    }
-                }
-
-                socket.getOutputStream().flush();
-            } catch (final Exception e) {
-                LOGGER.error(e.getMessage(), e);
-            } finally {
-                IOUtils.closeQuietly(socket);
-            }
-        }
+        // Run Processor for expected FlowFiles with an additional run to ensure completion
+        final int iterations = expectedTransferred + 1;
+        runner.run(iterations, true, false);
+        runner.assertTransferCount(ListenTCPRecord.REL_SUCCESS, expectedTransferred);
+    }
 
-        public void close() {
-            IOUtils.closeQuietly(socket);
+    private Socket getSocket(final int port, final SSLContext sslContext) throws IOException {
+        final Socket socket;
+        if (sslContext == null) {
+            socket = new Socket(LOCALHOST, port);
+        } else {
+            socket = sslContext.getSocketFactory().createSocket(LOCALHOST, port);
         }
+        return socket;
     }
 
     private void enableSslContextService(final SSLContext sslContext) throws InitializationException {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java
index b6c997e..bbbce1f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java
@@ -16,23 +16,20 @@
  */
 package org.apache.nifi.processors.standard;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
 import org.apache.nifi.processor.util.listen.event.StandardEvent;
 import org.apache.nifi.processor.util.listen.response.ChannelResponder;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -47,34 +44,21 @@ import java.util.concurrent.BlockingQueue;
 
 public class TestListenUDP {
 
-    private int port = 0;
-    private ListenUDP proc;
-    private TestRunner runner;
+    private static final String LOCALHOST = "localhost";
 
-    @BeforeClass
-    public static void setUpBeforeClass() throws Exception {
-        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
-        System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
-        System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
-        System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.ListenUDP", "debug");
-        System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestListenUDP", "debug");
-    }
+    private int port = 0;
 
-    @AfterClass
-    public static void tearDownAfterClass() {
-        System.setProperty("org.slf4j.simpleLogger.showDateTime", "false");
-    }
+    private TestRunner runner;
 
     @Before
     public void setUp() throws Exception {
-        proc = new ListenUDP();
-        runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(ListenUDP.PORT, String.valueOf(port));
+        runner = TestRunners.newTestRunner(ListenUDP.class);
+        port = NetworkUtils.availablePort();
+        runner.setProperty(ListenUDP.PORT, Integer.toString(port));
     }
 
     @Test
     public void testCustomValidation() {
-        runner.assertNotValid();
         runner.setProperty(ListenUDP.PORT, "1");
         runner.assertValid();
 
@@ -110,15 +94,13 @@ public class TestListenUDP {
         runner.setProperty(ListenUDP.MAX_MESSAGE_QUEUE_SIZE, String.valueOf(maxQueueSize));
 
         final List<String> messages = getMessages(20);
-        final int expectedQueued = maxQueueSize;
-        final int expectedTransferred = maxQueueSize;
 
-        run(new DatagramSocket(), messages, expectedQueued, expectedTransferred);
+        run(new DatagramSocket(), messages, maxQueueSize, maxQueueSize);
         runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, maxQueueSize);
 
         List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
         verifyFlowFiles(mockFlowFiles);
-        verifyProvenance(expectedTransferred);
+        verifyProvenance(maxQueueSize);
     }
 
     @Test
@@ -146,7 +128,7 @@ public class TestListenUDP {
     }
 
     @Test
-    public void testBatchingWithDifferentSenders() throws IOException, InterruptedException {
+    public void testBatchingWithDifferentSenders() {
         final String sender1 = "sender1";
         final String sender2 = "sender2";
         final ChannelResponder responder = Mockito.mock(ChannelResponder.class);
@@ -164,7 +146,6 @@ public class TestListenUDP {
         runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
 
         // sending 4 messages with a batch size of 10, but should get 2 FlowFiles because of different senders
-
         runner.run();
         runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
 
@@ -172,7 +153,7 @@ public class TestListenUDP {
     }
 
     @Test
-    public void testRunWhenNoEventsAvailable() throws IOException, InterruptedException {
+    public void testRunWhenNoEventsAvailable() {
         final List<StandardEvent> mockEvents = new ArrayList<>();
 
         MockListenUDP mockListenUDP = new MockListenUDP(mockEvents);
@@ -206,26 +187,6 @@ public class TestListenUDP {
         verifyProvenance(expectedTransferred);
     }
 
-    @Test
-    public void testWithSendingHostAndPortDifferentThanSender() throws IOException, InterruptedException {
-        final String sendingHost = "localhost";
-        final Integer sendingPort = 21001;
-        runner.setProperty(ListenUDP.SENDING_HOST, sendingHost);
-        runner.setProperty(ListenUDP.SENDING_HOST_PORT, String.valueOf(sendingPort));
-
-        // bind to a different sending port than the processor has for Sending Host Port
-        final DatagramSocket socket = new DatagramSocket(21002);
-
-        // no messages should come through since we are listening for 21001 and sending from 21002
-
-        final List<String> messages = getMessages(6);
-        final int expectedQueued = 0;
-        final int expectedTransferred = 0;
-
-        run(socket, messages, expectedQueued, expectedTransferred);
-        runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, 0);
-    }
-
     private List<String> getMessages(int numMessages) {
         final List<String> messages = new ArrayList<>();
         for (int i=0; i < numMessages; i++) {
@@ -256,54 +217,25 @@ public class TestListenUDP {
     protected void run(final DatagramSocket socket, final List<String> messages, final int expectedQueueSize, final int expectedTransferred)
             throws IOException, InterruptedException {
 
-        try {
-            // schedule to start listening on a random port
-            final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
-            final ProcessContext context = runner.getProcessContext();
-            proc.onScheduled(context);
-            Thread.sleep(100);
 
-            // get the real port the dispatcher is listening on
-            final int destPort = proc.getDispatcherPort();
-            final InetSocketAddress destination = new InetSocketAddress("localhost", destPort);
 
-            // send the messages to the port the processors is listening on
+        // Run Processor and start Dispatcher without shutting down
+        runner.run(1, false, true);
+
+        try {
+            final InetSocketAddress destination = new InetSocketAddress(LOCALHOST, port);
             for (final String message : messages) {
                 final byte[] buffer = message.getBytes(StandardCharsets.UTF_8);
                 final DatagramPacket packet = new DatagramPacket(buffer, buffer.length, destination);
                 socket.send(packet);
-                Thread.sleep(10);
             }
 
-            long responseTimeout = 10000;
-
-            // this first loop waits until the internal queue of the processor has the expected
-            // number of messages ready before proceeding, we want to guarantee they are all there
-            // before onTrigger gets a chance to run
-            long startTimeQueueSizeCheck = System.currentTimeMillis();
-            while (proc.getQueueSize() < expectedQueueSize
-                    && (System.currentTimeMillis() - startTimeQueueSizeCheck < responseTimeout)) {
-                Thread.sleep(100);
-            }
-
-            // want to fail here if the queue size isn't what we expect
-            Assert.assertEquals(expectedQueueSize, proc.getQueueSize());
-
-            // call onTrigger until we processed all the messages, or a certain amount of time passes
-            int numTransferred = 0;
-            long startTime = System.currentTimeMillis();
-            while (numTransferred < expectedTransferred  && (System.currentTimeMillis() - startTime < responseTimeout)) {
-                proc.onTrigger(context, processSessionFactory);
-                numTransferred = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS).size();
-                Thread.sleep(100);
-            }
+            // Run Processor for number of responses
+            runner.run(expectedTransferred, false, false);
 
-            // should have transferred the expected events
             runner.assertTransferCount(ListenUDP.REL_SUCCESS, expectedTransferred);
         } finally {
-            // unschedule to close connections
-            proc.onUnscheduled();
-            IOUtils.closeQuietly(socket);
+            runner.shutdown();
         }
     }
 
@@ -324,7 +256,7 @@ public class TestListenUDP {
         }
 
         @Override
-        protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<StandardEvent> events) throws IOException {
+        protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<StandardEvent> events) {
             return Mockito.mock(ChannelDispatcher.class);
         }
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
index e520711..d6c806f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
@@ -14,18 +14,246 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.nifi.processors.standard;
 
-import org.apache.nifi.processors.standard.util.TestPutTCPCommon;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.nifi.processors.standard.util.TCPTestServer;
+import org.apache.nifi.security.util.KeyStoreUtils;
+import org.apache.nifi.security.util.SslContextFactory;
+import org.apache.nifi.security.util.TlsConfiguration;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import javax.net.ServerSocketFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocketFactory;
+import java.net.InetAddress;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class TestPutTCP {
+    private final static String TCP_SERVER_ADDRESS = "127.0.0.1";
+    private final static String SERVER_VARIABLE = "server.address";
+    private final static String TCP_SERVER_ADDRESS_EL = "${" + SERVER_VARIABLE + "}";
+    private final static int MIN_INVALID_PORT = 0;
+    private final static int MIN_VALID_PORT = 1;
+    private final static int MAX_VALID_PORT = 65535;
+    private final static int MAX_INVALID_PORT = 65536;
+    private final static int BUFFER_SIZE = 1024;
+    private final static int VALID_LARGE_FILE_SIZE = 32768;
+    private final static int VALID_SMALL_FILE_SIZE = 64;
+    private final static int LOAD_TEST_ITERATIONS = 500;
+    private final static int LOAD_TEST_THREAD_COUNT = 1;
+    private final static int DEFAULT_ITERATIONS = 1;
+    private final static int DEFAULT_THREAD_COUNT = 1;
+    private final static char CONTENT_CHAR = 'x';
+    private final static int DEFAULT_TEST_TIMEOUT_PERIOD = 10000;
+    private final static int LONG_TEST_TIMEOUT_PERIOD = 300000;
+    private final static String OUTGOING_MESSAGE_DELIMITER = "\n";
+    private final static String OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR = "{delimiter}\r\n";
+    private final static String[] EMPTY_FILE = { "" };
+    private final static String[] VALID_FILES = { "abcdefghijklmnopqrstuvwxyz", "zyxwvutsrqponmlkjihgfedcba", "12345678", "343424222", "!@£$%^&*()_+:|{}[];\\" };
+
+    private TCPTestServer server;
+    private int port;
+    private ArrayBlockingQueue<List<Byte>> received;
+    private TestRunner runner;
+
+    @Before
+    public void setup() throws Exception {
+        received = new ArrayBlockingQueue<>(BUFFER_SIZE);
+        runner = TestRunners.newTestRunner(PutTCP.class);
+        runner.setVariable(SERVER_VARIABLE, TCP_SERVER_ADDRESS);
+    }
+
+    @After
+    public void cleanup() {
+        runner.shutdown();
+        removeTestServer(server);
+    }
+
+    @Test
+    public void testPortProperty() {
+        runner.setProperty(PutTCP.PORT, Integer.toString(MIN_INVALID_PORT));
+        runner.assertNotValid();
+
+        runner.setProperty(PutTCP.PORT, Integer.toString(MIN_VALID_PORT));
+        runner.assertValid();
+
+        runner.setProperty(PutTCP.PORT, Integer.toString(MAX_VALID_PORT));
+        runner.assertValid();
+
+        runner.setProperty(PutTCP.PORT, Integer.toString(MAX_INVALID_PORT));
+        runner.assertNotValid();
+    }
+
+    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    public void testRunSuccess() throws Exception {
+        createTestServer(OUTGOING_MESSAGE_DELIMITER);
+        configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
+        sendTestData(VALID_FILES);
+        assertMessagesReceived(VALID_FILES);
+        assertEquals("Server Connections not matched", 1, server.getTotalNumConnections()); // expected value first, then actual
+    }
 
-public class TestPutTCP extends TestPutTCPCommon {
+    @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
+    public void testRunSuccessSslContextService() throws Exception {
+        final TlsConfiguration tlsConfiguration = KeyStoreUtils.createTlsConfigAndNewKeystoreTruststore();
+
+        try {
+            final SSLContext sslContext = SslContextFactory.createSslContext(tlsConfiguration);
+            assertNotNull("SSLContext not found", sslContext);
+
+            final String identifier = SSLContextService.class.getName();
+            final SSLContextService sslContextService = Mockito.mock(SSLContextService.class);
+            Mockito.when(sslContextService.getIdentifier()).thenReturn(identifier);
+            Mockito.when(sslContextService.createContext()).thenReturn(sslContext);
+            runner.addControllerService(identifier, sslContextService);
+            runner.enableControllerService(sslContextService);
+            runner.setProperty(PutTCP.SSL_CONTEXT_SERVICE, identifier);
+
+            final SSLServerSocketFactory serverSocketFactory = sslContext.getServerSocketFactory(); // server side shares the client's SSLContext
+            createTestServer(OUTGOING_MESSAGE_DELIMITER, false, serverSocketFactory);
+            configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
+            sendTestData(VALID_FILES);
+            assertMessagesReceived(VALID_FILES);
+            assertEquals("Server Connections not matched", 1, server.getTotalNumConnections()); // expected value first, then actual
+        } finally {
+            Files.deleteIfExists(Paths.get(tlsConfiguration.getKeystorePath())); // remove the store files named by the generated configuration
+            Files.deleteIfExists(Paths.get(tlsConfiguration.getTruststorePath()));
+        }
+    }
+
+    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    public void testRunSuccessServerVariableExpression() throws Exception {
+        createTestServer(OUTGOING_MESSAGE_DELIMITER);
+        configureProperties(TCP_SERVER_ADDRESS_EL, OUTGOING_MESSAGE_DELIMITER, false); // hostname supplied as "${server.address}"
+        sendTestData(VALID_FILES);
+        assertMessagesReceived(VALID_FILES);
+        assertEquals("Server Connections not matched", 1, server.getTotalNumConnections()); // expected value first, then actual
+    }
+
+    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    public void testRunSuccessPruneSenders() throws Exception {
+        createTestServer(OUTGOING_MESSAGE_DELIMITER);
+        configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
+        sendTestData(VALID_FILES);
+        assertTransfers(VALID_FILES.length);
+        assertMessagesReceived(VALID_FILES);
+        assertEquals("Server Connections not matched", 1, server.getTotalNumConnections()); // expected value first, then actual
+        runner.setProperty(PutTCP.IDLE_EXPIRATION, "500 ms");
+        Thread.sleep(1000); // sleep past the 500 ms idle expiration configured above
+        runner.run(1, false, false);
+        runner.clearTransferState();
+        sendTestData(VALID_FILES);
+        assertMessagesReceived(VALID_FILES);
+        assertEquals("Server Connections after prune senders not matched", 2, server.getTotalNumConnections()); // second connection expected after the idle period
+    }
+
+    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    public void testRunSuccessMultiCharDelimiter() throws Exception {
+        createTestServer(OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR);
+        configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR, false);
+        sendTestData(VALID_FILES);
+        assertMessagesReceived(VALID_FILES);
+        assertEquals("Server Connections not matched", 1, server.getTotalNumConnections()); // expected value first, then actual
+    }
+
+    @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
+    public void testRunSuccessConnectionPerFlowFile() throws Exception {
+        createTestServer(OUTGOING_MESSAGE_DELIMITER, true);
+        configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, true);
+        sendTestData(VALID_FILES);
+        assertMessagesReceived(VALID_FILES);
+        assertEquals("Server Connections not matched", VALID_FILES.length, server.getTotalNumConnections()); // one connection expected per FlowFile
+    }
+
+    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    public void testRunSuccessConnectionFailure() throws Exception {
+        createTestServer(OUTGOING_MESSAGE_DELIMITER);
+        configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
+        sendTestData(VALID_FILES);
+        assertMessagesReceived(VALID_FILES);
+        assertEquals("Server Connections not matched", 1, server.getTotalNumConnections()); // expected value first, then actual
+        removeTestServer(server);
+        runner.clearTransferState();
+        sendTestData(VALID_FILES);
+        Thread.sleep(500);
+        assertNull("Unexpected Data Received", received.poll()); // server was shut down above, so the queue must stay empty
+        runner.assertQueueEmpty();
+        assertEquals("Server Connections after restart not matched", 1, server.getTotalNumConnections());
+        createTestServer(OUTGOING_MESSAGE_DELIMITER);
+        configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false); // re-reads the port assigned to the new server
+        sendTestData(VALID_FILES);
+        assertMessagesReceived(VALID_FILES);
+        assertEquals("Server Connections not matched", 1, server.getTotalNumConnections());
+    }
 
-    public TestPutTCP() {
-        super();
+    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    public void testRunSuccessEmptyFile() throws Exception {
+        createTestServer(OUTGOING_MESSAGE_DELIMITER);
+        configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
+        sendTestData(EMPTY_FILE);
+        assertTransfers(EMPTY_FILE.length);
+        runner.assertQueueEmpty();
+        assertEquals("Server Connections not matched", 1, server.getTotalNumConnections()); // expected value first, then actual
     }
 
-    @Override
-    public void configureProperties(String host, int port, String outgoingMessageDelimiter, boolean connectionPerFlowFile, boolean expectValid) {
+    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    public void testRunSuccessLargeValidFile() throws Exception {
+        createTestServer(OUTGOING_MESSAGE_DELIMITER);
+        configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, true);
+        final String[] testData = createContent(VALID_LARGE_FILE_SIZE);
+        sendTestData(testData);
+        assertMessagesReceived(testData);
+        assertEquals("Server Connections not matched", testData.length, server.getTotalNumConnections()); // expected value first, then actual
+    }
+
+    @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
+    public void testRunSuccessFiveHundredMessages() throws Exception {
+        createTestServer(OUTGOING_MESSAGE_DELIMITER);
+        Thread.sleep(1000);
+        final String[] testData = createContent(VALID_SMALL_FILE_SIZE);
+        configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
+        sendTestData(testData, LOAD_TEST_ITERATIONS, LOAD_TEST_THREAD_COUNT); // LOAD_TEST_ITERATIONS rounds of enqueue-and-run
+        assertMessagesReceived(testData, LOAD_TEST_ITERATIONS);
+        assertEquals("Server Connections not matched", 1, server.getTotalNumConnections()); // expected value first, then actual
+    }
+
+    private void createTestServer(final String delimiter) throws Exception {
+        createTestServer(delimiter, false);
+    }
+
+    private void createTestServer(final String delimiter, final boolean closeOnMessageReceived) throws Exception {
+        createTestServer(delimiter, closeOnMessageReceived, ServerSocketFactory.getDefault());
+    }
+
+    private void createTestServer(final String delimiter, final boolean closeOnMessageReceived, final ServerSocketFactory serverSocketFactory) throws Exception {
+        server = new TCPTestServer(InetAddress.getByName(TCP_SERVER_ADDRESS), received, delimiter, closeOnMessageReceived);
+        server.startServer(serverSocketFactory);
+        port = server.getPort();
+    }
+
+    private void removeTestServer(final TCPTestServer server) {
+        if (server != null) {
+            server.shutdown();
+        }
+    }
+
+    private void configureProperties(String host, String outgoingMessageDelimiter, boolean connectionPerFlowFile) {
         runner.setProperty(PutTCP.HOSTNAME, host);
         runner.setProperty(PutTCP.PORT, Integer.toString(port));
         if (outgoingMessageDelimiter != null) {
@@ -33,11 +261,56 @@ public class TestPutTCP extends TestPutTCPCommon {
         }
 
         runner.setProperty(PutTCP.CONNECTION_PER_FLOWFILE, String.valueOf(connectionPerFlowFile));
+        runner.assertValid();
+    }
+
+    private void sendTestData(final String[] testData) {
+        sendTestData(testData, DEFAULT_ITERATIONS, DEFAULT_THREAD_COUNT); // send with the class defaults for iterations and thread count
+    }
+
+    private void sendTestData(final String[] testData, final int iterations, final int threadCount) {
+        runner.setThreadCount(threadCount);
+        for (int i = 0; i < iterations; i++) { // each iteration enqueues the whole data set, then runs the processor
+            for (String item : testData) {
+                runner.enqueue(item.getBytes()); // NOTE(review): platform default charset; assertMessagesReceived builds its expectation the same way
+            }
+            runner.run(testData.length, false, i == 0); // one run per enqueued item; stopOnFinish = false; initialize only on the first iteration
+        }
+    }
+
+    private void assertTransfers(final int successCount) {
+        runner.assertTransferCount(PutTCP.REL_SUCCESS, successCount); // exactly successCount FlowFiles routed to success
+        runner.assertTransferCount(PutTCP.REL_FAILURE, 0); // and none routed to failure
+    }
+
+    private void assertMessagesReceived(final String[] sentData) throws Exception {
+        assertMessagesReceived(sentData, DEFAULT_ITERATIONS); // verify the received messages for the default iteration count
+        runner.assertQueueEmpty(); // additionally require that nothing is left in the processor's input queue
+    }
 
-        if (expectValid) {
-            runner.assertValid();
-        } else {
-            runner.assertNotValid();
+    private void assertMessagesReceived(final String[] sentData, final int iterations) throws Exception { // expects sentData, in order, once per iteration
+        for (int i = 0; i < iterations; i++) {
+            for (String item : sentData) {
+                List<Byte> message = received.take(); // NOTE(review): if received is a BlockingQueue, take() waits for an element rather than returning null
+                assertNotNull(String.format("Message [%d] not found", i), message); // NOTE(review): likely cannot fail after take(); a missing message would surface as the @Test timeout
+                Byte[] messageBytes = new Byte[message.size()];
+                assertArrayEquals(item.getBytes(), ArrayUtils.toPrimitive(message.toArray(messageBytes))); // unbox the received bytes and compare with the expected item
+            }
         }
+
+        runner.assertTransferCount(PutTCP.REL_SUCCESS, sentData.length * iterations); // every FlowFile of every iteration routed to success
+        runner.clearTransferState(); // reset recorded transfers before any further sends
+
+        assertNull("Unexpected Message Found", received.poll()); // anything still queued at this point is an extra message
+    }
+
+    private String[] createContent(final int size) {
+        // Produce one payload: CONTENT_CHAR repeated size times.
+        final StringBuilder payload = new StringBuilder(size);
+        int remaining = size;
+        while (remaining-- > 0) {
+            payload.append(CONTENT_CHAR);
+        }
+        return new String[] { payload.toString() };
     }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTcpSSL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTcpSSL.java
deleted file mode 100644
index 6870ae9..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTcpSSL.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import org.apache.nifi.processors.standard.util.TestPutTCPCommon;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.security.util.KeystoreType;
-import org.apache.nifi.security.util.SslContextFactory;
-import org.apache.nifi.security.util.StandardTlsConfiguration;
-import org.apache.nifi.security.util.TlsConfiguration;
-import org.apache.nifi.security.util.TlsException;
-import org.apache.nifi.ssl.SSLContextService;
-import org.junit.BeforeClass;
-import org.mockito.Mockito;
-
-import javax.net.ssl.SSLContext;
-
-public class TestPutTcpSSL extends TestPutTCPCommon {
-    private static final String TLS_PROTOCOL = "TLSv1.2";
-
-    private static SSLContext sslContext;
-
-    @BeforeClass
-    public static void configureServices() throws TlsException {
-        final TlsConfiguration configuration = new StandardTlsConfiguration(
-                "src/test/resources/keystore.jks",
-                "passwordpassword",
-                "passwordpassword",
-                KeystoreType.JKS,
-                "src/test/resources/truststore.jks",
-                "passwordpassword",
-                KeystoreType.JKS,
-                TLS_PROTOCOL
-        );
-        sslContext = SslContextFactory.createSslContext(configuration);
-    }
-
-    public TestPutTcpSSL() {
-        super();
-        serverSocketFactory = sslContext.getServerSocketFactory();
-    }
-
-    @Override
-    public void configureProperties(String host, int port, String outgoingMessageDelimiter, boolean connectionPerFlowFile, boolean expectValid) throws InitializationException {
-        runner.setProperty(PutTCP.HOSTNAME, host);
-        runner.setProperty(PutTCP.PORT, Integer.toString(port));
-
-        final SSLContextService sslContextService = Mockito.mock(SSLContextService.class);
-        final String serviceIdentifier = SSLContextService.class.getName();
-        Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
-        Mockito.when(sslContextService.createContext()).thenReturn(sslContext);
-
-        runner.addControllerService(serviceIdentifier, sslContextService);
-        runner.enableControllerService(sslContextService);
-        runner.setProperty(PutTCP.SSL_CONTEXT_SERVICE, serviceIdentifier);
-
-        if (outgoingMessageDelimiter != null) {
-            runner.setProperty(PutTCP.OUTGOING_MESSAGE_DELIMITER, outgoingMessageDelimiter);
-        }
-        runner.setProperty(PutTCP.CONNECTION_PER_FLOWFILE, String.valueOf(connectionPerFlowFile));
-
-        if (expectValid) {
-            runner.assertValid();
-        } else {
-            runner.assertNotValid();
-        }
-    }
-}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestPutTCPCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestPutTCPCommon.java
deleted file mode 100644
index 3f94087..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestPutTCPCommon.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.standard.util;
-
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.SystemUtils;
-import org.apache.nifi.processors.standard.PutTCP;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.After;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import javax.net.ServerSocketFactory;
-import java.net.InetAddress;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-public abstract class TestPutTCPCommon {
-    private final static String TCP_SERVER_ADDRESS = "127.0.0.1";
-    private final static String SERVER_VARIABLE = "ALKJAFLKJDFLSKJSDFLKJSDF";
-    private final static String TCP_SERVER_ADDRESS_EL = "${" + SERVER_VARIABLE + "}";
-    private final static String UNKNOWN_HOST = "fgdsfgsdffd";
-    private final static String INVALID_IP_ADDRESS = "300.300.300.300";
-    private final static int MIN_INVALID_PORT = 0;
-    private final static int MIN_VALID_PORT = 1;
-    private final static int MAX_VALID_PORT = 65535;
-    private final static int MAX_INVALID_PORT = 65536;
-    private final static int BUFFER_SIZE = 1024;
-    private final static int VALID_LARGE_FILE_SIZE = 32768;
-    private final static int VALID_SMALL_FILE_SIZE = 64;
-    private final static int LOAD_TEST_ITERATIONS = 500;
-    private final static int LOAD_TEST_THREAD_COUNT = 1;
-    private final static int DEFAULT_ITERATIONS = 1;
-    private final static int DEFAULT_THREAD_COUNT = 1;
-    private final static char CONTENT_CHAR = 'x';
-    private final static int DATA_WAIT_PERIOD = 1000;
-    private final static int DEFAULT_TEST_TIMEOUT_PERIOD = 10000;
-    private final static int LONG_TEST_TIMEOUT_PERIOD = 180000;
-    private final static String OUTGOING_MESSAGE_DELIMITER = "\n";
-    private final static String OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR = "{delimiter}\r\n";
-
-    private TCPTestServer server;
-    private int port;
-    private ArrayBlockingQueue<List<Byte>> recvQueue;
-
-    public ServerSocketFactory serverSocketFactory;
-    public TestRunner runner;
-
-    // Test Data
-    private final static String[] EMPTY_FILE = { "" };
-    private final static String[] VALID_FILES = { "abcdefghijklmnopqrstuvwxyz", "zyxwvutsrqponmlkjihgfedcba", "12345678", "343424222", "!@£$%^&*()_+:|{}[];\\" };
-
-    @BeforeClass
-    public static void setUpSuite() {
-        Assume.assumeTrue("Test only runs on *nix", !SystemUtils.IS_OS_WINDOWS);
-    }
-
-    @Before
-    public void setup() throws Exception {
-        recvQueue = new ArrayBlockingQueue<>(BUFFER_SIZE);
-        runner = TestRunners.newTestRunner(PutTCP.class);
-        runner.setVariable(SERVER_VARIABLE, TCP_SERVER_ADDRESS);
-    }
-
-    private TCPTestServer createTestServer(final ArrayBlockingQueue<List<Byte>> queue, final String delimiter, final boolean closeOnMessageReceived) throws Exception {
-        TCPTestServer server = new TCPTestServer(InetAddress.getByName(TCP_SERVER_ADDRESS), queue, delimiter, closeOnMessageReceived);
-        server.startServer(serverSocketFactory);
-        port = server.getPort();
-        return server;
-    }
-
-    private TCPTestServer createTestServer(final ArrayBlockingQueue<List<Byte>> queue, final String delimiter) throws Exception {
-        return createTestServer(queue, delimiter, false);
-    }
-
-    @After
-    public void cleanup() {
-        runner.shutdown();
-        removeTestServer(server);
-    }
-
-    private void removeTestServer(TCPTestServer server) {
-        if (server != null) {
-            server.shutdown();
-        }
-    }
-
-    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
-    public void testValidFiles() throws Exception {
-        server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
-        configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
-        sendTestData(VALID_FILES);
-        checkReceivedAllData(recvQueue, VALID_FILES);
-        checkInputQueueIsEmpty();
-        checkTotalNumConnections(server, 1);
-    }
-
-    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
-    public void testValidFilesEL() throws Exception {
-        server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
-        configureProperties(TCP_SERVER_ADDRESS_EL, port, OUTGOING_MESSAGE_DELIMITER, false, true);
-        sendTestData(VALID_FILES);
-        checkReceivedAllData(recvQueue, VALID_FILES);
-        checkInputQueueIsEmpty();
-        checkTotalNumConnections(server, 1);
-    }
-
-    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
-    public void testPruneSenders() throws Exception {
-        server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
-        configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
-        sendTestData(VALID_FILES);
-        Thread.sleep(10);
-        checkRelationships(VALID_FILES.length, 0);
-        checkReceivedAllData(recvQueue, VALID_FILES);
-        checkInputQueueIsEmpty();
-        checkTotalNumConnections(server, 1);
-        runner.setProperty(PutTCP.IDLE_EXPIRATION, "500 ms");
-        Thread.sleep(1000);
-        runner.run(1, false, false);
-        runner.clearTransferState();
-        sendTestData(VALID_FILES);
-        checkReceivedAllData(recvQueue, VALID_FILES);
-        checkInputQueueIsEmpty();
-        checkTotalNumConnections(server, 2);
-    }
-
-    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
-    public void testMultiCharDelimiter() throws Exception {
-        server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR);
-        configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR, false, true);
-        sendTestData(VALID_FILES);
-        checkReceivedAllData(recvQueue, VALID_FILES);
-        checkInputQueueIsEmpty();
-        checkTotalNumConnections(server, 1);
-    }
-
-    @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
-    public void testConnectionPerFlowFile() throws Exception {
-        server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER, true);
-        configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, true, true);
-        sendTestData(VALID_FILES);
-        checkReceivedAllData(recvQueue, VALID_FILES);
-        checkInputQueueIsEmpty();
-        checkTotalNumConnections(server, VALID_FILES.length);
-    }
-
-    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
-    public void testConnectionFailure() throws Exception {
-        server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
-        configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
-        sendTestData(VALID_FILES);
-        checkReceivedAllData(recvQueue, VALID_FILES);
-        checkInputQueueIsEmpty();
-        checkTotalNumConnections(server, 1);
-        removeTestServer(server);
-        runner.clearTransferState();
-        sendTestData(VALID_FILES);
-        Thread.sleep(10);
-        checkNoDataReceived(recvQueue);
-        checkInputQueueIsEmpty();
-        checkTotalNumConnections(server, 1);
-        server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
-        configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
-        sendTestData(VALID_FILES);
-        checkReceivedAllData(recvQueue, VALID_FILES);
-        checkInputQueueIsEmpty();
-        checkTotalNumConnections(server, 1);
-    }
-
-    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
-    public void testEmptyFile() throws Exception {
-        server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
-        configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
-        sendTestData(EMPTY_FILE);
-        Thread.sleep(10);
-        checkRelationships(EMPTY_FILE.length, 0);
-        checkEmptyMessageReceived(recvQueue);
-        checkInputQueueIsEmpty();
-        checkTotalNumConnections(server, 1);
-    }
-
-    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
-    public void testLargeValidFile() throws Exception {
-        server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
-        configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, true, true);
-        final String[] testData = createContent(VALID_LARGE_FILE_SIZE);
-        sendTestData(testData);
-        checkReceivedAllData(recvQueue, testData);
-        checkInputQueueIsEmpty();
-        checkTotalNumConnections(server, testData.length);
-    }
-
-    @Ignore("This test is failing intermittently as documented in NIFI-4288")
-    @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
-    public void testInvalidIPAddress() throws Exception {
-        server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
-        configureProperties(INVALID_IP_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
-        sendTestData(VALID_FILES);
-        Thread.sleep(10);
-        checkRelationships(0, VALID_FILES.length);
-        checkNoDataReceived(recvQueue);
-        checkInputQueueIsEmpty();
-        checkTotalNumConnections(server, 0);
-    }
-
-    @Ignore("This test is failing intermittently as documented in NIFI-4288")
-    @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
-    public void testUnknownHostname() throws Exception {
-        server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
-        configureProperties(UNKNOWN_HOST, port, OUTGOING_MESSAGE_DELIMITER, false, true);
-        sendTestData(VALID_FILES);
-        Thread.sleep(10);
-        checkRelationships(0, VALID_FILES.length);
-        checkNoDataReceived(recvQueue);
-        checkInputQueueIsEmpty();
-        checkTotalNumConnections(server, 0);
-    }
-
-    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
-    public void testInvalidPort() throws Exception {
-        configureProperties(UNKNOWN_HOST, MIN_INVALID_PORT, OUTGOING_MESSAGE_DELIMITER, false, false);
-        configureProperties(UNKNOWN_HOST, MIN_VALID_PORT, OUTGOING_MESSAGE_DELIMITER, false, true);
-        configureProperties(UNKNOWN_HOST, MAX_VALID_PORT, OUTGOING_MESSAGE_DELIMITER, false, true);
-        configureProperties(UNKNOWN_HOST, MAX_INVALID_PORT, OUTGOING_MESSAGE_DELIMITER, false, false);
-    }
-
-    @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
-    public void testLoadTest() throws Exception {
-        server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
-        Thread.sleep(1000);
-        final String[] testData = createContent(VALID_SMALL_FILE_SIZE);
-        configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
-        sendTestData(testData, LOAD_TEST_ITERATIONS, LOAD_TEST_THREAD_COUNT);
-        checkReceivedAllData(recvQueue, testData, LOAD_TEST_ITERATIONS);
-        checkInputQueueIsEmpty();
-        checkTotalNumConnections(server, 1);
-    }
-
-    private void checkTotalNumConnections(final TCPTestServer server, final int expectedTotalNumConnections) {
-        assertEquals(expectedTotalNumConnections, server.getTotalNumConnections());
-    }
-
-    public abstract void configureProperties(final String host, final int port, final String outgoingMessageDelimiter, final boolean connectionPerFlowFile,
-                                             final boolean expectValid) throws InitializationException;
-
-    private void sendTestData(final String[] testData) {
-        sendTestData(testData, DEFAULT_ITERATIONS, DEFAULT_THREAD_COUNT);
-    }
-
-    private void sendTestData(final String[] testData, final int iterations, final int threadCount) {
-        runner.setThreadCount(threadCount);
-        for (int i = 0; i < iterations; i++) {
-            for (String item : testData) {
-                runner.enqueue(item.getBytes());
-            }
-            runner.run(testData.length, false, i == 0);
-        }
-    }
-
-    private void checkRelationships(final int successCount, final int failedCount) {
-        runner.assertTransferCount(PutTCP.REL_SUCCESS, successCount);
-        runner.assertTransferCount(PutTCP.REL_FAILURE, failedCount);
-    }
-
-    private void checkNoDataReceived(final ArrayBlockingQueue<List<Byte>> recvQueue) throws Exception {
-        Thread.sleep(DATA_WAIT_PERIOD);
-        assertNull(recvQueue.poll());
-    }
-
-    private void checkEmptyMessageReceived(final ArrayBlockingQueue<List<Byte>> recvQueue) throws Exception {
-        Thread.sleep(DATA_WAIT_PERIOD);
-        final List<Byte> message = recvQueue.poll();
-
-        assertNotNull(message);
-        assertEquals(0, message.size());
-    }
-
-    private void checkInputQueueIsEmpty() {
-        runner.assertQueueEmpty();
-    }
-
-    private void checkReceivedAllData(final ArrayBlockingQueue<List<Byte>> recvQueue, final String[] sentData) throws Exception {
-        checkReceivedAllData(recvQueue, sentData, DEFAULT_ITERATIONS);
-    }
-
-    private void checkReceivedAllData(final ArrayBlockingQueue<List<Byte>> recvQueue, final String[] sentData, final int iterations) throws Exception {
-        // check each sent FlowFile was successfully sent and received.
-        for (int i = 0; i < iterations; i++) {
-            for (String item : sentData) {
-                List<Byte> message = recvQueue.take();
-                assertNotNull(message);
-                Byte[] messageBytes = new Byte[message.size()];
-                assertArrayEquals(item.getBytes(), ArrayUtils.toPrimitive(message.toArray(messageBytes)));
-            }
-        }
-
-        runner.assertTransferCount(PutTCP.REL_SUCCESS, sentData.length * iterations);
-        runner.clearTransferState();
-
-        // Check that we have no unexpected extra data.
-        assertNull(recvQueue.poll());
-    }
-
-    private String[] createContent(final int size) {
-        final char[] content = new char[size];
-
-        for (int i = 0; i < size; i++) {
-            content[i] = CONTENT_CHAR;
-        }
-
-        return new String[] { new String(content) };
-    }
-}