You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@nifi.apache.org by jo...@apache.org on 2022/04/21 15:00:18 UTC
[nifi] 04/06: NIFI-9942 Removed load test from TestPutUDP
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch support/nifi-1.16
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 6596d05144a27d9db07c26365d414fc1675b7d44
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Wed Apr 20 15:11:26 2022 -0500
NIFI-9942 Removed load test from TestPutUDP
- Refactored TestPutUDP using JUnit 5
- Standardized test method naming
This closes #5984
Signed-off-by: Paul Grey <gr...@apache.org>
---
.../nifi/processors/standard/TestPutUDP.java | 205 ++++++++-------------
1 file changed, 79 insertions(+), 126 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java
index bb8a1029cf..a5ea9995be 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java
@@ -26,9 +26,10 @@ import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import java.net.InetAddress;
import java.nio.charset.Charset;
@@ -36,41 +37,31 @@ import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+@Timeout(10)
public class TestPutUDP {
private final static String UDP_SERVER_ADDRESS = "127.0.0.1";
- private final static String SERVER_VARIABLE = "ALKJAFLKJDFLSKJSDFLKJSDF";
- private final static String UDP_SERVER_ADDRESS_EL = "${" + SERVER_VARIABLE + "}";
+ private final static String SERVER_VARIABLE = "SERVER";
private static final String DELIMITER = "\n";
private static final Charset CHARSET = StandardCharsets.UTF_8;
private final static int MAX_FRAME_LENGTH = 32800;
private final static int VALID_LARGE_FILE_SIZE = 32768;
- private final static int VALID_SMALL_FILE_SIZE = 64;
private final static int INVALID_LARGE_FILE_SIZE = 1_000_000;
- private final static int LOAD_TEST_ITERATIONS = 500;
- private final static int LOAD_TEST_THREAD_COUNT = 1;
- private final static int DEFAULT_ITERATIONS = 1;
- private final static int DEFAULT_THREAD_COUNT = 1;
private final static char CONTENT_CHAR = 'x';
private final static int DATA_WAIT_PERIOD = 50;
- private final static int DEFAULT_TEST_TIMEOUT_PERIOD = 10000;
- private final static int LONG_TEST_TIMEOUT_PERIOD = 30000;
+ private final static String[] EMPTY_FILE = { "" };
+ private final static String[] VALID_FILES = { "FIRST", "SECOND", "12345678", "343424222", "!@£$%^&*()_+:|{}[];\\" };
private TestRunner runner;
private int port;
private EventServer eventServer;
private BlockingQueue<ByteArrayMessage> messages;
-
- // Test Data
- private final static String[] EMPTY_FILE = { "" };
- private final static String[] VALID_FILES = { "abcdefghijklmnopqrstuvwxyz", "zyxwvutsrqponmlkjihgfedcba", "12345678", "343424222", "!@£$%^&*()_+:|{}[];\\" };
-
- @Before
+ @BeforeEach
public void setup() throws Exception {
runner = TestRunners.newTestRunner(PutUDP.class);
runner.setVariable(SERVER_VARIABLE, UDP_SERVER_ADDRESS);
@@ -78,100 +69,64 @@ public class TestPutUDP {
createTestServer(port, VALID_LARGE_FILE_SIZE);
}
- private void createTestServer(final int port, final int frameSize) throws Exception {
- messages = new LinkedBlockingQueue<>();
- final byte[] delimiter = DELIMITER.getBytes(CHARSET);
- final InetAddress listenAddress = InetAddress.getByName(UDP_SERVER_ADDRESS);
- NettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(
- runner.getLogger(), listenAddress, port, TransportProtocol.UDP, delimiter, frameSize, messages);
- serverFactory.setSocketReceiveBuffer(MAX_FRAME_LENGTH);
- serverFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
- serverFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
- eventServer = serverFactory.getEventServer();
- }
-
- @After
+ @AfterEach
public void cleanup() {
runner.shutdown();
removeTestServer();
}
- private void removeTestServer() {
- if (eventServer != null) {
- eventServer.shutdown();
- eventServer = null;
- }
- }
-
- @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
- public void testValidFiles() throws Exception {
- configureProperties(UDP_SERVER_ADDRESS);
- sendTestData(VALID_FILES);
- checkReceivedAllData(VALID_FILES);
- checkInputQueueIsEmpty();
- }
-
- @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
- public void testValidFilesEL() throws Exception {
- configureProperties(UDP_SERVER_ADDRESS_EL);
- sendTestData(VALID_FILES);
- checkReceivedAllData(VALID_FILES);
- checkInputQueueIsEmpty();
+ @Test
+ public void testSend() throws Exception {
+ configureProperties();
+ sendMessages(VALID_FILES);
+ assertMessagesReceived(VALID_FILES);
+ runner.assertQueueEmpty();
}
- @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
- public void testEmptyFile() throws Exception {
- configureProperties(UDP_SERVER_ADDRESS);
- sendTestData(EMPTY_FILE);
+ @Test
+ public void testSendEmptyFile() throws Exception {
+ configureProperties();
+ sendMessages(EMPTY_FILE);
checkRelationships(EMPTY_FILE.length, 0);
checkNoDataReceived();
- checkInputQueueIsEmpty();
+ runner.assertQueueEmpty();
}
- @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
- public void testLargeValidFile() throws Exception {
- configureProperties(UDP_SERVER_ADDRESS);
+ @Test
+ public void testSendLargeFile() throws Exception {
+ configureProperties();
final String[] testData = createContent(VALID_LARGE_FILE_SIZE);
- sendTestData(testData);
- checkReceivedAllData(testData);
- checkInputQueueIsEmpty();
+ sendMessages(testData);
+ assertMessagesReceived(testData);
+ runner.assertQueueEmpty();
}
- @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
- public void testLargeInvalidFile() throws Exception {
- configureProperties(UDP_SERVER_ADDRESS);
+ @Test
+ public void testSendLargeFileInvalid() throws Exception {
+ configureProperties();
String[] testData = createContent(INVALID_LARGE_FILE_SIZE);
- sendTestData(testData);
+ sendMessages(testData);
checkRelationships(0, testData.length);
checkNoDataReceived();
- checkInputQueueIsEmpty();
+ runner.assertQueueEmpty();
}
- @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
- public void testReconfiguration() throws Exception {
- configureProperties(UDP_SERVER_ADDRESS);
- sendTestData(VALID_FILES);
- checkReceivedAllData(VALID_FILES);
+ @Test
+ public void testSendChangePropertiesAndSend() throws Exception {
+ configureProperties();
+ sendMessages(VALID_FILES);
+ assertMessagesReceived(VALID_FILES);
reset(port);
- configureProperties(UDP_SERVER_ADDRESS);
- sendTestData(VALID_FILES);
- checkReceivedAllData(VALID_FILES);
+ configureProperties();
+ sendMessages(VALID_FILES);
+ assertMessagesReceived(VALID_FILES);
reset(port);
- configureProperties(UDP_SERVER_ADDRESS);
- sendTestData(VALID_FILES);
- checkReceivedAllData(VALID_FILES);
- checkInputQueueIsEmpty();
- }
-
- @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
- public void testLoadTest() throws Exception {
- final String[] testData = createContent(VALID_SMALL_FILE_SIZE);
- configureProperties(UDP_SERVER_ADDRESS);
- sendTestData(testData, LOAD_TEST_ITERATIONS, LOAD_TEST_THREAD_COUNT);
- checkReceivedAllData(testData, LOAD_TEST_ITERATIONS);
- checkInputQueueIsEmpty();
+ configureProperties();
+ sendMessages(VALID_FILES);
+ assertMessagesReceived(VALID_FILES);
+ runner.assertQueueEmpty();
}
private void reset(final int port) throws Exception {
@@ -180,28 +135,17 @@ public class TestPutUDP {
createTestServer(port, MAX_FRAME_LENGTH);
}
- private void configureProperties(final String host) {
- runner.setProperty(PutUDP.HOSTNAME, host);
+ private void configureProperties() {
+ runner.setProperty(PutUDP.HOSTNAME, UDP_SERVER_ADDRESS);
runner.setProperty(PutUDP.PORT, Integer.toString(port));
- runner.setProperty(PutUDP.MAX_SOCKET_SEND_BUFFER_SIZE, "40000B");
runner.assertValid();
}
- private void sendTestData(final String[] testData) {
- sendTestData(testData, DEFAULT_ITERATIONS, DEFAULT_THREAD_COUNT);
- }
-
- private void sendTestData(final String[] testData, final int iterations, final int threadCount) {
+ private void sendMessages(final String[] testData) {
for (String item : testData) {
- runner.setThreadCount(threadCount);
- for (int i = 0; i < iterations; i++) {
- runner.enqueue(item.getBytes());
- }
- runner.run(iterations, false);
+ runner.enqueue(item.getBytes());
+ runner.run();
}
-
- // ensure @OnStopped methods get called
- runner.run();
}
private void checkRelationships(final int successCount, final int failedCount) {
@@ -211,30 +155,20 @@ public class TestPutUDP {
private void checkNoDataReceived() throws Exception {
Thread.sleep(DATA_WAIT_PERIOD);
- assertNull("Unexpected extra messages found", messages.poll());
- }
-
- private void checkInputQueueIsEmpty() {
- runner.assertQueueEmpty();
+ assertNull(messages.poll(), "Unexpected extra messages found");
}
- private void checkReceivedAllData(final String[] sentData) throws Exception {
- checkReceivedAllData(sentData, DEFAULT_ITERATIONS);
- }
-
- private void checkReceivedAllData(final String[] sentData, final int iterations) throws Exception {
+ private void assertMessagesReceived(final String[] sentMessages) throws Exception {
// check each sent FlowFile was successfully sent and received.
- for (String item : sentData) {
- for (int i = 0; i < iterations; i++) {
- ByteArrayMessage packet = messages.take();
- assertNotNull(packet);
- assertArrayEquals(item.getBytes(), packet.getMessage());
- }
+ for (String item : sentMessages) {
+ ByteArrayMessage packet = messages.take();
+ assertNotNull(packet);
+ assertArrayEquals(item.getBytes(), packet.getMessage());
}
- runner.assertTransferCount(PutUDP.REL_SUCCESS, sentData.length * iterations);
+ runner.assertTransferCount(PutUDP.REL_SUCCESS, sentMessages.length);
- assertNull("Unexpected extra messages found", messages.poll());
+ assertNull(messages.poll(), "Unexpected extra messages found");
}
private String[] createContent(final int size) {
@@ -246,4 +180,23 @@ public class TestPutUDP {
return new String[] { new String(content).concat("\n") };
}
+
+ private void createTestServer(final int port, final int frameSize) throws Exception {
+ messages = new LinkedBlockingQueue<>();
+ final byte[] delimiter = DELIMITER.getBytes(CHARSET);
+ final InetAddress listenAddress = InetAddress.getByName(UDP_SERVER_ADDRESS);
+ NettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(
+ runner.getLogger(), listenAddress, port, TransportProtocol.UDP, delimiter, frameSize, messages);
+ serverFactory.setSocketReceiveBuffer(MAX_FRAME_LENGTH);
+ serverFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
+ serverFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
+ eventServer = serverFactory.getEventServer();
+ }
+
+ private void removeTestServer() {
+ if (eventServer != null) {
+ eventServer.shutdown();
+ eventServer = null;
+ }
+ }
}