You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by th...@apache.org on 2022/11/10 20:44:05 UTC

[nifi] branch main updated: NIFI-10780 Improved Event Server to avoid dropping messages

This is an automated email from the ASF dual-hosted git repository.

thenatog pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 588e04cd07 NIFI-10780 Improved Event Server to avoid dropping messages
588e04cd07 is described below

commit 588e04cd0771d9895554e106ef401e24f096bc96
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Tue Nov 8 11:53:00 2022 -0600

    NIFI-10780 Improved Event Server to avoid dropping messages
    
    - Improved ByteArrayMessageChannelHandler to retry BlockingQueue.offer() with a timeout until the message is queued, dropping it only when the server is shutting down
    - Improved NettyEventServer to use ChannelFuture.await() for closing the channel to allow shutdown to proceed
    - Added test method for EventDroppedException handling
    
    Signed-off-by: Nathan Gough <th...@gmail.com>
    
    This closes #6634.
---
 ...ntException.java => EventDroppedException.java} | 15 ++++++--
 .../nifi/event/transport/EventException.java       |  9 +++++
 .../event/transport/netty/NettyEventServer.java    |  9 ++++-
 .../channel/ByteArrayMessageChannelHandler.java    | 24 +++++++++++-
 .../netty/StringNettyEventSenderFactoryTest.java   | 45 ++++++++++++++++++++++
 5 files changed, 97 insertions(+), 5 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/EventException.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/EventDroppedException.java
similarity index 72%
copy from nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/EventException.java
copy to nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/EventDroppedException.java
index 4c40719b28..98c97d1907 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/EventException.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/EventDroppedException.java
@@ -17,16 +17,25 @@
 package org.apache.nifi.event.transport;
 
 /**
- * Event Exception indicating issues when transporting events
+ * Event Dropped Exception indicating when a handler drops one or more events
  */
-public class EventException extends RuntimeException {
+public class EventDroppedException extends EventException {
+    /**
+     * Event Dropped Exception
+     *
+     * @param message Message
+     */
+    public EventDroppedException(final String message) {
+        super(message);
+    }
+
     /**
      * Event Exception
      *
      * @param message Message
      * @param cause Throwable cause
      */
-    public EventException(final String message, final Throwable cause) {
+    public EventDroppedException(final String message, final Throwable cause) {
         super(message, cause);
     }
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/EventException.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/EventException.java
index 4c40719b28..55337aec10 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/EventException.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/EventException.java
@@ -20,6 +20,15 @@ package org.apache.nifi.event.transport;
  * Event Exception indicating issues when transporting events
  */
 public class EventException extends RuntimeException {
+    /**
+     * Event Exception
+     *
+     * @param message Message
+     */
+    public EventException(final String message) {
+        super(message);
+    }
+
     /**
      * Event Exception
      *
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java
index 391fc2daf5..fb8aae59de 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java
@@ -17,7 +17,9 @@
 package org.apache.nifi.event.transport.netty;
 
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.EventLoopGroup;
+import org.apache.nifi.event.transport.EventException;
 import org.apache.nifi.event.transport.EventServer;
 import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
 import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
@@ -69,8 +71,13 @@ class NettyEventServer implements EventServer {
     public void shutdown() {
         try {
             if (channel.isOpen()) {
-                channel.close().syncUninterruptibly();
+                final ChannelFuture closeFuture = channel.close();
+                closeFuture.await(shutdownQuietPeriod.toMillis(), TimeUnit.MILLISECONDS);
             }
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            final String message = String.format("Close channel interrupted: Remote Address [%s]", channel.remoteAddress());
+            throw new EventException(message, e);
         } finally {
             group.shutdownGracefully(shutdownQuietPeriod.toMillis(), shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS).syncUninterruptibly();
         }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/ByteArrayMessageChannelHandler.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/ByteArrayMessageChannelHandler.java
index 00043b94bc..da40b3154c 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/ByteArrayMessageChannelHandler.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/ByteArrayMessageChannelHandler.java
@@ -19,11 +19,14 @@ package org.apache.nifi.event.transport.netty.channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.util.concurrent.EventExecutor;
+import org.apache.nifi.event.transport.EventDroppedException;
 import org.apache.nifi.event.transport.message.ByteArrayMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Channel Handler for queuing bytes received as Byte Array Messages
@@ -32,6 +35,8 @@ import java.util.concurrent.BlockingQueue;
 public class ByteArrayMessageChannelHandler extends SimpleChannelInboundHandler<ByteArrayMessage> {
     private static final Logger LOGGER = LoggerFactory.getLogger(ByteArrayMessageChannelHandler.class);
 
+    private static final long OFFER_TIMEOUT = 500;
+
     private final BlockingQueue<ByteArrayMessage> messages;
 
     public ByteArrayMessageChannelHandler(final BlockingQueue<ByteArrayMessage> messages) {
@@ -47,6 +52,23 @@ public class ByteArrayMessageChannelHandler extends SimpleChannelInboundHandler<
     @Override
     protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ByteArrayMessage message) {
         LOGGER.debug("Message Received Length [{}] Remote Address [{}] ", message.getMessage().length, message.getSender());
-        messages.add(message);
+
+        final EventExecutor executor = channelHandlerContext.executor();
+        while (!offer(message)) {
+            if (executor.isShuttingDown()) {
+                throw new EventDroppedException(String.format("Dropped Message from Remote Address [%s] executor shutting down", message.getSender()));
+            }
+        }
+
+        LOGGER.debug("Message Queued Length [{}] Remote Address [{}] ", message.getMessage().length, message.getSender());
+    }
+
+    private boolean offer(final ByteArrayMessage message) {
+        try {
+            return messages.offer(message, OFFER_TIMEOUT, TimeUnit.MILLISECONDS);
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new EventDroppedException(String.format("Dropped Message from Remote Address [%s] queue offer interrupted", message.getSender()), e);
+        }
     }
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/StringNettyEventSenderFactoryTest.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/StringNettyEventSenderFactoryTest.java
index 6afa2298b7..bf8310d672 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/StringNettyEventSenderFactoryTest.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/StringNettyEventSenderFactoryTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.event.transport.netty;
 
+import org.apache.nifi.event.transport.EventDroppedException;
 import org.apache.nifi.event.transport.EventException;
 import org.apache.nifi.event.transport.EventSender;
 import org.apache.nifi.event.transport.EventServer;
@@ -32,11 +33,14 @@ import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
 import org.apache.nifi.security.util.TlsConfiguration;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 import javax.net.ssl.SSLContext;
 import java.net.InetAddress;
+import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
@@ -47,8 +51,12 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.verify;
 
 @ExtendWith(MockitoExtension.class)
 public class StringNettyEventSenderFactoryTest {
@@ -68,6 +76,8 @@ public class StringNettyEventSenderFactoryTest {
 
     private static final int SINGLE_THREAD = 1;
 
+    private static final int SINGLE_MESSAGE = 1;
+
     static {
         try {
             ADDRESS = InetAddress.getByName("127.0.0.1");
@@ -76,6 +86,9 @@ public class StringNettyEventSenderFactoryTest {
         }
     }
 
+    @Captor
+    private ArgumentCaptor<Exception> exceptionCaptor;
+
     @Mock
     private ComponentLog log;
 
@@ -112,6 +125,34 @@ public class StringNettyEventSenderFactoryTest {
         assertMessageReceived(messages);
     }
 
+    @Test
+    public void testSendEventTcpEventDropped() throws Exception {
+        final int port = NetworkUtils.getAvailableTcpPort();
+
+        final BlockingQueue<ByteArrayMessage> messages = new LinkedBlockingQueue<>(SINGLE_MESSAGE);
+        final NettyEventServerFactory serverFactory = getEventServerFactory(port, messages);
+        final EventServer eventServer = serverFactory.getEventServer();
+        final NettyEventSenderFactory<String> senderFactory = getEventSenderFactory(port);
+        try (final EventSender<String> sender = senderFactory.getEventSender()) {
+            sender.sendEvent(MESSAGE);
+            assertMessageMatched(messages.take());
+
+            // Send event to be queued
+            sender.sendEvent(MESSAGE);
+
+            // Send event to be dropped
+            sender.sendEvent(MESSAGE);
+        } finally {
+            eventServer.shutdown();
+        }
+
+        assertEquals(SINGLE_MESSAGE, messages.size());
+
+        verify(log).warn(anyString(), any(SocketAddress.class), exceptionCaptor.capture());
+        final Exception exception = exceptionCaptor.getValue();
+        assertInstanceOf(EventDroppedException.class, exception);
+    }
+
     @Test
     public void testSendEventTcpSslContextConfigured() throws Exception {
         final int port = NetworkUtils.getAvailableTcpPort();
@@ -138,6 +179,10 @@ public class StringNettyEventSenderFactoryTest {
     private void assertMessageReceived(final BlockingQueue<ByteArrayMessage> messages) throws InterruptedException {
         final ByteArrayMessage messageReceived = messages.poll(TIMEOUT_SECONDS, TimeUnit.SECONDS);
         assertNotNull(messageReceived, "Message not received");
+        assertMessageMatched(messageReceived);
+    }
+
+    private void assertMessageMatched(final ByteArrayMessage messageReceived) {
         final String eventReceived = new String(messageReceived.getMessage(), CHARSET);
         assertEquals(MESSAGE, eventReceived, "Message not matched");
         assertEquals(ADDRESS.getHostAddress(), messageReceived.getSender(), "Sender not matched");