You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by th...@apache.org on 2022/11/10 20:44:05 UTC
[nifi] branch main updated: NIFI-10780 Improved Event Server to avoid dropping messages
This is an automated email from the ASF dual-hosted git repository.
thenatog pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 588e04cd07 NIFI-10780 Improved Event Server to avoid dropping messages
588e04cd07 is described below
commit 588e04cd0771d9895554e106ef401e24f096bc96
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Tue Nov 8 11:53:00 2022 -0600
NIFI-10780 Improved Event Server to avoid dropping messages
- Improved ByteArrayMessageChannelHandler to call BlockingQueue.offer() while the server is not shutting down
- Improved NettyEventServer to use ChannelFuture.await() for closing the channel to allow shutdown to proceed
- Added test method for EventDroppedException handling
Signed-off-by: Nathan Gough <th...@gmail.com>
This closes #6634.
---
...ntException.java => EventDroppedException.java} | 15 ++++++--
.../nifi/event/transport/EventException.java | 9 +++++
.../event/transport/netty/NettyEventServer.java | 9 ++++-
.../channel/ByteArrayMessageChannelHandler.java | 24 +++++++++++-
.../netty/StringNettyEventSenderFactoryTest.java | 45 ++++++++++++++++++++++
5 files changed, 97 insertions(+), 5 deletions(-)
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/EventException.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/EventDroppedException.java
similarity index 72%
copy from nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/EventException.java
copy to nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/EventDroppedException.java
index 4c40719b28..98c97d1907 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/EventException.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/EventDroppedException.java
@@ -17,16 +17,25 @@
package org.apache.nifi.event.transport;
/**
- * Event Exception indicating issues when transporting events
+ * Event Dropped Exception indicating when a handler drops one or more events
*/
-public class EventException extends RuntimeException {
+public class EventDroppedException extends EventException {
+ /**
+ * Event Exception
+ *
+ * @param message Message
+ */
+ public EventDroppedException(final String message) {
+ super(message);
+ }
+
/**
* Event Exception
*
* @param message Message
* @param cause Throwable cause
*/
- public EventException(final String message, final Throwable cause) {
+ public EventDroppedException(final String message, final Throwable cause) {
super(message, cause);
}
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/EventException.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/EventException.java
index 4c40719b28..55337aec10 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/EventException.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/EventException.java
@@ -20,6 +20,15 @@ package org.apache.nifi.event.transport;
* Event Exception indicating issues when transporting events
*/
public class EventException extends RuntimeException {
+ /**
+ * Event Exception
+ *
+ * @param message Message
+ */
+ public EventException(final String message) {
+ super(message);
+ }
+
/**
* Event Exception
*
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java
index 391fc2daf5..fb8aae59de 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java
@@ -17,7 +17,9 @@
package org.apache.nifi.event.transport.netty;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
+import org.apache.nifi.event.transport.EventException;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
@@ -69,8 +71,13 @@ class NettyEventServer implements EventServer {
public void shutdown() {
try {
if (channel.isOpen()) {
- channel.close().syncUninterruptibly();
+ final ChannelFuture closeFuture = channel.close();
+ closeFuture.await(shutdownQuietPeriod.toMillis(), TimeUnit.MILLISECONDS);
}
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ final String message = String.format("Close channel interrupted: Remote Address [%s]", channel.remoteAddress());
+ throw new EventException(message, e);
} finally {
group.shutdownGracefully(shutdownQuietPeriod.toMillis(), shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS).syncUninterruptibly();
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/ByteArrayMessageChannelHandler.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/ByteArrayMessageChannelHandler.java
index 00043b94bc..da40b3154c 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/ByteArrayMessageChannelHandler.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/ByteArrayMessageChannelHandler.java
@@ -19,11 +19,14 @@ package org.apache.nifi.event.transport.netty.channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.util.concurrent.EventExecutor;
+import org.apache.nifi.event.transport.EventDroppedException;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
/**
* Channel Handler for queuing bytes received as Byte Array Messages
@@ -32,6 +35,8 @@ import java.util.concurrent.BlockingQueue;
public class ByteArrayMessageChannelHandler extends SimpleChannelInboundHandler<ByteArrayMessage> {
private static final Logger LOGGER = LoggerFactory.getLogger(ByteArrayMessageChannelHandler.class);
+ private static final long OFFER_TIMEOUT = 500;
+
private final BlockingQueue<ByteArrayMessage> messages;
public ByteArrayMessageChannelHandler(final BlockingQueue<ByteArrayMessage> messages) {
@@ -47,6 +52,23 @@ public class ByteArrayMessageChannelHandler extends SimpleChannelInboundHandler<
@Override
protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ByteArrayMessage message) {
LOGGER.debug("Message Received Length [{}] Remote Address [{}] ", message.getMessage().length, message.getSender());
- messages.add(message);
+
+ final EventExecutor executor = channelHandlerContext.executor();
+ while (!offer(message)) {
+ if (executor.isShuttingDown()) {
+ throw new EventDroppedException(String.format("Dropped Message from Remote Address [%s] executor shutting down", message.getSender()));
+ }
+ }
+
+ LOGGER.debug("Message Queued Length [{}] Remote Address [{}] ", message.getMessage().length, message.getSender());
+ }
+
+ private boolean offer(final ByteArrayMessage message) {
+ try {
+ return messages.offer(message, OFFER_TIMEOUT, TimeUnit.MILLISECONDS);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new EventDroppedException(String.format("Dropped Message from Remote Address [%s] queue offer interrupted", message.getSender()), e);
+ }
}
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/StringNettyEventSenderFactoryTest.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/StringNettyEventSenderFactoryTest.java
index 6afa2298b7..bf8310d672 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/StringNettyEventSenderFactoryTest.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/StringNettyEventSenderFactoryTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.event.transport.netty;
+import org.apache.nifi.event.transport.EventDroppedException;
import org.apache.nifi.event.transport.EventException;
import org.apache.nifi.event.transport.EventSender;
import org.apache.nifi.event.transport.EventServer;
@@ -32,11 +33,14 @@ import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
import org.apache.nifi.security.util.TlsConfiguration;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import javax.net.ssl.SSLContext;
import java.net.InetAddress;
+import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
@@ -47,8 +51,12 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.verify;
@ExtendWith(MockitoExtension.class)
public class StringNettyEventSenderFactoryTest {
@@ -68,6 +76,8 @@ public class StringNettyEventSenderFactoryTest {
private static final int SINGLE_THREAD = 1;
+ private static final int SINGLE_MESSAGE = 1;
+
static {
try {
ADDRESS = InetAddress.getByName("127.0.0.1");
@@ -76,6 +86,9 @@ public class StringNettyEventSenderFactoryTest {
}
}
+ @Captor
+ private ArgumentCaptor<Exception> exceptionCaptor;
+
@Mock
private ComponentLog log;
@@ -112,6 +125,34 @@ public class StringNettyEventSenderFactoryTest {
assertMessageReceived(messages);
}
+ @Test
+ public void testSendEventTcpEventDropped() throws Exception {
+ final int port = NetworkUtils.getAvailableTcpPort();
+
+ final BlockingQueue<ByteArrayMessage> messages = new LinkedBlockingQueue<>(SINGLE_MESSAGE);
+ final NettyEventServerFactory serverFactory = getEventServerFactory(port, messages);
+ final EventServer eventServer = serverFactory.getEventServer();
+ final NettyEventSenderFactory<String> senderFactory = getEventSenderFactory(port);
+ try (final EventSender<String> sender = senderFactory.getEventSender()) {
+ sender.sendEvent(MESSAGE);
+ assertMessageMatched(messages.take());
+
+ // Send event to be queued
+ sender.sendEvent(MESSAGE);
+
+ // Send event to be dropped
+ sender.sendEvent(MESSAGE);
+ } finally {
+ eventServer.shutdown();
+ }
+
+ assertEquals(SINGLE_MESSAGE, messages.size());
+
+ verify(log).warn(anyString(), any(SocketAddress.class), exceptionCaptor.capture());
+ final Exception exception = exceptionCaptor.getValue();
+ assertInstanceOf(EventDroppedException.class, exception);
+ }
+
@Test
public void testSendEventTcpSslContextConfigured() throws Exception {
final int port = NetworkUtils.getAvailableTcpPort();
@@ -138,6 +179,10 @@ public class StringNettyEventSenderFactoryTest {
private void assertMessageReceived(final BlockingQueue<ByteArrayMessage> messages) throws InterruptedException {
final ByteArrayMessage messageReceived = messages.poll(TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertNotNull(messageReceived, "Message not received");
+ assertMessageMatched(messageReceived);
+ }
+
+ private void assertMessageMatched(final ByteArrayMessage messageReceived) {
final String eventReceived = new String(messageReceived.getMessage(), CHARSET);
assertEquals(MESSAGE, eventReceived, "Message not matched");
assertEquals(ADDRESS.getHostAddress(), messageReceived.getSender(), "Sender not matched");