You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2021/12/15 22:59:29 UTC

[nifi] branch main updated: NIFI-9478 Moved Netty Log Exception Handler to end of pipeline

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 5e9c868  NIFI-9478 Moved Netty Log Exception Handler to end of pipeline
5e9c868 is described below

commit 5e9c86885ce4557f41a955adcaa3a6e70df3f501
Author: Nathan Gough <th...@gmail.com>
AuthorDate: Mon Dec 13 12:31:26 2021 -0500

    NIFI-9478 Moved Netty Log Exception Handler to end of pipeline
    
    - Changed display name of Max Number of TCP Connections to Max Number of Worker Threads for ListenTCP
    - Set Netty Socket Receive Buffer using Max Socket Buffer Size in ListenTCP
    
    This closes #5599
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../netty/ByteArrayMessageNettyEventServerFactory.java         |  8 ++++----
 .../transport/netty/ByteArrayNettyEventSenderFactory.java      |  2 +-
 .../transport/netty/StreamingNettyEventSenderFactory.java      |  4 ++--
 .../event/transport/netty/StringNettyEventSenderFactory.java   |  2 +-
 .../apache/nifi/processor/util/listen/ListenerProperties.java  |  5 +++--
 .../java/org/apache/nifi/processors/standard/ListenRELP.java   |  6 +++---
 .../java/org/apache/nifi/processors/standard/ListenTCP.java    | 10 +++++-----
 .../standard/relp/handler/RELPMessageServerFactory.java        |  4 ++--
 8 files changed, 21 insertions(+), 20 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayMessageNettyEventServerFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayMessageNettyEventServerFactory.java
index 036161e..3073926 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayMessageNettyEventServerFactory.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayMessageNettyEventServerFactory.java
@@ -61,17 +61,17 @@ public class ByteArrayMessageNettyEventServerFactory extends NettyEventServerFac
 
         if (TransportProtocol.UDP.equals(protocol)) {
             setHandlerSupplier(() -> Arrays.asList(
-                    logExceptionChannelHandler,
                     new DatagramByteArrayMessageDecoder(),
-                    byteArrayMessageChannelHandler
+                    byteArrayMessageChannelHandler,
+                    logExceptionChannelHandler
             ));
         } else {
             setHandlerSupplier(() -> Arrays.asList(
-                    logExceptionChannelHandler,
                     new DelimiterBasedFrameDecoder(maxFrameLength, STRIP_DELIMITER, Unpooled.wrappedBuffer(delimiter)),
                     new ByteArrayDecoder(),
                     new SocketByteArrayMessageDecoder(),
-                    byteArrayMessageChannelHandler
+                    byteArrayMessageChannelHandler,
+                    logExceptionChannelHandler
             ));
         }
     }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayNettyEventSenderFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayNettyEventSenderFactory.java
index 71219b3..2e005e5 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayNettyEventSenderFactory.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayNettyEventSenderFactory.java
@@ -39,8 +39,8 @@ public class ByteArrayNettyEventSenderFactory extends NettyEventSenderFactory<by
     public ByteArrayNettyEventSenderFactory(final ComponentLog log, final String address, final int port, final TransportProtocol protocol) {
         super(address, port, protocol);
         final List<ChannelHandler> handlers = new ArrayList<>();
-        handlers.add(new LogExceptionChannelHandler(log));
         handlers.add(new ByteArrayEncoder());
+        handlers.add(new LogExceptionChannelHandler(log));
         setHandlerSupplier(() -> handlers);
     }
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StreamingNettyEventSenderFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StreamingNettyEventSenderFactory.java
index efb130b..483a765 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StreamingNettyEventSenderFactory.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StreamingNettyEventSenderFactory.java
@@ -43,9 +43,9 @@ public class StreamingNettyEventSenderFactory extends NettyEventSenderFactory<In
         final InputStreamMessageEncoder inputStreamMessageEncoder = new InputStreamMessageEncoder();
 
         setHandlerSupplier(() -> Arrays.asList(
-                logExceptionChannelHandler,
                 new ChunkedWriteHandler(),
-                inputStreamMessageEncoder
+                inputStreamMessageEncoder,
+                logExceptionChannelHandler
         ));
     }
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StringNettyEventSenderFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StringNettyEventSenderFactory.java
index 448ee3c..47999c6 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StringNettyEventSenderFactory.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StringNettyEventSenderFactory.java
@@ -46,12 +46,12 @@ public class StringNettyEventSenderFactory extends NettyEventSenderFactory<Strin
     public StringNettyEventSenderFactory(final ComponentLog log, final String address, final int port, final TransportProtocol protocol, final Charset charset, final LineEnding lineEnding) {
         super(address, port, protocol);
         final List<ChannelHandler> handlers = new ArrayList<>();
-        handlers.add(new LogExceptionChannelHandler(log));
         handlers.add(new StringEncoder(charset));
 
         if (LineEnding.UNIX.equals(lineEnding)) {
             handlers.add(new LineEncoder(LineSeparator.UNIX, charset));
         }
+        handlers.add(new LogExceptionChannelHandler(log));
         setHandlerSupplier(() -> handlers);
     }
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
index 5810f30..9bcdc66 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
@@ -126,9 +126,10 @@ public class ListenerProperties {
             .defaultValue("10000")
             .required(true)
             .build();
-    public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor WORKER_THREADS = new PropertyDescriptor.Builder()
             .name("Max Number of TCP Connections")
-            .description("The maximum number of concurrent TCP connections to accept.")
+            .displayName("Max Number of Worker Threads")
+            .description("The maximum number of worker threads available for servicing TCP connections.")
             .addValidator(StandardValidators.createLongValidator(1, 65535, true))
             .defaultValue("2")
             .required(true)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
index 0e3cef9..7eff677 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
@@ -116,7 +116,7 @@ public class ListenRELP extends AbstractProcessor {
 
     @OnScheduled
     public void onScheduled(ProcessContext context) throws IOException {
-        int maxConnections = context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
+        int workerThreads = context.getProperty(ListenerProperties.WORKER_THREADS).asInteger();
         int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
         final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
         InetAddress hostname = NetworkUtils.getInterfaceAddress(networkInterface);
@@ -130,7 +130,7 @@ public class ListenRELP extends AbstractProcessor {
         messageDemarcatorBytes = msgDemarcator.getBytes(charset);
         final NettyEventServerFactory eventFactory = getNettyEventServerFactory(hostname, port, charset, events);
         eventFactory.setSocketReceiveBuffer(bufferSize);
-        eventFactory.setWorkerThreads(maxConnections);
+        eventFactory.setWorkerThreads(workerThreads);
         configureFactoryForSsl(context, eventFactory);
 
         try {
@@ -157,7 +157,7 @@ public class ListenRELP extends AbstractProcessor {
         descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
         descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
         descriptors.add(ListenerProperties.CHARSET);
-        descriptors.add(ListenerProperties.MAX_CONNECTIONS);
+        descriptors.add(ListenerProperties.WORKER_THREADS);
         descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
         descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
         descriptors.add(SSL_CONTEXT_SERVICE);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
index e6f29c9..b0ab7f8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
@@ -139,10 +139,9 @@ public class ListenTCP extends AbstractProcessor {
         descriptors.add(ListenerProperties.PORT);
         descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
         descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
-        // Deprecated
         descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
         descriptors.add(ListenerProperties.CHARSET);
-        descriptors.add(ListenerProperties.MAX_CONNECTIONS);
+        descriptors.add(ListenerProperties.WORKER_THREADS);
         descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
         descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
         // Deprecated
@@ -160,8 +159,9 @@ public class ListenTCP extends AbstractProcessor {
 
     @OnScheduled
     public void onScheduled(ProcessContext context) throws IOException {
-        int maxConnections = context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
+        int workerThreads = context.getProperty(ListenerProperties.WORKER_THREADS).asInteger();
         int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        int socketBufferSize = context.getProperty(ListenerProperties.MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
         final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
         InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface);
         Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
@@ -181,8 +181,8 @@ public class ListenTCP extends AbstractProcessor {
             eventFactory.setClientAuth(clientAuth);
         }
 
-        eventFactory.setSocketReceiveBuffer(bufferSize);
-        eventFactory.setWorkerThreads(maxConnections);
+        eventFactory.setSocketReceiveBuffer(socketBufferSize);
+        eventFactory.setWorkerThreads(workerThreads);
         eventFactory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));
 
         try {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPMessageServerFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPMessageServerFactory.java
index 646099d..83e60b9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPMessageServerFactory.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPMessageServerFactory.java
@@ -52,9 +52,9 @@ public class RELPMessageServerFactory extends NettyEventServerFactory {
         final RELPMessageChannelHandler relpChannelHandler = new RELPMessageChannelHandler(events, charset);
 
         setHandlerSupplier(() -> Arrays.asList(
-                logExceptionChannelHandler,
                 new RELPFrameDecoder(log, charset),
-                relpChannelHandler
+                relpChannelHandler,
+                logExceptionChannelHandler
         ));
     }
 }