You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2021/03/23 14:16:07 UTC

[nifi] branch main updated: NIFI-8352 This closes #4922. Changed assertServerConnections to loop and sleep while evaluating

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new e505e8b  NIFI-8352 This closes #4922. Changed assertServerConnections to loop and sleep while evaluating
e505e8b is described below

commit e505e8b42df77034629bb88eebc8e5f7c87fd82d
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Mon Mar 22 15:14:42 2021 -0500

    NIFI-8352 This closes #4922. Changed assertServerConnections to loop and sleep while evaluating
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../nifi/processors/standard/TestPutTCP.java       | 37 +++++++++++----------
 .../processors/standard/util/TCPTestServer.java    | 38 +++++++++-------------
 2 files changed, 35 insertions(+), 40 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
index f1d8a1e..e608f13 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
@@ -27,7 +27,9 @@ import org.apache.nifi.util.TestRunners;
 import org.apache.nifi.web.util.ssl.SslContextUtils;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.mockito.Mockito;
 
 import javax.net.ServerSocketFactory;
@@ -36,9 +38,9 @@ import javax.net.ssl.SSLServerSocketFactory;
 import java.net.InetAddress;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
@@ -58,13 +60,14 @@ public class TestPutTCP {
     private final static int DEFAULT_ITERATIONS = 1;
     private final static int DEFAULT_THREAD_COUNT = 1;
     private final static char CONTENT_CHAR = 'x';
-    private final static int DEFAULT_TEST_TIMEOUT_PERIOD = 10000;
-    private final static int LONG_TEST_TIMEOUT_PERIOD = 300000;
     private final static String OUTGOING_MESSAGE_DELIMITER = "\n";
     private final static String OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR = "{delimiter}\r\n";
     private final static String[] EMPTY_FILE = { "" };
     private final static String[] VALID_FILES = { "abcdefghijklmnopqrstuvwxyz", "zyxwvutsrqponmlkjihgfedcba", "12345678", "343424222", "!@£$%^&*()_+:|{}[];\\" };
 
+    @Rule
+    public Timeout timeout = new Timeout(30, TimeUnit.SECONDS);
+
     private TCPTestServer server;
     private int port;
     private ArrayBlockingQueue<List<Byte>> received;
@@ -98,7 +101,7 @@ public class TestPutTCP {
         runner.assertNotValid();
     }
 
-    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    @Test
     public void testRunSuccess() throws Exception {
         createTestServer(OUTGOING_MESSAGE_DELIMITER);
         configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
@@ -107,7 +110,7 @@ public class TestPutTCP {
         assertServerConnections(1);
     }
 
-    @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
+    @Test
     public void testRunSuccessSslContextService() throws Exception {
         final TlsConfiguration tlsConfiguration = KeyStoreUtils.createTlsConfigAndNewKeystoreTruststore();
 
@@ -130,7 +133,7 @@ public class TestPutTCP {
         assertServerConnections(1);
     }
 
-    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    @Test
     public void testRunSuccessServerVariableExpression() throws Exception {
         createTestServer(OUTGOING_MESSAGE_DELIMITER);
         configureProperties(TCP_SERVER_ADDRESS_EL, OUTGOING_MESSAGE_DELIMITER, false);
@@ -139,7 +142,7 @@ public class TestPutTCP {
         assertServerConnections(1);
     }
 
-    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    @Test
     public void testRunSuccessPruneSenders() throws Exception {
         createTestServer(OUTGOING_MESSAGE_DELIMITER);
         configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
@@ -156,7 +159,7 @@ public class TestPutTCP {
         assertServerConnections(2);
     }
 
-    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    @Test
     public void testRunSuccessMultiCharDelimiter() throws Exception {
         createTestServer(OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR);
         configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR, false);
@@ -165,7 +168,7 @@ public class TestPutTCP {
         assertServerConnections(1);
     }
 
-    @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
+    @Test
     public void testRunSuccessConnectionPerFlowFile() throws Exception {
         createTestServer(OUTGOING_MESSAGE_DELIMITER, true);
         configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, true);
@@ -174,7 +177,7 @@ public class TestPutTCP {
         assertServerConnections(VALID_FILES.length);
     }
 
-    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    @Test
     public void testRunSuccessConnectionFailure() throws Exception {
         createTestServer(OUTGOING_MESSAGE_DELIMITER);
         configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
@@ -193,7 +196,7 @@ public class TestPutTCP {
         assertServerConnections(1);
     }
 
-    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    @Test
     public void testRunSuccessEmptyFile() throws Exception {
         createTestServer(OUTGOING_MESSAGE_DELIMITER);
         configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
@@ -203,7 +206,7 @@ public class TestPutTCP {
         assertServerConnections(1);
     }
 
-    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    @Test
     public void testRunSuccessLargeValidFile() throws Exception {
         createTestServer(OUTGOING_MESSAGE_DELIMITER);
         configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, true);
@@ -213,7 +216,7 @@ public class TestPutTCP {
         assertServerConnections(testData.length);
     }
 
-    @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
+    @Test
     public void testRunSuccessFiveHundredMessages() throws Exception {
         createTestServer(OUTGOING_MESSAGE_DELIMITER);
         Thread.sleep(1000);
@@ -295,10 +298,10 @@ public class TestPutTCP {
         assertNull("Unexpected Message Found", received.poll());
     }
 
-    private void assertServerConnections(final int connections) {
-        // Shutdown server to get completed number of connections
-        shutdownServer();
-        assertEquals("Server Connections not matched", server.getTotalNumConnections(), connections);
+    private void assertServerConnections(final int connections) throws InterruptedException {
+        while (server.getTotalConnections() != connections) {
+            Thread.sleep(10);
+        }
     }
 
     private String[] createContent(final int size) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TCPTestServer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TCPTestServer.java
index 05e01ee..2549ffc 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TCPTestServer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TCPTestServer.java
@@ -16,7 +16,8 @@
  */
 package org.apache.nifi.processors.standard.util;
 
-import java.io.IOException;
+import org.apache.nifi.io.socket.SocketUtils;
+
 import java.io.InputStream;
 import java.net.InetAddress;
 import java.net.ServerSocket;
@@ -32,7 +33,7 @@ public class TCPTestServer implements Runnable {
     private final InetAddress ipAddress;
     private final String messageDelimiter;
     private final ArrayBlockingQueue<List<Byte>> queue;
-    private final AtomicInteger totalNumConnections = new AtomicInteger();
+    private final AtomicInteger totalConnections = new AtomicInteger();
     private final boolean closeOnMessageReceived;
 
     private volatile ServerSocket serverSocket;
@@ -65,30 +66,22 @@ public class TCPTestServer implements Runnable {
         shutdownServer();
     }
 
-    public synchronized void shutdownServer() {
+    public int getPort(){
+        return port;
+    }
+
+    private synchronized void shutdownServer() {
         if (isServerRunning()) {
-            try {
-                serverSocket.close();
-            } catch (IOException ioe) {
-                // Do Nothing.
-            }
+            SocketUtils.closeQuietly(serverSocket);
         }
     }
 
-    public synchronized void shutdownConnection() {
+    private synchronized void shutdownConnection() {
         if (isConnected()) {
-            try {
-                connectionSocket.close();
-            } catch (IOException ioe) {
-                // Do Nothing.
-            }
+            SocketUtils.closeQuietly(connectionSocket);
         }
     }
 
-    public int getPort(){
-        return port;
-    }
-
     private void storeReceivedMessage(final List<Byte> message) {
         queue.add(message);
         if (closeOnMessageReceived) {
@@ -104,8 +97,8 @@ public class TCPTestServer implements Runnable {
         return connectionSocket != null && !connectionSocket.isClosed();
     }
 
-    public int getTotalNumConnections() {
-        return totalNumConnections.get();
+    public int getTotalConnections() {
+        return totalConnections.get();
     }
 
     protected boolean isDelimiterPresent(final List<Byte> message) {
@@ -140,7 +133,7 @@ public class TCPTestServer implements Runnable {
         try {
             while (isServerRunning()) {
                 connectionSocket = serverSocket.accept();
-                totalNumConnections.incrementAndGet();
+                totalConnections.incrementAndGet();
                 final InputStream inputStream = connectionSocket.getInputStream();
                 while (isConnected()) {
                     final List<Byte> message = new ArrayList<>();
@@ -166,8 +159,7 @@ public class TCPTestServer implements Runnable {
         } catch (Exception e) {
             // Do Nothing
         } finally {
-            shutdownConnection();
-            shutdownServer();
+            shutdown();
         }
 
     }