You are viewing a plain-text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2021/03/23 14:16:07 UTC
[nifi] branch main updated: NIFI-8352 This closes #4922. Changed
assertServerConnections to loop and sleep while evaluating
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new e505e8b NIFI-8352 This closes #4922. Changed assertServerConnections to loop and sleep while evaluating
e505e8b is described below
commit e505e8b42df77034629bb88eebc8e5f7c87fd82d
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Mon Mar 22 15:14:42 2021 -0500
NIFI-8352 This closes #4922. Changed assertServerConnections to loop and sleep while evaluating
Signed-off-by: Joe Witt <jo...@apache.org>
---
.../nifi/processors/standard/TestPutTCP.java | 37 +++++++++++----------
.../processors/standard/util/TCPTestServer.java | 38 +++++++++-------------
2 files changed, 35 insertions(+), 40 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
index f1d8a1e..e608f13 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
@@ -27,7 +27,9 @@ import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.ssl.SslContextUtils;
import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.Timeout;
import org.mockito.Mockito;
import javax.net.ServerSocketFactory;
@@ -36,9 +38,9 @@ import javax.net.ssl.SSLServerSocketFactory;
import java.net.InetAddress;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -58,13 +60,14 @@ public class TestPutTCP {
private final static int DEFAULT_ITERATIONS = 1;
private final static int DEFAULT_THREAD_COUNT = 1;
private final static char CONTENT_CHAR = 'x';
- private final static int DEFAULT_TEST_TIMEOUT_PERIOD = 10000;
- private final static int LONG_TEST_TIMEOUT_PERIOD = 300000;
private final static String OUTGOING_MESSAGE_DELIMITER = "\n";
private final static String OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR = "{delimiter}\r\n";
private final static String[] EMPTY_FILE = { "" };
private final static String[] VALID_FILES = { "abcdefghijklmnopqrstuvwxyz", "zyxwvutsrqponmlkjihgfedcba", "12345678", "343424222", "!@£$%^&*()_+:|{}[];\\" };
+ @Rule
+ public Timeout timeout = new Timeout(30, TimeUnit.SECONDS);
+
private TCPTestServer server;
private int port;
private ArrayBlockingQueue<List<Byte>> received;
@@ -98,7 +101,7 @@ public class TestPutTCP {
runner.assertNotValid();
}
- @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ @Test
public void testRunSuccess() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
@@ -107,7 +110,7 @@ public class TestPutTCP {
assertServerConnections(1);
}
- @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
+ @Test
public void testRunSuccessSslContextService() throws Exception {
final TlsConfiguration tlsConfiguration = KeyStoreUtils.createTlsConfigAndNewKeystoreTruststore();
@@ -130,7 +133,7 @@ public class TestPutTCP {
assertServerConnections(1);
}
- @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ @Test
public void testRunSuccessServerVariableExpression() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS_EL, OUTGOING_MESSAGE_DELIMITER, false);
@@ -139,7 +142,7 @@ public class TestPutTCP {
assertServerConnections(1);
}
- @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ @Test
public void testRunSuccessPruneSenders() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
@@ -156,7 +159,7 @@ public class TestPutTCP {
assertServerConnections(2);
}
- @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ @Test
public void testRunSuccessMultiCharDelimiter() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR, false);
@@ -165,7 +168,7 @@ public class TestPutTCP {
assertServerConnections(1);
}
- @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
+ @Test
public void testRunSuccessConnectionPerFlowFile() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER, true);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, true);
@@ -174,7 +177,7 @@ public class TestPutTCP {
assertServerConnections(VALID_FILES.length);
}
- @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ @Test
public void testRunSuccessConnectionFailure() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
@@ -193,7 +196,7 @@ public class TestPutTCP {
assertServerConnections(1);
}
- @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ @Test
public void testRunSuccessEmptyFile() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
@@ -203,7 +206,7 @@ public class TestPutTCP {
assertServerConnections(1);
}
- @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ @Test
public void testRunSuccessLargeValidFile() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, true);
@@ -213,7 +216,7 @@ public class TestPutTCP {
assertServerConnections(testData.length);
}
- @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
+ @Test
public void testRunSuccessFiveHundredMessages() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER);
Thread.sleep(1000);
@@ -295,10 +298,10 @@ public class TestPutTCP {
assertNull("Unexpected Message Found", received.poll());
}
- private void assertServerConnections(final int connections) {
- // Shutdown server to get completed number of connections
- shutdownServer();
- assertEquals("Server Connections not matched", server.getTotalNumConnections(), connections);
+ private void assertServerConnections(final int connections) throws InterruptedException {
+ while (server.getTotalConnections() != connections) {
+ Thread.sleep(10);
+ }
}
private String[] createContent(final int size) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TCPTestServer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TCPTestServer.java
index 05e01ee..2549ffc 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TCPTestServer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TCPTestServer.java
@@ -16,7 +16,8 @@
*/
package org.apache.nifi.processors.standard.util;
-import java.io.IOException;
+import org.apache.nifi.io.socket.SocketUtils;
+
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
@@ -32,7 +33,7 @@ public class TCPTestServer implements Runnable {
private final InetAddress ipAddress;
private final String messageDelimiter;
private final ArrayBlockingQueue<List<Byte>> queue;
- private final AtomicInteger totalNumConnections = new AtomicInteger();
+ private final AtomicInteger totalConnections = new AtomicInteger();
private final boolean closeOnMessageReceived;
private volatile ServerSocket serverSocket;
@@ -65,30 +66,22 @@ public class TCPTestServer implements Runnable {
shutdownServer();
}
- public synchronized void shutdownServer() {
+ public int getPort(){
+ return port;
+ }
+
+ private synchronized void shutdownServer() {
if (isServerRunning()) {
- try {
- serverSocket.close();
- } catch (IOException ioe) {
- // Do Nothing.
- }
+ SocketUtils.closeQuietly(serverSocket);
}
}
- public synchronized void shutdownConnection() {
+ private synchronized void shutdownConnection() {
if (isConnected()) {
- try {
- connectionSocket.close();
- } catch (IOException ioe) {
- // Do Nothing.
- }
+ SocketUtils.closeQuietly(connectionSocket);
}
}
- public int getPort(){
- return port;
- }
-
private void storeReceivedMessage(final List<Byte> message) {
queue.add(message);
if (closeOnMessageReceived) {
@@ -104,8 +97,8 @@ public class TCPTestServer implements Runnable {
return connectionSocket != null && !connectionSocket.isClosed();
}
- public int getTotalNumConnections() {
- return totalNumConnections.get();
+ public int getTotalConnections() {
+ return totalConnections.get();
}
protected boolean isDelimiterPresent(final List<Byte> message) {
@@ -140,7 +133,7 @@ public class TCPTestServer implements Runnable {
try {
while (isServerRunning()) {
connectionSocket = serverSocket.accept();
- totalNumConnections.incrementAndGet();
+ totalConnections.incrementAndGet();
final InputStream inputStream = connectionSocket.getInputStream();
while (isConnected()) {
final List<Byte> message = new ArrayList<>();
@@ -166,8 +159,7 @@ public class TCPTestServer implements Runnable {
} catch (Exception e) {
// Do Nothing
} finally {
- shutdownConnection();
- shutdownServer();
+ shutdown();
}
}