You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by th...@apache.org on 2021/10/28 17:12:36 UTC

[nifi] branch main updated: NIFI-9346 Added closing of EventSender to TestListenRELP

This is an automated email from the ASF dual-hosted git repository.

thenatog pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 16e6045  NIFI-9346 Added closing of EventSender to TestListenRELP
16e6045 is described below

commit 16e6045d13452cbff7da9685f0da4e2278a06434
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Thu Oct 28 11:54:14 2021 -0500

    NIFI-9346 Added closing of EventSender to TestListenRELP
    
    Signed-off-by: Nathan Gough <th...@gmail.com>
    
    This closes #5492.
---
 .../nifi/processors/standard/TestListenRELP.java   | 27 +++++++++++-----------
 1 file changed, 13 insertions(+), 14 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
index efdac3a..a15ef07 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
@@ -20,6 +20,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.ftpserver.ssl.ClientAuth;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.event.transport.EventSender;
+import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
+import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
 import org.apache.nifi.event.transport.configuration.TransportProtocol;
 import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
 import org.apache.nifi.processor.ProcessContext;
@@ -31,8 +33,6 @@ import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.remote.io.socket.NetworkUtils;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.security.util.TlsException;
 import org.apache.nifi.ssl.RestrictedSSLContextService;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.MockFlowFile;
@@ -109,7 +109,7 @@ public class TestListenRELP {
     }
 
     @Test
-    public void testRELPFramesAreReceivedSuccessfully() throws IOException {
+    public void testRELPFramesAreReceivedSuccessfully() throws Exception {
         final int relpFrames = 5;
         final List<RELPFrame> frames = getFrames(relpFrames);
 
@@ -135,7 +135,7 @@ public class TestListenRELP {
     }
 
     @Test
-    public void testRELPFramesAreReceivedSuccessfullyWhenBatched() throws IOException {
+    public void testRELPFramesAreReceivedSuccessfullyWhenBatched() throws Exception {
 
         runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "5");
 
@@ -164,9 +164,7 @@ public class TestListenRELP {
     }
 
     @Test
-    public void testRunMutualTls() throws IOException, TlsException, InitializationException {
-
-
+    public void testRunMutualTls() throws Exception {
         final String serviceIdentifier = SSLContextService.class.getName();
         when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
         final SSLContext sslContext = SslContextUtils.createKeyStoreSslContext();
@@ -206,9 +204,7 @@ public class TestListenRELP {
         runner.shutdown();
     }
 
-    private void run(final List<RELPFrame> frames, final int flowFiles, final SSLContext sslContext)
-            throws IOException {
-
+    private void run(final List<RELPFrame> frames, final int flowFiles, final SSLContext sslContext) throws Exception {
         final int port = NetworkUtils.availablePort();
         runner.setProperty(AbstractListenEventBatchingProcessor.PORT, Integer.toString(port));
         // Run Processor and start Dispatcher without shutting down
@@ -243,18 +239,21 @@ public class TestListenRELP {
         return frames;
     }
 
-    private void sendMessages(final int port, final byte[] relpMessages, final SSLContext sslContext) {
+    private void sendMessages(final int port, final byte[] relpMessages, final SSLContext sslContext) throws Exception {
         final ByteArrayNettyEventSenderFactory eventSenderFactory = new ByteArrayNettyEventSenderFactory(runner.getLogger(), LOCALHOST, port, TransportProtocol.TCP);
+        eventSenderFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
+        eventSenderFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
         if (sslContext != null) {
             eventSenderFactory.setSslContext(sslContext);
         }
 
         eventSenderFactory.setTimeout(SENDER_TIMEOUT);
-        EventSender<byte[]> eventSender = eventSenderFactory.getEventSender();
-        eventSender.sendEvent(relpMessages);
+        try (final EventSender<byte[]> eventSender = eventSenderFactory.getEventSender()) {
+            eventSender.sendEvent(relpMessages);
+        }
     }
 
-    private class MockListenRELP extends ListenRELP {
+    private static class MockListenRELP extends ListenRELP {
         private final List<RELPMessage> mockEvents;
 
         public MockListenRELP() {