You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2022/04/21 15:00:18 UTC

[nifi] 04/06: NIFI-9942 Removed load test from TestPutUDP

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.16
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 6596d05144a27d9db07c26365d414fc1675b7d44
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Wed Apr 20 15:11:26 2022 -0500

    NIFI-9942 Removed load test from TestPutUDP
    
    - Refactored TestPutUDP using JUnit 5
    - Standardized test method naming
    
    This closes #5984
    Signed-off-by: Paul Grey <gr...@apache.org>
---
 .../nifi/processors/standard/TestPutUDP.java       | 205 ++++++++-------------
 1 file changed, 79 insertions(+), 126 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java
index bb8a1029cf..a5ea9995be 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java
@@ -26,9 +26,10 @@ import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
 import org.apache.nifi.remote.io.socket.NetworkUtils;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 
 import java.net.InetAddress;
 import java.nio.charset.Charset;
@@ -36,41 +37,31 @@ import java.nio.charset.StandardCharsets;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 
+@Timeout(10)
 public class TestPutUDP {
 
     private final static String UDP_SERVER_ADDRESS = "127.0.0.1";
-    private final static String SERVER_VARIABLE = "ALKJAFLKJDFLSKJSDFLKJSDF";
-    private final static String UDP_SERVER_ADDRESS_EL = "${" + SERVER_VARIABLE + "}";
+    private final static String SERVER_VARIABLE = "SERVER";
     private static final String DELIMITER = "\n";
     private static final Charset CHARSET = StandardCharsets.UTF_8;
     private final static int MAX_FRAME_LENGTH = 32800;
     private final static int VALID_LARGE_FILE_SIZE = 32768;
-    private final static int VALID_SMALL_FILE_SIZE = 64;
     private final static int INVALID_LARGE_FILE_SIZE = 1_000_000;
-    private final static int LOAD_TEST_ITERATIONS = 500;
-    private final static int LOAD_TEST_THREAD_COUNT = 1;
-    private final static int DEFAULT_ITERATIONS = 1;
-    private final static int DEFAULT_THREAD_COUNT = 1;
     private final static char CONTENT_CHAR = 'x';
     private final static int DATA_WAIT_PERIOD = 50;
-    private final static int DEFAULT_TEST_TIMEOUT_PERIOD = 10000;
-    private final static int LONG_TEST_TIMEOUT_PERIOD = 30000;
+    private final static String[] EMPTY_FILE = { "" };
+    private final static String[] VALID_FILES = { "FIRST", "SECOND", "12345678", "343424222", "!@£$%^&*()_+:|{}[];\\" };
 
     private TestRunner runner;
     private int port;
     private EventServer eventServer;
     private BlockingQueue<ByteArrayMessage> messages;
 
-
-    // Test Data
-    private final static String[] EMPTY_FILE = { "" };
-    private final static String[] VALID_FILES = { "abcdefghijklmnopqrstuvwxyz", "zyxwvutsrqponmlkjihgfedcba", "12345678", "343424222", "!@£$%^&*()_+:|{}[];\\" };
-
-    @Before
+    @BeforeEach
     public void setup() throws Exception {
         runner = TestRunners.newTestRunner(PutUDP.class);
         runner.setVariable(SERVER_VARIABLE, UDP_SERVER_ADDRESS);
@@ -78,100 +69,64 @@ public class TestPutUDP {
         createTestServer(port, VALID_LARGE_FILE_SIZE);
     }
 
-    private void createTestServer(final int port, final int frameSize) throws Exception {
-        messages = new LinkedBlockingQueue<>();
-        final byte[] delimiter = DELIMITER.getBytes(CHARSET);
-        final InetAddress listenAddress = InetAddress.getByName(UDP_SERVER_ADDRESS);
-        NettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(
-                runner.getLogger(), listenAddress, port, TransportProtocol.UDP, delimiter, frameSize, messages);
-        serverFactory.setSocketReceiveBuffer(MAX_FRAME_LENGTH);
-        serverFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
-        serverFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
-        eventServer = serverFactory.getEventServer();
-    }
-
-    @After
+    @AfterEach
     public void cleanup() {
         runner.shutdown();
         removeTestServer();
     }
 
-    private void removeTestServer() {
-        if (eventServer != null) {
-            eventServer.shutdown();
-            eventServer = null;
-        }
-    }
-
-    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
-    public void testValidFiles() throws Exception {
-        configureProperties(UDP_SERVER_ADDRESS);
-        sendTestData(VALID_FILES);
-        checkReceivedAllData(VALID_FILES);
-        checkInputQueueIsEmpty();
-    }
-
-    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
-    public void testValidFilesEL() throws Exception {
-        configureProperties(UDP_SERVER_ADDRESS_EL);
-        sendTestData(VALID_FILES);
-        checkReceivedAllData(VALID_FILES);
-        checkInputQueueIsEmpty();
+    @Test
+    public void testSend() throws Exception {
+        configureProperties();
+        sendMessages(VALID_FILES);
+        assertMessagesReceived(VALID_FILES);
+        runner.assertQueueEmpty();
     }
 
-    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
-    public void testEmptyFile() throws Exception {
-        configureProperties(UDP_SERVER_ADDRESS);
-        sendTestData(EMPTY_FILE);
+    @Test
+    public void testSendEmptyFile() throws Exception {
+        configureProperties();
+        sendMessages(EMPTY_FILE);
         checkRelationships(EMPTY_FILE.length, 0);
         checkNoDataReceived();
-        checkInputQueueIsEmpty();
+        runner.assertQueueEmpty();
     }
 
-    @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
-    public void testLargeValidFile() throws Exception {
-        configureProperties(UDP_SERVER_ADDRESS);
+    @Test
+    public void testSendLargeFile() throws Exception {
+        configureProperties();
         final String[] testData = createContent(VALID_LARGE_FILE_SIZE);
-        sendTestData(testData);
-        checkReceivedAllData(testData);
-        checkInputQueueIsEmpty();
+        sendMessages(testData);
+        assertMessagesReceived(testData);
+        runner.assertQueueEmpty();
     }
 
-    @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
-    public void testLargeInvalidFile() throws Exception {
-        configureProperties(UDP_SERVER_ADDRESS);
+    @Test
+    public void testSendLargeFileInvalid() throws Exception {
+        configureProperties();
         String[] testData = createContent(INVALID_LARGE_FILE_SIZE);
-        sendTestData(testData);
+        sendMessages(testData);
         checkRelationships(0, testData.length);
         checkNoDataReceived();
-        checkInputQueueIsEmpty();
+        runner.assertQueueEmpty();
     }
 
-    @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
-    public void testReconfiguration() throws Exception {
-        configureProperties(UDP_SERVER_ADDRESS);
-        sendTestData(VALID_FILES);
-        checkReceivedAllData(VALID_FILES);
+    @Test
+    public void testSendChangePropertiesAndSend() throws Exception {
+        configureProperties();
+        sendMessages(VALID_FILES);
+        assertMessagesReceived(VALID_FILES);
         reset(port);
 
-        configureProperties(UDP_SERVER_ADDRESS);
-        sendTestData(VALID_FILES);
-        checkReceivedAllData(VALID_FILES);
+        configureProperties();
+        sendMessages(VALID_FILES);
+        assertMessagesReceived(VALID_FILES);
         reset(port);
 
-        configureProperties(UDP_SERVER_ADDRESS);
-        sendTestData(VALID_FILES);
-        checkReceivedAllData(VALID_FILES);
-        checkInputQueueIsEmpty();
-    }
-
-    @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
-    public void testLoadTest() throws Exception {
-        final String[] testData = createContent(VALID_SMALL_FILE_SIZE);
-        configureProperties(UDP_SERVER_ADDRESS);
-        sendTestData(testData, LOAD_TEST_ITERATIONS, LOAD_TEST_THREAD_COUNT);
-        checkReceivedAllData(testData, LOAD_TEST_ITERATIONS);
-        checkInputQueueIsEmpty();
+        configureProperties();
+        sendMessages(VALID_FILES);
+        assertMessagesReceived(VALID_FILES);
+        runner.assertQueueEmpty();
     }
 
     private void reset(final int port) throws Exception {
@@ -180,28 +135,17 @@ public class TestPutUDP {
         createTestServer(port, MAX_FRAME_LENGTH);
     }
 
-    private void configureProperties(final String host) {
-        runner.setProperty(PutUDP.HOSTNAME, host);
+    private void configureProperties() {
+        runner.setProperty(PutUDP.HOSTNAME, UDP_SERVER_ADDRESS);
         runner.setProperty(PutUDP.PORT, Integer.toString(port));
-        runner.setProperty(PutUDP.MAX_SOCKET_SEND_BUFFER_SIZE, "40000B");
         runner.assertValid();
     }
 
-    private void sendTestData(final String[] testData) {
-        sendTestData(testData, DEFAULT_ITERATIONS, DEFAULT_THREAD_COUNT);
-    }
-
-    private void sendTestData(final String[] testData, final int iterations, final int threadCount) {
+    private void sendMessages(final String[] testData) {
         for (String item : testData) {
-            runner.setThreadCount(threadCount);
-            for (int i = 0; i < iterations; i++) {
-                runner.enqueue(item.getBytes());
-            }
-            runner.run(iterations, false);
+            runner.enqueue(item.getBytes());
+            runner.run();
         }
-
-        // ensure @OnStopped methods get called
-        runner.run();
     }
 
     private void checkRelationships(final int successCount, final int failedCount) {
@@ -211,30 +155,20 @@ public class TestPutUDP {
 
     private void checkNoDataReceived() throws Exception {
         Thread.sleep(DATA_WAIT_PERIOD);
-        assertNull("Unexpected extra messages found", messages.poll());
-    }
-
-    private void checkInputQueueIsEmpty() {
-        runner.assertQueueEmpty();
+        assertNull(messages.poll(), "Unexpected extra messages found");
     }
 
-    private void checkReceivedAllData(final String[] sentData) throws Exception {
-        checkReceivedAllData(sentData, DEFAULT_ITERATIONS);
-    }
-
-    private void checkReceivedAllData(final String[] sentData, final int iterations) throws Exception {
+    private void assertMessagesReceived(final String[] sentMessages) throws Exception {
         // check each sent FlowFile was successfully sent and received.
-         for (String item : sentData) {
-            for (int i = 0; i < iterations; i++) {
-                ByteArrayMessage packet = messages.take();
-                assertNotNull(packet);
-                assertArrayEquals(item.getBytes(), packet.getMessage());
-            }
+         for (String item : sentMessages) {
+             ByteArrayMessage packet = messages.take();
+             assertNotNull(packet);
+             assertArrayEquals(item.getBytes(), packet.getMessage());
         }
 
-        runner.assertTransferCount(PutUDP.REL_SUCCESS, sentData.length * iterations);
+        runner.assertTransferCount(PutUDP.REL_SUCCESS, sentMessages.length);
 
-        assertNull("Unexpected extra messages found", messages.poll());
+        assertNull(messages.poll(), "Unexpected extra messages found");
     }
 
     private String[] createContent(final int size) {
@@ -246,4 +180,23 @@ public class TestPutUDP {
 
         return new String[] { new String(content).concat("\n") };
     }
+
+    private void createTestServer(final int port, final int frameSize) throws Exception {
+        messages = new LinkedBlockingQueue<>();
+        final byte[] delimiter = DELIMITER.getBytes(CHARSET);
+        final InetAddress listenAddress = InetAddress.getByName(UDP_SERVER_ADDRESS);
+        NettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(
+                runner.getLogger(), listenAddress, port, TransportProtocol.UDP, delimiter, frameSize, messages);
+        serverFactory.setSocketReceiveBuffer(MAX_FRAME_LENGTH);
+        serverFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
+        serverFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
+        eventServer = serverFactory.getEventServer();
+    }
+
+    private void removeTestServer() {
+        if (eventServer != null) {
+            eventServer.shutdown();
+            eventServer = null;
+        }
+    }
 }