You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2021/03/16 22:40:39 UTC
[nifi] branch main updated: NIFI-8304 This closes #4900. Improved
Socket test reliability for several Processors
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 2ad88bb NIFI-8304 This closes #4900. Improved Socket test reliability for several Processors
2ad88bb is described below
commit 2ad88bbffff3c90bca7404d10ebc40953c1b3c63
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Mon Mar 15 14:04:30 2021 -0500
NIFI-8304 This closes #4900. Improved Socket test reliability for several Processors
- Refactored TestPutTCP to single class
- Improved TestListenRELP
- Improved TestListenTCP
- Improved TestListenUDP
- Improved TestListenTCPRecord
- Changed OnUnscheduled to OnStopped in AbstractListenEventProcessor
Signed-off-by: Joe Witt <jo...@apache.org>
---
.../util/listen/AbstractListenEventProcessor.java | 6 +-
.../nifi/processors/standard/ListenTCPRecord.java | 11 +-
.../nifi/processors/standard/TestListenRELP.java | 201 ++++++------
.../nifi/processors/standard/TestListenTCP.java | 114 +++----
.../processors/standard/TestListenTCPRecord.java | 164 +++-------
.../nifi/processors/standard/TestListenUDP.java | 108 ++-----
.../nifi/processors/standard/TestPutTCP.java | 293 +++++++++++++++++-
.../nifi/processors/standard/TestPutTcpSSL.java | 82 -----
.../processors/standard/util/TestPutTCPCommon.java | 339 ---------------------
9 files changed, 483 insertions(+), 835 deletions(-)
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
index edba61a..7898319 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
@@ -18,7 +18,7 @@ package org.apache.nifi.processor.util.listen;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractProcessor;
@@ -220,8 +220,8 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
return events == null ? 0 : events.size();
}
- @OnUnscheduled
- public void onUnscheduled() {
+ @OnStopped
+ public void closeDispatcher() {
if (dispatcher != null) {
dispatcher.close();
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
index 0187961..acc936a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
@@ -46,7 +46,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
@@ -299,8 +299,8 @@ public class ListenTCPRecord extends AbstractProcessor {
readerThread.start();
}
- @OnUnscheduled
- public void onUnscheduled() {
+ @OnStopped
+ public void onStopped() {
if (dispatcher != null) {
dispatcher.close();
dispatcher = null;
@@ -460,9 +460,4 @@ public class ListenTCPRecord extends AbstractProcessor {
private String getRemoteAddress(final SocketChannelRecordReader socketChannelRecordReader) {
return socketChannelRecordReader.getRemoteAddress() == null ? "null" : socketChannelRecordReader.getRemoteAddress().toString();
}
-
- public final int getDispatcherPort() {
- return dispatcher == null ? 0 : dispatcher.getPort();
- }
-
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
index e48fe76..7e95433 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processors.standard;
import java.io.IOException;
+import java.io.OutputStream;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
@@ -24,21 +25,21 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import javax.net.ssl.SSLContext;
-import org.apache.commons.io.IOUtils;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processors.standard.relp.event.RELPEvent;
import org.apache.nifi.processors.standard.relp.frame.RELPEncoder;
import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
-import org.apache.nifi.processors.standard.relp.response.RELPResponse;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.TlsException;
+import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -47,8 +48,12 @@ import org.apache.nifi.web.util.ssl.SslContextUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+@RunWith(MockitoJUnitRunner.class)
public class TestListenRELP {
public static final String OPEN_FRAME_DATA = "relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog";
@@ -75,82 +80,83 @@ public class TestListenRELP {
.data(new byte[0])
.build();
+ private static final String LOCALHOST = "localhost";
+
+ @Mock
+ private ChannelResponder<SocketChannel> responder;
+
+ @Mock
+ private ChannelDispatcher channelDispatcher;
+
+ @Mock
+ private RestrictedSSLContextService sslContextService;
+
private RELPEncoder encoder;
- private ResponseCapturingListenRELP proc;
+
private TestRunner runner;
@Before
public void setup() {
encoder = new RELPEncoder(StandardCharsets.UTF_8);
- proc = new ResponseCapturingListenRELP();
- runner = TestRunners.newTestRunner(proc);
- runner.setProperty(ListenRELP.PORT, "0");
+ runner = TestRunners.newTestRunner(ListenRELP.class);
}
@Test
- public void testListenRELP() throws IOException, InterruptedException {
- final List<RELPFrame> frames = new ArrayList<>();
- frames.add(OPEN_FRAME);
- frames.add(SYSLOG_FRAME);
- frames.add(SYSLOG_FRAME);
- frames.add(SYSLOG_FRAME);
- frames.add(CLOSE_FRAME);
+ public void testRun() throws IOException {
+ final int syslogFrames = 5;
+ final List<RELPFrame> frames = getFrames(syslogFrames);
- // three syslog frames should be transferred and three responses should be sent
+ // every syslog frame should be transferred and one response should be sent per frame
- run(frames, 3, 3, null);
+ run(frames, syslogFrames, syslogFrames, null);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
Assert.assertNotNull(events);
- Assert.assertEquals(3, events.size());
+ Assert.assertEquals(syslogFrames, events.size());
final ProvenanceEventRecord event = events.get(0);
Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("relp"));
final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenRELP.REL_SUCCESS);
- Assert.assertEquals(3, mockFlowFiles.size());
+ Assert.assertEquals(syslogFrames, mockFlowFiles.size());
final MockFlowFile mockFlowFile = mockFlowFiles.get(0);
Assert.assertEquals(String.valueOf(SYSLOG_FRAME.getTxnr()), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.TXNR.key()));
Assert.assertEquals(SYSLOG_FRAME.getCommand(), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key()));
- Assert.assertTrue(!StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key())));
- Assert.assertTrue(!StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key())));
+ Assert.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key())));
+ Assert.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key())));
}
@Test
- public void testBatching() throws IOException, InterruptedException {
+ public void testRunBatching() throws IOException {
runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "5");
- final List<RELPFrame> frames = new ArrayList<>();
- frames.add(OPEN_FRAME);
- frames.add(SYSLOG_FRAME);
- frames.add(SYSLOG_FRAME);
- frames.add(SYSLOG_FRAME);
- frames.add(CLOSE_FRAME);
+ final int syslogFrames = 3;
+ final List<RELPFrame> frames = getFrames(syslogFrames);
// one syslog frame should be transferred since we are batching, but three responses should be sent
- run(frames, 1, 3, null);
+ final int expectedFlowFiles = 1;
+ run(frames, expectedFlowFiles, syslogFrames, null);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
Assert.assertNotNull(events);
- Assert.assertEquals(1, events.size());
+ Assert.assertEquals(expectedFlowFiles, events.size());
final ProvenanceEventRecord event = events.get(0);
Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("relp"));
final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenRELP.REL_SUCCESS);
- Assert.assertEquals(1, mockFlowFiles.size());
+ Assert.assertEquals(expectedFlowFiles, mockFlowFiles.size());
final MockFlowFile mockFlowFile = mockFlowFiles.get(0);
Assert.assertEquals(SYSLOG_FRAME.getCommand(), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key()));
- Assert.assertTrue(!StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key())));
- Assert.assertTrue(!StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key())));
+ Assert.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key())));
+ Assert.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key())));
}
@Test
- public void testMutualTls() throws IOException, InterruptedException, TlsException, InitializationException {
- final SSLContextService sslContextService = Mockito.mock(SSLContextService.class);
+ public void testRunMutualTls() throws IOException, TlsException, InitializationException {
final String serviceIdentifier = SSLContextService.class.getName();
Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
final SSLContext sslContext = SslContextUtils.createKeyStoreSslContext();
@@ -160,33 +166,26 @@ public class TestListenRELP {
runner.setProperty(ListenRELP.SSL_CONTEXT_SERVICE, serviceIdentifier);
- final List<RELPFrame> frames = new ArrayList<>();
- frames.add(OPEN_FRAME);
- frames.add(SYSLOG_FRAME);
- frames.add(SYSLOG_FRAME);
- frames.add(SYSLOG_FRAME);
- frames.add(SYSLOG_FRAME);
- frames.add(SYSLOG_FRAME);
- frames.add(CLOSE_FRAME);
-
- run(frames, 5, 5, sslContext);
+ final int syslogFrames = 3;
+ final List<RELPFrame> frames = getFrames(syslogFrames);
+ run(frames, syslogFrames, syslogFrames, sslContext);
}
@Test
- public void testNoEventsAvailable() throws IOException, InterruptedException {
- MockListenRELP mockListenRELP = new MockListenRELP(new ArrayList<RELPEvent>());
+ public void testRunNoEventsAvailable() {
+ MockListenRELP mockListenRELP = new MockListenRELP(new ArrayList<>());
runner = TestRunners.newTestRunner(mockListenRELP);
- runner.setProperty(ListenRELP.PORT, "1");
+ runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort()));
runner.run();
runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0);
+ runner.shutdown();
}
@Test
- public void testBatchingWithDifferentSenders() throws IOException, InterruptedException {
+ public void testBatchingWithDifferentSenders() {
final String sender1 = "sender1";
final String sender2 = "sender2";
- final ChannelResponder<SocketChannel> responder = Mockito.mock(ChannelResponder.class);
final List<RELPEvent> mockEvents = new ArrayList<>();
mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
@@ -196,96 +195,68 @@ public class TestListenRELP {
MockListenRELP mockListenRELP = new MockListenRELP(mockEvents);
runner = TestRunners.newTestRunner(mockListenRELP);
- runner.setProperty(ListenRELP.PORT, "1");
+ runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort()));
runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
runner.run();
runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
+ runner.shutdown();
}
+ private void run(final List<RELPFrame> frames, final int flowFiles, final int responses, final SSLContext sslContext)
+ throws IOException {
- protected void run(final List<RELPFrame> frames, final int expectedTransferred, final int expectedResponses, final SSLContext sslContext)
- throws IOException, InterruptedException {
-
- Socket socket = null;
- try {
- // schedule to start listening on a random port
- final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
- final ProcessContext context = runner.getProcessContext();
- proc.onScheduled(context);
-
- // create a client connection to the port the dispatcher is listening on
- final int realPort = proc.getDispatcherPort();
-
- // create either a regular socket or ssl socket based on context being passed in
- if (sslContext == null) {
- socket = new Socket("localhost", realPort);
- } else {
- socket = sslContext.getSocketFactory().createSocket("localhost", realPort);
- }
- Thread.sleep(100);
-
- // send the frames to the port the processors is listening on
- sendFrames(frames, socket);
-
- long responseTimeout = 30000;
-
- // this first loop waits until the internal queue of the processor has the expected
- // number of messages ready before proceeding, we want to guarantee they are all there
- // before onTrigger gets a chance to run
- long startTimeQueueSizeCheck = System.currentTimeMillis();
- while (proc.getQueueSize() < expectedResponses
- && (System.currentTimeMillis() - startTimeQueueSizeCheck < responseTimeout)) {
- Thread.sleep(100);
- }
-
- // want to fail here if the queue size isn't what we expect
- Assert.assertEquals(expectedResponses, proc.getQueueSize());
+ final int port = NetworkUtils.availablePort();
+ runner.setProperty(ListenRELP.PORT, Integer.toString(port));
- // call onTrigger until we got a respond for all the frames, or a certain amount of time passes
- long startTimeProcessing = System.currentTimeMillis();
- while (proc.responses.size() < expectedResponses
- && (System.currentTimeMillis() - startTimeProcessing < responseTimeout)) {
- proc.onTrigger(context, processSessionFactory);
- Thread.sleep(100);
- }
+ // Run Processor and start Dispatcher without shutting down
+ runner.run(1, false, true);
- // should have gotten a response for each frame
- Assert.assertEquals(expectedResponses, proc.responses.size());
+ try (final Socket socket = getSocket(port, sslContext)) {
+ final OutputStream outputStream = socket.getOutputStream();
+ sendFrames(frames, outputStream);
- // should have transferred the expected events
- runner.assertTransferCount(ListenRELP.REL_SUCCESS, expectedTransferred);
+ // Run Processor for number of responses
+ runner.run(responses, false, false);
+ runner.assertTransferCount(ListenRELP.REL_SUCCESS, flowFiles);
} finally {
- // unschedule to close connections
- proc.onUnscheduled();
- IOUtils.closeQuietly(socket);
+ runner.shutdown();
}
}
- private void sendFrames(final List<RELPFrame> frames, final Socket socket) throws IOException, InterruptedException {
- // send the provided messages
+ private void sendFrames(final List<RELPFrame> frames, final OutputStream outputStream) throws IOException {
for (final RELPFrame frame : frames) {
- byte[] encodedFrame = encoder.encode(frame);
- socket.getOutputStream().write(encodedFrame);
+ final byte[] encodedFrame = encoder.encode(frame);
+ outputStream.write(encodedFrame);
+ outputStream.flush();
}
- socket.getOutputStream().flush();
}
- // Extend ListenRELP so we can use the CapturingSocketChannelResponseDispatcher
- private static class ResponseCapturingListenRELP extends ListenRELP {
+ private Socket getSocket(final int port, final SSLContext sslContext) throws IOException {
+ final Socket socket;
+ if (sslContext == null) {
+ socket = new Socket(LOCALHOST, port);
+ } else {
+ socket = sslContext.getSocketFactory().createSocket(LOCALHOST, port);
+ }
+ return socket;
+ }
- private final List<RELPResponse> responses = new ArrayList<>();
+ private List<RELPFrame> getFrames(final int syslogFrames) {
+ final List<RELPFrame> frames = new ArrayList<>();
+ frames.add(OPEN_FRAME);
- @Override
- protected void respond(RELPEvent event, RELPResponse relpResponse) {
- this.responses.add(relpResponse);
- super.respond(event, relpResponse);
+ for (int i = 0; i < syslogFrames; i++) {
+ frames.add(SYSLOG_FRAME);
}
+
+ frames.add(CLOSE_FRAME);
+ return frames;
}
// Extend ListenRELP to mock the ChannelDispatcher and allow us to return staged events
- private static class MockListenRELP extends ListenRELP {
+ private class MockListenRELP extends ListenRELP {
private final List<RELPEvent> mockEvents;
@@ -301,8 +272,8 @@ public class TestListenRELP {
}
@Override
- protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<RELPEvent> events) throws IOException {
- return Mockito.mock(ChannelDispatcher.class);
+ protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<RELPEvent> events) {
+ return channelDispatcher;
}
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java
index 4091516..1745002 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java
@@ -16,9 +16,7 @@
*/
package org.apache.nifi.processors.standard;
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.security.util.TlsException;
@@ -34,24 +32,23 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
-import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
import java.io.IOException;
+import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
public class TestListenTCP {
- private static final long RESPONSE_TIMEOUT = 10000;
-
private static final String SSL_CONTEXT_IDENTIFIER = SSLContextService.class.getName();
+ private static final String LOCALHOST = "localhost";
+
private static SSLContext keyStoreSslContext;
private static SSLContext trustStoreSslContext;
- private ListenTCP proc;
private TestRunner runner;
@BeforeClass
@@ -62,9 +59,7 @@ public class TestListenTCP {
@Before
public void setup() {
- proc = new ListenTCP();
- runner = TestRunners.newTestRunner(proc);
- runner.setProperty(ListenTCP.PORT, "0");
+ runner = TestRunners.newTestRunner(ListenTCP.class);
}
@Test
@@ -81,7 +76,7 @@ public class TestListenTCP {
}
@Test
- public void testListenTCP() throws IOException, InterruptedException {
+ public void testListenTCP() throws IOException {
final List<String> messages = new ArrayList<>();
messages.add("This is message 1\n");
messages.add("This is message 2\n");
@@ -89,7 +84,7 @@ public class TestListenTCP {
messages.add("This is message 4\n");
messages.add("This is message 5\n");
- runTCP(messages, messages.size(), null);
+ run(messages, messages.size(), null);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS);
for (int i = 0; i < mockFlowFiles.size(); i++) {
@@ -98,7 +93,7 @@ public class TestListenTCP {
}
@Test
- public void testListenTCPBatching() throws IOException, InterruptedException {
+ public void testListenTCPBatching() throws IOException {
runner.setProperty(ListenTCP.MAX_BATCH_SIZE, "3");
final List<String> messages = new ArrayList<>();
@@ -108,7 +103,7 @@ public class TestListenTCP {
messages.add("This is message 4\n");
messages.add("This is message 5\n");
- runTCP(messages, 2, null);
+ run(messages, 2, null);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS);
@@ -120,9 +115,7 @@ public class TestListenTCP {
}
@Test
- public void testTLSClientAuthRequiredAndClientCertProvided() throws IOException, InterruptedException,
- InitializationException {
-
+ public void testTLSClientAuthRequiredAndClientCertProvided() throws IOException, InitializationException {
runner.setProperty(ListenTCP.CLIENT_AUTH, ClientAuth.REQUIRED.name());
enableSslContextService(keyStoreSslContext);
@@ -133,8 +126,7 @@ public class TestListenTCP {
messages.add("This is message 4\n");
messages.add("This is message 5\n");
- // Make an SSLContext with a key and trust store to send the test messages
- runTCP(messages, messages.size(), keyStoreSslContext);
+ run(messages, messages.size(), keyStoreSslContext);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS);
for (int i = 0; i < mockFlowFiles.size(); i++) {
@@ -154,14 +146,13 @@ public class TestListenTCP {
messages.add("This is message 4\n");
messages.add("This is message 5\n");
- // Make an SSLContext that only has the trust store, this should not work since the processor has client auth REQUIRED
Assert.assertThrows(IOException.class, () ->
- runTCP(messages, messages.size(), trustStoreSslContext)
+ run(messages, messages.size(), trustStoreSslContext)
);
}
@Test
- public void testTLSClientAuthNoneAndClientCertNotProvided() throws IOException, InterruptedException, InitializationException {
+ public void testTLSClientAuthNoneAndClientCertNotProvided() throws IOException, InitializationException {
runner.setProperty(ListenTCP.CLIENT_AUTH, ClientAuth.NONE.name());
enableSslContextService(keyStoreSslContext);
@@ -172,8 +163,7 @@ public class TestListenTCP {
messages.add("This is message 4\n");
messages.add("This is message 5\n");
- // Make an SSLContext that only has the trust store, this should not work since the processor has client auth REQUIRED
- runTCP(messages, messages.size(), trustStoreSslContext);
+ run(messages, messages.size(), trustStoreSslContext);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS);
for (int i = 0; i < mockFlowFiles.size(); i++) {
@@ -181,63 +171,39 @@ public class TestListenTCP {
}
}
- protected void runTCP(final List<String> messages, final int expectedTransferred, final SSLContext sslContext)
- throws IOException, InterruptedException {
-
- Socket socket = null;
- try {
- // schedule to start listening on a random port
- final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
- final ProcessContext context = runner.getProcessContext();
- proc.onScheduled(context);
-
- // create a client connection to the port the dispatcher is listening on
- final int realPort = proc.getDispatcherPort();
-
- // create either a regular socket or ssl socket based on context being passed in
- if (sslContext == null) {
- socket = new Socket("localhost", realPort);
- } else {
- final SocketFactory socketFactory = sslContext.getSocketFactory();
- socket = socketFactory.createSocket("localhost", realPort);
- }
- Thread.sleep(100);
+ protected void run(final List<String> messages, final int flowFiles, final SSLContext sslContext)
+ throws IOException {
- // send the frames to the port the processors is listening on
- for (final String message : messages) {
- socket.getOutputStream().write(message.getBytes(StandardCharsets.UTF_8));
- Thread.sleep(1);
- }
- socket.getOutputStream().flush();
-
- // this first loop waits until the internal queue of the processor has the expected
- // number of messages ready before proceeding, we want to guarantee they are all there
- // before onTrigger gets a chance to run
- long startTimeQueueSizeCheck = System.currentTimeMillis();
- while (proc.getQueueSize() < messages.size()
- && (System.currentTimeMillis() - startTimeQueueSizeCheck < RESPONSE_TIMEOUT)) {
- Thread.sleep(100);
- }
+ final int port = NetworkUtils.availablePort();
+ runner.setProperty(ListenTCP.PORT, Integer.toString(port));
- // want to fail here if the queue size isn't what we expect
- Assert.assertEquals(messages.size(), proc.getQueueSize());
+ // Run Processor and start Dispatcher without shutting down
+ runner.run(1, false, true);
- // call onTrigger until we processed all the frames, or a certain amount of time passes
- int numTransferred = 0;
- long startTime = System.currentTimeMillis();
- while (numTransferred < expectedTransferred && (System.currentTimeMillis() - startTime < RESPONSE_TIMEOUT)) {
- proc.onTrigger(context, processSessionFactory);
- numTransferred = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS).size();
- Thread.sleep(100);
+ try (final Socket socket = getSocket(port, sslContext)) {
+ final OutputStream outputStream = socket.getOutputStream();
+ for (final String message : messages) {
+ outputStream.write(message.getBytes(StandardCharsets.UTF_8));
}
+ outputStream.flush();
+
+ // Run Processor once for each expected FlowFile
+ runner.run(flowFiles, false, false);
- // should have transferred the expected events
- runner.assertTransferCount(ListenTCP.REL_SUCCESS, expectedTransferred);
+ runner.assertTransferCount(ListenTCP.REL_SUCCESS, flowFiles);
} finally {
- // unschedule to close connections
- proc.onUnscheduled();
- IOUtils.closeQuietly(socket);
+ runner.shutdown();
+ }
+ }
+
+ private Socket getSocket(final int port, final SSLContext sslContext) throws IOException {
+ final Socket socket;
+ if (sslContext == null) {
+ socket = new Socket(LOCALHOST, port);
+ } else {
+ socket = sslContext.getSocketFactory().createSocket(LOCALHOST, port);
}
+ return socket;
}
private void enableSslContextService(final SSLContext sslContext) throws InitializationException {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java
index e0d1a15..e2e5c74 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java
@@ -16,18 +16,17 @@
*/
package org.apache.nifi.processors.standard;
-import java.io.Closeable;
import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLContext;
-import org.apache.commons.io.IOUtils;
+
import org.apache.nifi.json.JsonTreeReader;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.security.util.ClientAuth;
@@ -46,12 +45,8 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class TestListenTCPRecord {
- static final Logger LOGGER = LoggerFactory.getLogger(TestListenTCPRecord.class);
-
static final String SCHEMA_TEXT = "{\n" +
" \"name\": \"syslogRecord\",\n" +
" \"namespace\": \"nifi\",\n" +
@@ -63,17 +58,13 @@ public class TestListenTCPRecord {
" ]\n" +
"}";
- static final List<String> DATA;
+ static final String DATA = "[" +
+ "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 1\"}," +
+ "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 2\"}," +
+ "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 3\"}" +
+ "]";
- static {
- final List<String> data = new ArrayList<>();
- data.add("[");
- data.add("{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 1\"},");
- data.add("{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 2\"},");
- data.add("{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 3\"}");
- data.add("]");
- DATA = Collections.unmodifiableList(data);
- }
+ private static final String LOCALHOST = "localhost";
private static final String SSL_CONTEXT_IDENTIFIER = SSLContextService.class.getName();
@@ -81,7 +72,6 @@ public class TestListenTCPRecord {
private static SSLContext trustStoreSslContext;
- private ListenTCPRecord proc;
private TestRunner runner;
@BeforeClass
@@ -92,9 +82,7 @@ public class TestListenTCPRecord {
@Before
public void setup() throws InitializationException {
- proc = new ListenTCPRecord();
- runner = TestRunners.newTestRunner(proc);
- runner.setProperty(ListenTCPRecord.PORT, "0");
+ runner = TestRunners.newTestRunner(ListenTCPRecord.class);
final String readerId = "record-reader";
final RecordReaderFactory readerFactory = new JsonTreeReader();
@@ -110,7 +98,6 @@ public class TestListenTCPRecord {
runner.setProperty(ListenTCPRecord.RECORD_READER, readerId);
runner.setProperty(ListenTCPRecord.RECORD_WRITER, writerId);
-
}
@Test
@@ -130,7 +117,7 @@ public class TestListenTCPRecord {
public void testOneRecordPerFlowFile() throws IOException, InterruptedException {
runner.setProperty(ListenTCPRecord.RECORD_BATCH_SIZE, "1");
- runTCP(DATA, 3, null);
+ run(3, null);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
for (int i = 0; i < mockFlowFiles.size(); i++) {
@@ -147,7 +134,7 @@ public class TestListenTCPRecord {
public void testMultipleRecordsPerFlowFileLessThanBatchSize() throws IOException, InterruptedException {
runner.setProperty(ListenTCPRecord.RECORD_BATCH_SIZE, "5");
- runTCP(DATA, 1, null);
+ run(1, null);
final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
Assert.assertEquals(1, mockFlowFiles.size());
@@ -167,7 +154,7 @@ public class TestListenTCPRecord {
runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.REQUIRED.name());
enableSslContextService(keyStoreSslContext);
- runTCP(DATA, 1, keyStoreSslContext);
+ run(1, keyStoreSslContext);
final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
Assert.assertEquals(1, mockFlowFiles.size());
@@ -180,21 +167,11 @@ public class TestListenTCPRecord {
}
@Test
- public void testTLSClientAuthRequiredAndClientCertNotProvided() throws InitializationException, IOException, InterruptedException {
- runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.REQUIRED.name());
- runner.setProperty(ListenTCPRecord.READ_TIMEOUT, "5 seconds");
- enableSslContextService(keyStoreSslContext);
-
- runTCP(DATA, 0, trustStoreSslContext);
- }
-
- @Test
public void testTLSClientAuthNoneAndClientCertNotProvided() throws InitializationException, IOException, InterruptedException {
-
runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.NONE.name());
enableSslContextService(keyStoreSslContext);
- runTCP(DATA, 1, trustStoreSslContext);
+ run(1, trustStoreSslContext);
final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
Assert.assertEquals(1, mockFlowFiles.size());
@@ -206,88 +183,43 @@ public class TestListenTCPRecord {
Assert.assertTrue(content.contains("This is a test " + 3));
}
- protected void runTCP(final List<String> messages, final int expectedTransferred, final SSLContext sslContext)
- throws IOException, InterruptedException {
-
- SocketSender sender = null;
- try {
- // schedule to start listening on a random port
- final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
- final ProcessContext context = runner.getProcessContext();
- proc.onScheduled(context);
- Thread.sleep(100);
-
- sender = new SocketSender(proc.getDispatcherPort(), "localhost", sslContext, messages, 0);
-
- final Thread senderThread = new Thread(sender);
- senderThread.setDaemon(true);
- senderThread.start();
-
- long timeout = 10000;
-
- // call onTrigger until we processed all the records, or a certain amount of time passes
- int numTransferred = 0;
- long startTime = System.currentTimeMillis();
- while (numTransferred < expectedTransferred && (System.currentTimeMillis() - startTime < timeout)) {
- proc.onTrigger(context, processSessionFactory);
- numTransferred = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS).size();
- Thread.sleep(100);
+ /**
+ * Starts the listener on an available port, sends DATA over a client Socket from a separate Thread,
+ * runs the Processor and verifies both the sender status and the number of FlowFiles transferred.
+ *
+ * @param expectedTransferred number of FlowFiles expected on the success relationship
+ * @param sslContext SSLContext for the client Socket or null to send over a plain Socket
+ * @throws IOException declared for callers
+ * @throws InterruptedException when interrupted while waiting for the sender Thread
+ */
+ protected void run(final int expectedTransferred, final SSLContext sslContext) throws IOException, InterruptedException {
+ final int port = NetworkUtils.availablePort();
+ runner.setProperty(ListenTCPRecord.PORT, Integer.toString(port));
+
+ // Run Processor and start listener without shutting down
+ runner.run(1, false, true);
+
+ final AtomicBoolean completed = new AtomicBoolean(false);
+ final Thread thread = new Thread(() -> {
+ try (final Socket socket = getSocket(port, sslContext)) {
+ final OutputStream outputStream = socket.getOutputStream();
+ outputStream.write(DATA.getBytes(StandardCharsets.UTF_8));
+ outputStream.flush();
+ completed.set(true);
+ } catch (final IOException e) {
+ throw new UncheckedIOException(e);
}
+ });
+ thread.start();
- // should have transferred the expected events
- runner.assertTransferCount(ListenTCPRecord.REL_SUCCESS, expectedTransferred);
- } finally {
- // unschedule to close connections
- proc.onUnscheduled();
- IOUtils.closeQuietly(sender);
- }
- }
-
- private static class SocketSender implements Runnable, Closeable {
-
- private final int port;
- private final String host;
- private final SSLContext sslContext;
- private final List<String> data;
- private final long delay;
- private Socket socket;
-
- public SocketSender(final int port, final String host, final SSLContext sslContext, final List<String> data, final long delay) {
- this.port = port;
- this.host = host;
- this.sslContext = sslContext;
- this.data = data;
- this.delay = delay;
- }
-
- @Override
- public void run() {
- try {
- if (sslContext != null) {
- socket = sslContext.getSocketFactory().createSocket(host, port);
- } else {
- socket = new Socket(host, port);
- }
-
- for (final String message : data) {
- socket.getOutputStream().write(message.getBytes(StandardCharsets.UTF_8));
- if (delay > 0) {
- Thread.sleep(delay);
- }
- }
-
- socket.getOutputStream().flush();
- } catch (final Exception e) {
- LOGGER.error(e.getMessage(), e);
- } finally {
- IOUtils.closeQuietly(socket);
- }
- }
+ // Run Processor for expected FlowFiles with an additional run to ensure completion
+ final int iterations = expectedTransferred + 1;
+ runner.run(iterations, true, false);
+
+ // Wait for Send Completion after the Processor has run so that a failed or unfinished send fails the test
+ // NOTE(review): joining before the Processor runs may block when the send depends on the listener reading; confirm before moving
+ thread.join(5000);
+ Assert.assertTrue("Send not completed", completed.get());
+ runner.assertTransferCount(ListenTCPRecord.REL_SUCCESS, expectedTransferred);
+ }
- public void close() {
- IOUtils.closeQuietly(socket);
+ /**
+ * Opens a client Socket to the listener on localhost.
+ *
+ * @param port listener port
+ * @param sslContext SSLContext providing the Socket Factory or null for a plain Socket
+ * @return connected Socket
+ * @throws IOException when the Socket cannot be created
+ */
+ private Socket getSocket(final int port, final SSLContext sslContext) throws IOException {
+ if (sslContext == null) {
+ // No SSLContext configured: connect without TLS
+ return new Socket(LOCALHOST, port);
}
+ return sslContext.getSocketFactory().createSocket(LOCALHOST, port);
}
private void enableSslContextService(final SSLContext sslContext) throws InitializationException {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java
index b6c997e..bbbce1f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java
@@ -16,23 +16,20 @@
*/
package org.apache.nifi.processors.standard;
-import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.StandardEvent;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
@@ -47,34 +44,21 @@ import java.util.concurrent.BlockingQueue;
public class TestListenUDP {
- private int port = 0;
- private ListenUDP proc;
- private TestRunner runner;
+ private static final String LOCALHOST = "localhost";
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
- System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
- System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
- System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.ListenUDP", "debug");
- System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestListenUDP", "debug");
- }
+ private int port = 0;
- @AfterClass
- public static void tearDownAfterClass() {
- System.setProperty("org.slf4j.simpleLogger.showDateTime", "false");
- }
+ private TestRunner runner;
@Before
public void setUp() throws Exception {
- proc = new ListenUDP();
- runner = TestRunners.newTestRunner(proc);
- runner.setProperty(ListenUDP.PORT, String.valueOf(port));
+ runner = TestRunners.newTestRunner(ListenUDP.class);
+ port = NetworkUtils.availablePort();
+ runner.setProperty(ListenUDP.PORT, Integer.toString(port));
}
@Test
public void testCustomValidation() {
- runner.assertNotValid();
runner.setProperty(ListenUDP.PORT, "1");
runner.assertValid();
@@ -110,15 +94,13 @@ public class TestListenUDP {
runner.setProperty(ListenUDP.MAX_MESSAGE_QUEUE_SIZE, String.valueOf(maxQueueSize));
final List<String> messages = getMessages(20);
- final int expectedQueued = maxQueueSize;
- final int expectedTransferred = maxQueueSize;
- run(new DatagramSocket(), messages, expectedQueued, expectedTransferred);
+ run(new DatagramSocket(), messages, maxQueueSize, maxQueueSize);
runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, maxQueueSize);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
verifyFlowFiles(mockFlowFiles);
- verifyProvenance(expectedTransferred);
+ verifyProvenance(maxQueueSize);
}
@Test
@@ -146,7 +128,7 @@ public class TestListenUDP {
}
@Test
- public void testBatchingWithDifferentSenders() throws IOException, InterruptedException {
+ public void testBatchingWithDifferentSenders() {
final String sender1 = "sender1";
final String sender2 = "sender2";
final ChannelResponder responder = Mockito.mock(ChannelResponder.class);
@@ -164,7 +146,6 @@ public class TestListenUDP {
runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
// sending 4 messages with a batch size of 10, but should get 2 FlowFiles because of different senders
-
runner.run();
runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
@@ -172,7 +153,7 @@ public class TestListenUDP {
}
@Test
- public void testRunWhenNoEventsAvailable() throws IOException, InterruptedException {
+ public void testRunWhenNoEventsAvailable() {
final List<StandardEvent> mockEvents = new ArrayList<>();
MockListenUDP mockListenUDP = new MockListenUDP(mockEvents);
@@ -206,26 +187,6 @@ public class TestListenUDP {
verifyProvenance(expectedTransferred);
}
- @Test
- public void testWithSendingHostAndPortDifferentThanSender() throws IOException, InterruptedException {
- final String sendingHost = "localhost";
- final Integer sendingPort = 21001;
- runner.setProperty(ListenUDP.SENDING_HOST, sendingHost);
- runner.setProperty(ListenUDP.SENDING_HOST_PORT, String.valueOf(sendingPort));
-
- // bind to a different sending port than the processor has for Sending Host Port
- final DatagramSocket socket = new DatagramSocket(21002);
-
- // no messages should come through since we are listening for 21001 and sending from 21002
-
- final List<String> messages = getMessages(6);
- final int expectedQueued = 0;
- final int expectedTransferred = 0;
-
- run(socket, messages, expectedQueued, expectedTransferred);
- runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, 0);
- }
-
private List<String> getMessages(int numMessages) {
final List<String> messages = new ArrayList<>();
for (int i=0; i < numMessages; i++) {
@@ -256,54 +217,25 @@ public class TestListenUDP {
+ /**
+ * Runs the Processor, sends each message as a datagram to the listening port and verifies the transfer count.
+ * The provided socket is closed before returning.
+ *
+ * @param socket DatagramSocket used for sending, closed by this method
+ * @param messages messages sent as UTF-8 encoded datagrams
+ * @param expectedQueueSize not used by the current implementation, retained for existing callers
+ * @param expectedTransferred number of Processor runs and expected FlowFiles on the success relationship
+ */
protected void run(final DatagramSocket socket, final List<String> messages, final int expectedQueueSize, final int expectedTransferred)
throws IOException, InterruptedException {
- try {
- // schedule to start listening on a random port
- final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
- final ProcessContext context = runner.getProcessContext();
- proc.onScheduled(context);
- Thread.sleep(100);
- // get the real port the dispatcher is listening on
- final int destPort = proc.getDispatcherPort();
- final InetSocketAddress destination = new InetSocketAddress("localhost", destPort);
- // send the messages to the port the processors is listening on
+ // Run Processor and start Dispatcher without shutting down
+ runner.run(1, false, true);
+
+ try {
+ final InetSocketAddress destination = new InetSocketAddress(LOCALHOST, port);
for (final String message : messages) {
final byte[] buffer = message.getBytes(StandardCharsets.UTF_8);
final DatagramPacket packet = new DatagramPacket(buffer, buffer.length, destination);
socket.send(packet);
- Thread.sleep(10);
}
- long responseTimeout = 10000;
-
- // this first loop waits until the internal queue of the processor has the expected
- // number of messages ready before proceeding, we want to guarantee they are all there
- // before onTrigger gets a chance to run
- long startTimeQueueSizeCheck = System.currentTimeMillis();
- while (proc.getQueueSize() < expectedQueueSize
- && (System.currentTimeMillis() - startTimeQueueSizeCheck < responseTimeout)) {
- Thread.sleep(100);
- }
-
- // want to fail here if the queue size isn't what we expect
- Assert.assertEquals(expectedQueueSize, proc.getQueueSize());
-
- // call onTrigger until we processed all the messages, or a certain amount of time passes
- int numTransferred = 0;
- long startTime = System.currentTimeMillis();
- while (numTransferred < expectedTransferred && (System.currentTimeMillis() - startTime < responseTimeout)) {
- proc.onTrigger(context, processSessionFactory);
- numTransferred = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS).size();
- Thread.sleep(100);
- }
+ // Run Processor for number of responses
+ runner.run(expectedTransferred, false, false);
- // should have transferred the expected events
runner.assertTransferCount(ListenUDP.REL_SUCCESS, expectedTransferred);
} finally {
- // unschedule to close connections
- proc.onUnscheduled();
- IOUtils.closeQuietly(socket);
+ // Close the sending socket first so it is released even when Processor shutdown fails
+ socket.close();
+ runner.shutdown();
}
}
@@ -324,7 +256,7 @@ public class TestListenUDP {
}
@Override
- protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<StandardEvent> events) throws IOException {
+ protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<StandardEvent> events) {
return Mockito.mock(ChannelDispatcher.class);
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
index e520711..d6c806f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
@@ -14,18 +14,246 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.nifi.processors.standard;
-import org.apache.nifi.processors.standard.util.TestPutTCPCommon;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.nifi.processors.standard.util.TCPTestServer;
+import org.apache.nifi.security.util.KeyStoreUtils;
+import org.apache.nifi.security.util.SslContextFactory;
+import org.apache.nifi.security.util.TlsConfiguration;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import javax.net.ServerSocketFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocketFactory;
+import java.net.InetAddress;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class TestPutTCP {
+ private final static String TCP_SERVER_ADDRESS = "127.0.0.1";
+ private final static String SERVER_VARIABLE = "server.address";
+ private final static String TCP_SERVER_ADDRESS_EL = "${" + SERVER_VARIABLE + "}";
+ private final static int MIN_INVALID_PORT = 0;
+ private final static int MIN_VALID_PORT = 1;
+ private final static int MAX_VALID_PORT = 65535;
+ private final static int MAX_INVALID_PORT = 65536;
+ private final static int BUFFER_SIZE = 1024;
+ private final static int VALID_LARGE_FILE_SIZE = 32768;
+ private final static int VALID_SMALL_FILE_SIZE = 64;
+ private final static int LOAD_TEST_ITERATIONS = 500;
+ private final static int LOAD_TEST_THREAD_COUNT = 1;
+ private final static int DEFAULT_ITERATIONS = 1;
+ private final static int DEFAULT_THREAD_COUNT = 1;
+ private final static char CONTENT_CHAR = 'x';
+ private final static int DEFAULT_TEST_TIMEOUT_PERIOD = 10000;
+ private final static int LONG_TEST_TIMEOUT_PERIOD = 300000;
+ private final static String OUTGOING_MESSAGE_DELIMITER = "\n";
+ private final static String OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR = "{delimiter}\r\n";
+ private final static String[] EMPTY_FILE = { "" };
+ private final static String[] VALID_FILES = { "abcdefghijklmnopqrstuvwxyz", "zyxwvutsrqponmlkjihgfedcba", "12345678", "343424222", "!@£$%^&*()_+:|{}[];\\" };
+
+ private TCPTestServer server;
+ private int port;
+ private ArrayBlockingQueue<List<Byte>> received;
+ private TestRunner runner;
+
+ /**
+ * Creates a new queue for received messages and a new TestRunner for PutTCP before each test,
+ * and registers the server address variable referenced by TCP_SERVER_ADDRESS_EL.
+ */
+ @Before
+ public void setup() throws Exception {
+ received = new ArrayBlockingQueue<>(BUFFER_SIZE);
+ runner = TestRunners.newTestRunner(PutTCP.class);
+ runner.setVariable(SERVER_VARIABLE, TCP_SERVER_ADDRESS);
+ }
+
+ /**
+ * Shuts down the TestRunner and then the test server, when one was created, after each test.
+ */
+ @After
+ public void cleanup() {
+ runner.shutdown();
+ // Tests such as the port validation test never create a server
+ if (server != null) {
+ server.shutdown();
+ }
+ }
+
+ /**
+ * Verifies Port property validation at the boundaries: 0 and 65536 are invalid, 1 and 65535 are valid.
+ */
+ @Test
+ public void testPortProperty() {
+ // Below the valid range
+ runner.setProperty(PutTCP.PORT, String.valueOf(MIN_INVALID_PORT));
+ runner.assertNotValid();
+
+ // Lower boundary of the valid range
+ runner.setProperty(PutTCP.PORT, String.valueOf(MIN_VALID_PORT));
+ runner.assertValid();
+
+ // Upper boundary of the valid range
+ runner.setProperty(PutTCP.PORT, String.valueOf(MAX_VALID_PORT));
+ runner.assertValid();
+
+ // Above the valid range
+ runner.setProperty(PutTCP.PORT, String.valueOf(MAX_INVALID_PORT));
+ runner.assertNotValid();
+ }
+
+ /**
+ * Sends VALID_FILES with the single character delimiter and expects every message to arrive over one connection.
+ */
+ @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ public void testRunSuccess() throws Exception {
+ createTestServer(OUTGOING_MESSAGE_DELIMITER);
+ configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
+ sendTestData(VALID_FILES);
+ assertMessagesReceived(VALID_FILES);
+ // Expected value first so that failure messages report expected and actual correctly
+ assertEquals("Server Connections not matched", 1, server.getTotalNumConnections());
+ }
-public class TestPutTCP extends TestPutTCPCommon {
+ /**
+ * Sends VALID_FILES to a server using an SSL Server Socket Factory, with PutTCP configured through a mocked
+ * SSLContextService that returns the same SSLContext. The generated keystore and truststore files are deleted afterwards.
+ */
+ @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
+ public void testRunSuccessSslContextService() throws Exception {
+ final TlsConfiguration tlsConfiguration = KeyStoreUtils.createTlsConfigAndNewKeystoreTruststore();
+
+ try {
+ final SSLContext sslContext = SslContextFactory.createSslContext(tlsConfiguration);
+ assertNotNull("SSLContext not found", sslContext);
+
+ final String identifier = SSLContextService.class.getName();
+ final SSLContextService sslContextService = Mockito.mock(SSLContextService.class);
+ Mockito.when(sslContextService.getIdentifier()).thenReturn(identifier);
+ Mockito.when(sslContextService.createContext()).thenReturn(sslContext);
+ runner.addControllerService(identifier, sslContextService);
+ runner.enableControllerService(sslContextService);
+ runner.setProperty(PutTCP.SSL_CONTEXT_SERVICE, identifier);
+
+ final SSLServerSocketFactory serverSocketFactory = sslContext.getServerSocketFactory();
+ createTestServer(OUTGOING_MESSAGE_DELIMITER, false, serverSocketFactory);
+ configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
+ sendTestData(VALID_FILES);
+ assertMessagesReceived(VALID_FILES);
+ // Expected value first so that failure messages report expected and actual correctly
+ assertEquals("Server Connections not matched", 1, server.getTotalNumConnections());
+ } finally {
+ Files.deleteIfExists(Paths.get(tlsConfiguration.getKeystorePath()));
+ Files.deleteIfExists(Paths.get(tlsConfiguration.getTruststorePath()));
+ }
+ }
+
+ /**
+ * Sends VALID_FILES with the hostname supplied as an expression referencing the server address variable.
+ */
+ @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ public void testRunSuccessServerVariableExpression() throws Exception {
+ createTestServer(OUTGOING_MESSAGE_DELIMITER);
+ configureProperties(TCP_SERVER_ADDRESS_EL, OUTGOING_MESSAGE_DELIMITER, false);
+ sendTestData(VALID_FILES);
+ assertMessagesReceived(VALID_FILES);
+ // Expected value first so that failure messages report expected and actual correctly
+ assertEquals("Server Connections not matched", 1, server.getTotalNumConnections());
+ }
+
+ /**
+ * Sends VALID_FILES over one connection, then sets Idle Expiration to 500 ms, waits one second, runs the Processor
+ * once and sends again, expecting the server to have seen a second connection.
+ */
+ @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ public void testRunSuccessPruneSenders() throws Exception {
+ createTestServer(OUTGOING_MESSAGE_DELIMITER);
+ configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
+ sendTestData(VALID_FILES);
+ assertTransfers(VALID_FILES.length);
+ assertMessagesReceived(VALID_FILES);
+ // Expected value first so that failure messages report expected and actual correctly
+ assertEquals("Server Connections not matched", 1, server.getTotalNumConnections());
+ runner.setProperty(PutTCP.IDLE_EXPIRATION, "500 ms");
+ Thread.sleep(1000);
+ runner.run(1, false, false);
+ runner.clearTransferState();
+ sendTestData(VALID_FILES);
+ assertMessagesReceived(VALID_FILES);
+ assertEquals("Server Connections after prune senders not matched", 2, server.getTotalNumConnections());
+ }
+
+ /**
+ * Sends VALID_FILES using the multiple character delimiter on both the server and the Processor.
+ */
+ @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ public void testRunSuccessMultiCharDelimiter() throws Exception {
+ createTestServer(OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR);
+ configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR, false);
+ sendTestData(VALID_FILES);
+ assertMessagesReceived(VALID_FILES);
+ // Expected value first so that failure messages report expected and actual correctly
+ assertEquals("Server Connections not matched", 1, server.getTotalNumConnections());
+ }
+
+ /**
+ * Sends VALID_FILES with Connection Per FlowFile enabled and expects one server connection for each FlowFile.
+ */
+ @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
+ public void testRunSuccessConnectionPerFlowFile() throws Exception {
+ createTestServer(OUTGOING_MESSAGE_DELIMITER, true);
+ configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, true);
+ sendTestData(VALID_FILES);
+ assertMessagesReceived(VALID_FILES);
+ // Expected value first so that failure messages report expected and actual correctly
+ assertEquals("Server Connections not matched", VALID_FILES.length, server.getTotalNumConnections());
+ }
+
+ /**
+ * Sends VALID_FILES, shuts down the server and sends again expecting nothing to be received,
+ * then starts a new server and expects delivery to succeed again.
+ */
+ @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ public void testRunSuccessConnectionFailure() throws Exception {
+ createTestServer(OUTGOING_MESSAGE_DELIMITER);
+ configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
+ sendTestData(VALID_FILES);
+ assertMessagesReceived(VALID_FILES);
+ // Expected value first so that failure messages report expected and actual correctly
+ assertEquals("Server Connections not matched", 1, server.getTotalNumConnections());
+ removeTestServer(server);
+ runner.clearTransferState();
+ sendTestData(VALID_FILES);
+ Thread.sleep(500);
+ assertNull("Unexpected Data Received", received.poll());
+ runner.assertQueueEmpty();
+ assertEquals("Server Connections after restart not matched", 1, server.getTotalNumConnections());
+ createTestServer(OUTGOING_MESSAGE_DELIMITER);
+ configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
+ sendTestData(VALID_FILES);
+ assertMessagesReceived(VALID_FILES);
+ assertEquals("Server Connections not matched", 1, server.getTotalNumConnections());
+ }
- public TestPutTCP() {
- super();
+ /**
+ * Sends a single empty FlowFile and expects it to be transferred to success with the Processor queue left empty.
+ */
+ @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ public void testRunSuccessEmptyFile() throws Exception {
+ createTestServer(OUTGOING_MESSAGE_DELIMITER);
+ configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
+ sendTestData(EMPTY_FILE);
+ assertTransfers(EMPTY_FILE.length);
+ runner.assertQueueEmpty();
+ // Expected value first so that failure messages report expected and actual correctly
+ assertEquals("Server Connections not matched", 1, server.getTotalNumConnections());
}
- @Override
- public void configureProperties(String host, int port, String outgoingMessageDelimiter, boolean connectionPerFlowFile, boolean expectValid) {
+ /**
+ * Sends one FlowFile of VALID_LARGE_FILE_SIZE characters with Connection Per FlowFile enabled.
+ */
+ @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ public void testRunSuccessLargeValidFile() throws Exception {
+ createTestServer(OUTGOING_MESSAGE_DELIMITER);
+ configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, true);
+ final String[] testData = createContent(VALID_LARGE_FILE_SIZE);
+ sendTestData(testData);
+ assertMessagesReceived(testData);
+ // Expected value first so that failure messages report expected and actual correctly
+ assertEquals("Server Connections not matched", testData.length, server.getTotalNumConnections());
+ }
+
+ /**
+ * Sends one small FlowFile in each of LOAD_TEST_ITERATIONS iterations and expects every message to arrive over one connection.
+ */
+ @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
+ public void testRunSuccessFiveHundredMessages() throws Exception {
+ createTestServer(OUTGOING_MESSAGE_DELIMITER);
+ Thread.sleep(1000);
+ final String[] testData = createContent(VALID_SMALL_FILE_SIZE);
+ configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
+ sendTestData(testData, LOAD_TEST_ITERATIONS, LOAD_TEST_THREAD_COUNT);
+ assertMessagesReceived(testData, LOAD_TEST_ITERATIONS);
+ // Expected value first so that failure messages report expected and actual correctly
+ assertEquals("Server Connections not matched", 1, server.getTotalNumConnections());
+ }
+
+ /**
+ * Starts a test server on the default Server Socket Factory that keeps connections open after each message.
+ */
+ private void createTestServer(final String delimiter) throws Exception {
+ createTestServer(delimiter, false, ServerSocketFactory.getDefault());
+ }
+
+ /**
+ * Starts a test server on the default Server Socket Factory.
+ */
+ private void createTestServer(final String delimiter, final boolean closeOnMessageReceived) throws Exception {
+ final ServerSocketFactory defaultFactory = ServerSocketFactory.getDefault();
+ createTestServer(delimiter, closeOnMessageReceived, defaultFactory);
+ }
+
+ /**
+ * Starts a test server bound to TCP_SERVER_ADDRESS that writes messages to the received queue,
+ * and records the server port for later Processor configuration.
+ */
+ private void createTestServer(final String delimiter, final boolean closeOnMessageReceived, final ServerSocketFactory serverSocketFactory) throws Exception {
+ final InetAddress serverAddress = InetAddress.getByName(TCP_SERVER_ADDRESS);
+ server = new TCPTestServer(serverAddress, received, delimiter, closeOnMessageReceived);
+ server.startServer(serverSocketFactory);
+ port = server.getPort();
+ }
+
+ /**
+ * Shuts down the given test server, doing nothing when it is null.
+ */
+ private void removeTestServer(final TCPTestServer server) {
+ if (server == null) {
+ return;
+ }
+ server.shutdown();
+ }
+
+ private void configureProperties(String host, String outgoingMessageDelimiter, boolean connectionPerFlowFile) {
runner.setProperty(PutTCP.HOSTNAME, host);
runner.setProperty(PutTCP.PORT, Integer.toString(port));
if (outgoingMessageDelimiter != null) {
@@ -33,11 +261,56 @@ public class TestPutTCP extends TestPutTCPCommon {
}
runner.setProperty(PutTCP.CONNECTION_PER_FLOWFILE, String.valueOf(connectionPerFlowFile));
+ runner.assertValid();
+ }
+
+ /**
+ * Enqueues and sends the test data once using the default thread count.
+ */
+ private void sendTestData(final String[] testData) {
+ sendTestData(testData, DEFAULT_ITERATIONS, DEFAULT_THREAD_COUNT);
+ }
+
+ /**
+ * Enqueues every item as a FlowFile and runs the Processor once per item, repeated for the requested iterations.
+ */
+ private void sendTestData(final String[] testData, final int iterations, final int threadCount) {
+ runner.setThreadCount(threadCount);
+ int iteration = 0;
+ while (iteration < iterations) {
+ for (final String content : testData) {
+ runner.enqueue(content.getBytes());
+ }
+ // The third argument is true on the first pass only
+ final boolean firstPass = iteration == 0;
+ runner.run(testData.length, false, firstPass);
+ iteration++;
+ }
+ }
+
+ /**
+ * Asserts the number of FlowFiles transferred to success and that none were transferred to failure.
+ */
+ private void assertTransfers(final int successCount) {
+ runner.assertTransferCount(PutTCP.REL_SUCCESS, successCount);
+ runner.assertTransferCount(PutTCP.REL_FAILURE, 0);
+ }
+
+ /**
+ * Asserts that each sent item was received once and that the Processor queue is empty.
+ */
+ private void assertMessagesReceived(final String[] sentData) throws Exception {
+ assertMessagesReceived(sentData, DEFAULT_ITERATIONS);
+ runner.assertQueueEmpty();
+ }
- if (expectValid) {
- runner.assertValid();
- } else {
- runner.assertNotValid();
+ /**
+ * Asserts that the server received the sent items in order for each iteration, that the success transfer count
+ * matches, and that no additional message is pending. Transfer state is cleared afterwards.
+ */
+ private void assertMessagesReceived(final String[] sentData, final int iterations) throws Exception {
+ for (int i = 0; i < iterations; i++) {
+ for (String item : sentData) {
+ // Timed poll returns null for a missing message so the assertion below can fail, unlike a blocking take
+ List<Byte> message = received.poll(DEFAULT_TEST_TIMEOUT_PERIOD, java.util.concurrent.TimeUnit.MILLISECONDS);
+ assertNotNull(String.format("Message [%d] not found", i), message);
+ Byte[] messageBytes = new Byte[message.size()];
+ assertArrayEquals(item.getBytes(), ArrayUtils.toPrimitive(message.toArray(messageBytes)));
+ }
}
+
+ runner.assertTransferCount(PutTCP.REL_SUCCESS, sentData.length * iterations);
+ runner.clearTransferState();
+
+ assertNull("Unexpected Message Found", received.poll());
+ }
+
+ /**
+ * Builds a single element array containing a String of the requested length filled with CONTENT_CHAR.
+ */
+ private String[] createContent(final int size) {
+ // A new char array is filled with NUL characters, each replaced with the content character
+ final String filled = new String(new char[size]).replace('\0', CONTENT_CHAR);
+ return new String[] { filled };
}
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTcpSSL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTcpSSL.java
deleted file mode 100644
index 6870ae9..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTcpSSL.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import org.apache.nifi.processors.standard.util.TestPutTCPCommon;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.security.util.KeystoreType;
-import org.apache.nifi.security.util.SslContextFactory;
-import org.apache.nifi.security.util.StandardTlsConfiguration;
-import org.apache.nifi.security.util.TlsConfiguration;
-import org.apache.nifi.security.util.TlsException;
-import org.apache.nifi.ssl.SSLContextService;
-import org.junit.BeforeClass;
-import org.mockito.Mockito;
-
-import javax.net.ssl.SSLContext;
-
-public class TestPutTcpSSL extends TestPutTCPCommon {
- private static final String TLS_PROTOCOL = "TLSv1.2";
-
- private static SSLContext sslContext;
-
- @BeforeClass
- public static void configureServices() throws TlsException {
- final TlsConfiguration configuration = new StandardTlsConfiguration(
- "src/test/resources/keystore.jks",
- "passwordpassword",
- "passwordpassword",
- KeystoreType.JKS,
- "src/test/resources/truststore.jks",
- "passwordpassword",
- KeystoreType.JKS,
- TLS_PROTOCOL
- );
- sslContext = SslContextFactory.createSslContext(configuration);
- }
-
- public TestPutTcpSSL() {
- super();
- serverSocketFactory = sslContext.getServerSocketFactory();
- }
-
- @Override
- public void configureProperties(String host, int port, String outgoingMessageDelimiter, boolean connectionPerFlowFile, boolean expectValid) throws InitializationException {
- runner.setProperty(PutTCP.HOSTNAME, host);
- runner.setProperty(PutTCP.PORT, Integer.toString(port));
-
- final SSLContextService sslContextService = Mockito.mock(SSLContextService.class);
- final String serviceIdentifier = SSLContextService.class.getName();
- Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
- Mockito.when(sslContextService.createContext()).thenReturn(sslContext);
-
- runner.addControllerService(serviceIdentifier, sslContextService);
- runner.enableControllerService(sslContextService);
- runner.setProperty(PutTCP.SSL_CONTEXT_SERVICE, serviceIdentifier);
-
- if (outgoingMessageDelimiter != null) {
- runner.setProperty(PutTCP.OUTGOING_MESSAGE_DELIMITER, outgoingMessageDelimiter);
- }
- runner.setProperty(PutTCP.CONNECTION_PER_FLOWFILE, String.valueOf(connectionPerFlowFile));
-
- if (expectValid) {
- runner.assertValid();
- } else {
- runner.assertNotValid();
- }
- }
-}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestPutTCPCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestPutTCPCommon.java
deleted file mode 100644
index 3f94087..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestPutTCPCommon.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.standard.util;
-
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.SystemUtils;
-import org.apache.nifi.processors.standard.PutTCP;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.After;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import javax.net.ServerSocketFactory;
-import java.net.InetAddress;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-public abstract class TestPutTCPCommon {
- private final static String TCP_SERVER_ADDRESS = "127.0.0.1";
- private final static String SERVER_VARIABLE = "ALKJAFLKJDFLSKJSDFLKJSDF";
- private final static String TCP_SERVER_ADDRESS_EL = "${" + SERVER_VARIABLE + "}";
- private final static String UNKNOWN_HOST = "fgdsfgsdffd";
- private final static String INVALID_IP_ADDRESS = "300.300.300.300";
- private final static int MIN_INVALID_PORT = 0;
- private final static int MIN_VALID_PORT = 1;
- private final static int MAX_VALID_PORT = 65535;
- private final static int MAX_INVALID_PORT = 65536;
- private final static int BUFFER_SIZE = 1024;
- private final static int VALID_LARGE_FILE_SIZE = 32768;
- private final static int VALID_SMALL_FILE_SIZE = 64;
- private final static int LOAD_TEST_ITERATIONS = 500;
- private final static int LOAD_TEST_THREAD_COUNT = 1;
- private final static int DEFAULT_ITERATIONS = 1;
- private final static int DEFAULT_THREAD_COUNT = 1;
- private final static char CONTENT_CHAR = 'x';
- private final static int DATA_WAIT_PERIOD = 1000;
- private final static int DEFAULT_TEST_TIMEOUT_PERIOD = 10000;
- private final static int LONG_TEST_TIMEOUT_PERIOD = 180000;
- private final static String OUTGOING_MESSAGE_DELIMITER = "\n";
- private final static String OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR = "{delimiter}\r\n";
-
- private TCPTestServer server;
- private int port;
- private ArrayBlockingQueue<List<Byte>> recvQueue;
-
- public ServerSocketFactory serverSocketFactory;
- public TestRunner runner;
-
- // Test Data
- private final static String[] EMPTY_FILE = { "" };
- private final static String[] VALID_FILES = { "abcdefghijklmnopqrstuvwxyz", "zyxwvutsrqponmlkjihgfedcba", "12345678", "343424222", "!@£$%^&*()_+:|{}[];\\" };
-
- @BeforeClass
- public static void setUpSuite() {
- Assume.assumeTrue("Test only runs on *nix", !SystemUtils.IS_OS_WINDOWS);
- }
-
- @Before
- public void setup() throws Exception {
- recvQueue = new ArrayBlockingQueue<>(BUFFER_SIZE);
- runner = TestRunners.newTestRunner(PutTCP.class);
- runner.setVariable(SERVER_VARIABLE, TCP_SERVER_ADDRESS);
- }
-
- private TCPTestServer createTestServer(final ArrayBlockingQueue<List<Byte>> queue, final String delimiter, final boolean closeOnMessageReceived) throws Exception {
- TCPTestServer server = new TCPTestServer(InetAddress.getByName(TCP_SERVER_ADDRESS), queue, delimiter, closeOnMessageReceived);
- server.startServer(serverSocketFactory);
- port = server.getPort();
- return server;
- }
-
- private TCPTestServer createTestServer(final ArrayBlockingQueue<List<Byte>> queue, final String delimiter) throws Exception {
- return createTestServer(queue, delimiter, false);
- }
-
- @After
- public void cleanup() {
- runner.shutdown();
- removeTestServer(server);
- }
-
- private void removeTestServer(TCPTestServer server) {
- if (server != null) {
- server.shutdown();
- }
- }
-
- @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
- public void testValidFiles() throws Exception {
- server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
- configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
- sendTestData(VALID_FILES);
- checkReceivedAllData(recvQueue, VALID_FILES);
- checkInputQueueIsEmpty();
- checkTotalNumConnections(server, 1);
- }
-
- @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
- public void testValidFilesEL() throws Exception {
- server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
- configureProperties(TCP_SERVER_ADDRESS_EL, port, OUTGOING_MESSAGE_DELIMITER, false, true);
- sendTestData(VALID_FILES);
- checkReceivedAllData(recvQueue, VALID_FILES);
- checkInputQueueIsEmpty();
- checkTotalNumConnections(server, 1);
- }
-
- @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
- public void testPruneSenders() throws Exception {
- server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
- configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
- sendTestData(VALID_FILES);
- Thread.sleep(10);
- checkRelationships(VALID_FILES.length, 0);
- checkReceivedAllData(recvQueue, VALID_FILES);
- checkInputQueueIsEmpty();
- checkTotalNumConnections(server, 1);
- runner.setProperty(PutTCP.IDLE_EXPIRATION, "500 ms");
- Thread.sleep(1000);
- runner.run(1, false, false);
- runner.clearTransferState();
- sendTestData(VALID_FILES);
- checkReceivedAllData(recvQueue, VALID_FILES);
- checkInputQueueIsEmpty();
- checkTotalNumConnections(server, 2);
- }
-
- @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
- public void testMultiCharDelimiter() throws Exception {
- server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR);
- configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR, false, true);
- sendTestData(VALID_FILES);
- checkReceivedAllData(recvQueue, VALID_FILES);
- checkInputQueueIsEmpty();
- checkTotalNumConnections(server, 1);
- }
-
- @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
- public void testConnectionPerFlowFile() throws Exception {
- server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER, true);
- configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, true, true);
- sendTestData(VALID_FILES);
- checkReceivedAllData(recvQueue, VALID_FILES);
- checkInputQueueIsEmpty();
- checkTotalNumConnections(server, VALID_FILES.length);
- }
-
- @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
- public void testConnectionFailure() throws Exception {
- server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
- configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
- sendTestData(VALID_FILES);
- checkReceivedAllData(recvQueue, VALID_FILES);
- checkInputQueueIsEmpty();
- checkTotalNumConnections(server, 1);
- removeTestServer(server);
- runner.clearTransferState();
- sendTestData(VALID_FILES);
- Thread.sleep(10);
- checkNoDataReceived(recvQueue);
- checkInputQueueIsEmpty();
- checkTotalNumConnections(server, 1);
- server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
- configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
- sendTestData(VALID_FILES);
- checkReceivedAllData(recvQueue, VALID_FILES);
- checkInputQueueIsEmpty();
- checkTotalNumConnections(server, 1);
- }
-
- @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
- public void testEmptyFile() throws Exception {
- server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
- configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
- sendTestData(EMPTY_FILE);
- Thread.sleep(10);
- checkRelationships(EMPTY_FILE.length, 0);
- checkEmptyMessageReceived(recvQueue);
- checkInputQueueIsEmpty();
- checkTotalNumConnections(server, 1);
- }
-
- @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
- public void testLargeValidFile() throws Exception {
- server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
- configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, true, true);
- final String[] testData = createContent(VALID_LARGE_FILE_SIZE);
- sendTestData(testData);
- checkReceivedAllData(recvQueue, testData);
- checkInputQueueIsEmpty();
- checkTotalNumConnections(server, testData.length);
- }
-
- @Ignore("This test is failing intermittently as documented in NIFI-4288")
- @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
- public void testInvalidIPAddress() throws Exception {
- server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
- configureProperties(INVALID_IP_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
- sendTestData(VALID_FILES);
- Thread.sleep(10);
- checkRelationships(0, VALID_FILES.length);
- checkNoDataReceived(recvQueue);
- checkInputQueueIsEmpty();
- checkTotalNumConnections(server, 0);
- }
-
- @Ignore("This test is failing intermittently as documented in NIFI-4288")
- @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
- public void testUnknownHostname() throws Exception {
- server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
- configureProperties(UNKNOWN_HOST, port, OUTGOING_MESSAGE_DELIMITER, false, true);
- sendTestData(VALID_FILES);
- Thread.sleep(10);
- checkRelationships(0, VALID_FILES.length);
- checkNoDataReceived(recvQueue);
- checkInputQueueIsEmpty();
- checkTotalNumConnections(server, 0);
- }
-
- @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
- public void testInvalidPort() throws Exception {
- configureProperties(UNKNOWN_HOST, MIN_INVALID_PORT, OUTGOING_MESSAGE_DELIMITER, false, false);
- configureProperties(UNKNOWN_HOST, MIN_VALID_PORT, OUTGOING_MESSAGE_DELIMITER, false, true);
- configureProperties(UNKNOWN_HOST, MAX_VALID_PORT, OUTGOING_MESSAGE_DELIMITER, false, true);
- configureProperties(UNKNOWN_HOST, MAX_INVALID_PORT, OUTGOING_MESSAGE_DELIMITER, false, false);
- }
-
- @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
- public void testLoadTest() throws Exception {
- server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
- Thread.sleep(1000);
- final String[] testData = createContent(VALID_SMALL_FILE_SIZE);
- configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
- sendTestData(testData, LOAD_TEST_ITERATIONS, LOAD_TEST_THREAD_COUNT);
- checkReceivedAllData(recvQueue, testData, LOAD_TEST_ITERATIONS);
- checkInputQueueIsEmpty();
- checkTotalNumConnections(server, 1);
- }
-
- private void checkTotalNumConnections(final TCPTestServer server, final int expectedTotalNumConnections) {
- assertEquals(expectedTotalNumConnections, server.getTotalNumConnections());
- }
-
- public abstract void configureProperties(final String host, final int port, final String outgoingMessageDelimiter, final boolean connectionPerFlowFile,
- final boolean expectValid) throws InitializationException;
-
- private void sendTestData(final String[] testData) {
- sendTestData(testData, DEFAULT_ITERATIONS, DEFAULT_THREAD_COUNT);
- }
-
- private void sendTestData(final String[] testData, final int iterations, final int threadCount) {
- runner.setThreadCount(threadCount);
- for (int i = 0; i < iterations; i++) {
- for (String item : testData) {
- runner.enqueue(item.getBytes());
- }
- runner.run(testData.length, false, i == 0);
- }
- }
-
- private void checkRelationships(final int successCount, final int failedCount) {
- runner.assertTransferCount(PutTCP.REL_SUCCESS, successCount);
- runner.assertTransferCount(PutTCP.REL_FAILURE, failedCount);
- }
-
- private void checkNoDataReceived(final ArrayBlockingQueue<List<Byte>> recvQueue) throws Exception {
- Thread.sleep(DATA_WAIT_PERIOD);
- assertNull(recvQueue.poll());
- }
-
- private void checkEmptyMessageReceived(final ArrayBlockingQueue<List<Byte>> recvQueue) throws Exception {
- Thread.sleep(DATA_WAIT_PERIOD);
- final List<Byte> message = recvQueue.poll();
-
- assertNotNull(message);
- assertEquals(0, message.size());
- }
-
- private void checkInputQueueIsEmpty() {
- runner.assertQueueEmpty();
- }
-
- private void checkReceivedAllData(final ArrayBlockingQueue<List<Byte>> recvQueue, final String[] sentData) throws Exception {
- checkReceivedAllData(recvQueue, sentData, DEFAULT_ITERATIONS);
- }
-
- private void checkReceivedAllData(final ArrayBlockingQueue<List<Byte>> recvQueue, final String[] sentData, final int iterations) throws Exception {
- // check each sent FlowFile was successfully sent and received.
- for (int i = 0; i < iterations; i++) {
- for (String item : sentData) {
- List<Byte> message = recvQueue.take();
- assertNotNull(message);
- Byte[] messageBytes = new Byte[message.size()];
- assertArrayEquals(item.getBytes(), ArrayUtils.toPrimitive(message.toArray(messageBytes)));
- }
- }
-
- runner.assertTransferCount(PutTCP.REL_SUCCESS, sentData.length * iterations);
- runner.clearTransferState();
-
- // Check that we have no unexpected extra data.
- assertNull(recvQueue.poll());
- }
-
- private String[] createContent(final int size) {
- final char[] content = new char[size];
-
- for (int i = 0; i < size; i++) {
- content[i] = CONTENT_CHAR;
- }
-
- return new String[] { new String(content) };
- }
-}