You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2021/12/15 22:59:29 UTC
[nifi] branch main updated: NIFI-9478 Moved Netty Log Exception Handler to end of pipeline
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 5e9c868 NIFI-9478 Moved Netty Log Exception Handler to end of pipeline
5e9c868 is described below
commit 5e9c86885ce4557f41a955adcaa3a6e70df3f501
Author: Nathan Gough <th...@gmail.com>
AuthorDate: Mon Dec 13 12:31:26 2021 -0500
NIFI-9478 Moved Netty Log Exception Handler to end of pipeline
- Changed display name of Max Number of TCP Connections to Max Number of Worker Threads for ListenTCP and ListenRELP (the descriptor is shared via ListenerProperties)
- Set Netty Socket Receive Buffer using Max Socket Buffer Size in ListenTCP
This closes #5599
Signed-off-by: David Handermann <ex...@apache.org>
---
.../netty/ByteArrayMessageNettyEventServerFactory.java | 8 ++++----
.../transport/netty/ByteArrayNettyEventSenderFactory.java | 2 +-
.../transport/netty/StreamingNettyEventSenderFactory.java | 4 ++--
.../event/transport/netty/StringNettyEventSenderFactory.java | 2 +-
.../apache/nifi/processor/util/listen/ListenerProperties.java | 5 +++--
.../java/org/apache/nifi/processors/standard/ListenRELP.java | 6 +++---
.../java/org/apache/nifi/processors/standard/ListenTCP.java | 10 +++++-----
.../standard/relp/handler/RELPMessageServerFactory.java | 4 ++--
8 files changed, 21 insertions(+), 20 deletions(-)
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayMessageNettyEventServerFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayMessageNettyEventServerFactory.java
index 036161e..3073926 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayMessageNettyEventServerFactory.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayMessageNettyEventServerFactory.java
@@ -61,17 +61,17 @@ public class ByteArrayMessageNettyEventServerFactory extends NettyEventServerFac
if (TransportProtocol.UDP.equals(protocol)) {
setHandlerSupplier(() -> Arrays.asList(
- logExceptionChannelHandler,
new DatagramByteArrayMessageDecoder(),
- byteArrayMessageChannelHandler
+ byteArrayMessageChannelHandler,
+ logExceptionChannelHandler
));
} else {
setHandlerSupplier(() -> Arrays.asList(
- logExceptionChannelHandler,
new DelimiterBasedFrameDecoder(maxFrameLength, STRIP_DELIMITER, Unpooled.wrappedBuffer(delimiter)),
new ByteArrayDecoder(),
new SocketByteArrayMessageDecoder(),
- byteArrayMessageChannelHandler
+ byteArrayMessageChannelHandler,
+ logExceptionChannelHandler
));
}
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayNettyEventSenderFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayNettyEventSenderFactory.java
index 71219b3..2e005e5 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayNettyEventSenderFactory.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayNettyEventSenderFactory.java
@@ -39,8 +39,8 @@ public class ByteArrayNettyEventSenderFactory extends NettyEventSenderFactory<by
public ByteArrayNettyEventSenderFactory(final ComponentLog log, final String address, final int port, final TransportProtocol protocol) {
super(address, port, protocol);
final List<ChannelHandler> handlers = new ArrayList<>();
- handlers.add(new LogExceptionChannelHandler(log));
handlers.add(new ByteArrayEncoder());
+ handlers.add(new LogExceptionChannelHandler(log));
setHandlerSupplier(() -> handlers);
}
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StreamingNettyEventSenderFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StreamingNettyEventSenderFactory.java
index efb130b..483a765 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StreamingNettyEventSenderFactory.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StreamingNettyEventSenderFactory.java
@@ -43,9 +43,9 @@ public class StreamingNettyEventSenderFactory extends NettyEventSenderFactory<In
final InputStreamMessageEncoder inputStreamMessageEncoder = new InputStreamMessageEncoder();
setHandlerSupplier(() -> Arrays.asList(
- logExceptionChannelHandler,
new ChunkedWriteHandler(),
- inputStreamMessageEncoder
+ inputStreamMessageEncoder,
+ logExceptionChannelHandler
));
}
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StringNettyEventSenderFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StringNettyEventSenderFactory.java
index 448ee3c..47999c6 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StringNettyEventSenderFactory.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StringNettyEventSenderFactory.java
@@ -46,12 +46,12 @@ public class StringNettyEventSenderFactory extends NettyEventSenderFactory<Strin
public StringNettyEventSenderFactory(final ComponentLog log, final String address, final int port, final TransportProtocol protocol, final Charset charset, final LineEnding lineEnding) {
super(address, port, protocol);
final List<ChannelHandler> handlers = new ArrayList<>();
- handlers.add(new LogExceptionChannelHandler(log));
handlers.add(new StringEncoder(charset));
if (LineEnding.UNIX.equals(lineEnding)) {
handlers.add(new LineEncoder(LineSeparator.UNIX, charset));
}
+ handlers.add(new LogExceptionChannelHandler(log));
setHandlerSupplier(() -> handlers);
}
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
index 5810f30..9bcdc66 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
@@ -126,9 +126,10 @@ public class ListenerProperties {
.defaultValue("10000")
.required(true)
.build();
- public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor WORKER_THREADS = new PropertyDescriptor.Builder()
.name("Max Number of TCP Connections")
- .description("The maximum number of concurrent TCP connections to accept.")
+ .displayName("Max Number of Worker Threads")
+ .description("The maximum number of worker threads available for servicing TCP connections.")
.addValidator(StandardValidators.createLongValidator(1, 65535, true))
.defaultValue("2")
.required(true)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
index 0e3cef9..7eff677 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
@@ -116,7 +116,7 @@ public class ListenRELP extends AbstractProcessor {
@OnScheduled
public void onScheduled(ProcessContext context) throws IOException {
- int maxConnections = context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
+ int workerThreads = context.getProperty(ListenerProperties.WORKER_THREADS).asInteger();
int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
InetAddress hostname = NetworkUtils.getInterfaceAddress(networkInterface);
@@ -130,7 +130,7 @@ public class ListenRELP extends AbstractProcessor {
messageDemarcatorBytes = msgDemarcator.getBytes(charset);
final NettyEventServerFactory eventFactory = getNettyEventServerFactory(hostname, port, charset, events);
eventFactory.setSocketReceiveBuffer(bufferSize);
- eventFactory.setWorkerThreads(maxConnections);
+ eventFactory.setWorkerThreads(workerThreads);
configureFactoryForSsl(context, eventFactory);
try {
@@ -157,7 +157,7 @@ public class ListenRELP extends AbstractProcessor {
descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
descriptors.add(ListenerProperties.CHARSET);
- descriptors.add(ListenerProperties.MAX_CONNECTIONS);
+ descriptors.add(ListenerProperties.WORKER_THREADS);
descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
descriptors.add(SSL_CONTEXT_SERVICE);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
index e6f29c9..b0ab7f8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
@@ -139,10 +139,9 @@ public class ListenTCP extends AbstractProcessor {
descriptors.add(ListenerProperties.PORT);
descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
- // Deprecated
descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
descriptors.add(ListenerProperties.CHARSET);
- descriptors.add(ListenerProperties.MAX_CONNECTIONS);
+ descriptors.add(ListenerProperties.WORKER_THREADS);
descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
// Deprecated
@@ -160,8 +159,9 @@ public class ListenTCP extends AbstractProcessor {
@OnScheduled
public void onScheduled(ProcessContext context) throws IOException {
- int maxConnections = context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
+ int workerThreads = context.getProperty(ListenerProperties.WORKER_THREADS).asInteger();
int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+ int socketBufferSize = context.getProperty(ListenerProperties.MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface);
Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
@@ -181,8 +181,8 @@ public class ListenTCP extends AbstractProcessor {
eventFactory.setClientAuth(clientAuth);
}
- eventFactory.setSocketReceiveBuffer(bufferSize);
- eventFactory.setWorkerThreads(maxConnections);
+ eventFactory.setSocketReceiveBuffer(socketBufferSize);
+ eventFactory.setWorkerThreads(workerThreads);
eventFactory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));
try {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPMessageServerFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPMessageServerFactory.java
index 646099d..83e60b9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPMessageServerFactory.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPMessageServerFactory.java
@@ -52,9 +52,9 @@ public class RELPMessageServerFactory extends NettyEventServerFactory {
final RELPMessageChannelHandler relpChannelHandler = new RELPMessageChannelHandler(events, charset);
setHandlerSupplier(() -> Arrays.asList(
- logExceptionChannelHandler,
new RELPFrameDecoder(log, charset),
- relpChannelHandler
+ relpChannelHandler,
+ logExceptionChannelHandler
));
}
}