You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/10/07 19:30:59 UTC
[nifi] branch main updated: NIFI-10604 Added Idle Connection Timeout property to ListenTCP
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 9ff1dadcf6 NIFI-10604 Added Idle Connection Timeout property to ListenTCP
9ff1dadcf6 is described below
commit 9ff1dadcf658e363b4e14a550b1651572b3faa97
Author: Nathan Gough <th...@gmail.com>
AuthorDate: Thu Oct 6 19:26:03 2022 -0400
NIFI-10604 Added Idle Connection Timeout property to ListenTCP
This closes #6492
Signed-off-by: David Handermann <ex...@apache.org>
---
.../transport/netty/NettyEventServerFactory.java | 32 ++++++++++++++++------
.../netty/channel/StandardChannelInitializer.java | 2 +-
.../apache/nifi/processors/standard/ListenTCP.java | 15 +++++++++-
3 files changed, 39 insertions(+), 10 deletions(-)
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java
index bddbccf6b4..0f333b3106 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java
@@ -20,8 +20,10 @@ import io.netty.bootstrap.AbstractBootstrap;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
@@ -70,6 +72,8 @@ public class NettyEventServerFactory extends EventLoopGroupFactory implements Ev
private Duration shutdownTimeout = ShutdownTimeout.DEFAULT.getDuration();
+ private Duration idleTimeout = null;
+
private BufferAllocator bufferAllocator = BufferAllocator.POOLED;
public NettyEventServerFactory(final InetAddress address, final int port, final TransportProtocol protocol) {
@@ -150,6 +154,14 @@ public class NettyEventServerFactory extends EventLoopGroupFactory implements Ev
this.bufferAllocator = Objects.requireNonNull(bufferAllocator, "Buffer Allocator required");
}
+ /**
+ * Set an idle timeout for connections, which closes a connection when there have been no reads or writes for the given timeout period.
+ * Has no default - idle connections will not be closed by the server unless this timeout is set to a non-zero value.
+ */
+ public void setIdleTimeout(final Duration timeout) {
+ this.idleTimeout = Objects.requireNonNull(timeout, "Timeout value required");
+ }
+
/**
* Get Event Server with Channel bound to configured address and port number
*
@@ -181,22 +193,26 @@ public class NettyEventServerFactory extends EventLoopGroupFactory implements Ev
if (TransportProtocol.UDP.equals(protocol)) {
final Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioDatagramChannel.class);
- bootstrap.handler(new StandardChannelInitializer<>(handlerSupplier));
-
+ bootstrap.handler(getChannelInitializer());
return bootstrap;
} else {
final ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.channel(NioServerSocketChannel.class);
- if (sslContext == null) {
- bootstrap.childHandler(new StandardChannelInitializer<>(handlerSupplier));
- } else {
- bootstrap.childHandler(new ServerSslHandlerChannelInitializer<>(handlerSupplier, sslContext, clientAuth));
- }
-
+ bootstrap.childHandler(getChannelInitializer());
return bootstrap;
}
}
+ private ChannelInitializer getChannelInitializer() {
+ final StandardChannelInitializer<Channel> channelInitializer = sslContext == null
+ ? new StandardChannelInitializer<>(handlerSupplier)
+ : new ServerSslHandlerChannelInitializer<>(handlerSupplier, sslContext, clientAuth);
+ if (idleTimeout != null) {
+ channelInitializer.setIdleTimeout(idleTimeout);
+ }
+ return channelInitializer;
+ }
+
private EventServer getBoundEventServer(final AbstractBootstrap<?, ?> bootstrap, final EventLoopGroup group) {
final ChannelFuture bindFuture = bootstrap.bind(address, port);
try {
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/StandardChannelInitializer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/StandardChannelInitializer.java
index c3fcf81494..8622fbfad5 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/StandardChannelInitializer.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/StandardChannelInitializer.java
@@ -60,7 +60,7 @@ public class StandardChannelInitializer<T extends Channel> extends ChannelInitia
}
/**
- * Set the idle timeout period for outgoing client connections
+ * Set the idle timeout period for channel connections, monitoring both read and write times
*/
public void setIdleTimeout(final Duration idleTimeout) {
this.idleTimeout = Objects.requireNonNull(idleTimeout);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
index 4a4ccf0ec0..86e3b0c358 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
@@ -58,6 +58,7 @@ import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.Charset;
+import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
@@ -69,6 +70,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@SupportsBatching
@@ -133,6 +135,15 @@ public class ListenTCP extends AbstractProcessor {
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
+ protected static final PropertyDescriptor IDLE_CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("idle-timeout")
+ .displayName("Idle Connection Timeout")
+ .description("The amount of time a client's connection will remain open if no data is received. The default of 0 seconds will leave connections open until they are closed by the client.")
+ .required(true)
+ .defaultValue("0 seconds")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Messages received successfully will be sent out this relationship.")
@@ -163,6 +174,7 @@ public class ListenTCP extends AbstractProcessor {
descriptors.add(ListenerProperties.WORKER_THREADS);
descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
+ descriptors.add(IDLE_CONNECTION_TIMEOUT);
// Deprecated
descriptors.add(MAX_RECV_THREAD_POOL_SIZE);
descriptors.add(POOL_RECV_BUFFERS);
@@ -180,6 +192,7 @@ public class ListenTCP extends AbstractProcessor {
int workerThreads = context.getProperty(ListenerProperties.WORKER_THREADS).asInteger();
int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
int socketBufferSize = context.getProperty(ListenerProperties.MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+ Duration idleTimeout = Duration.ofSeconds(context.getProperty(IDLE_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.SECONDS));
final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
final InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface);
final Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
@@ -203,7 +216,7 @@ public class ListenTCP extends AbstractProcessor {
final boolean poolReceiveBuffers = context.getProperty(POOL_RECV_BUFFERS).asBoolean();
final BufferAllocator bufferAllocator = poolReceiveBuffers ? BufferAllocator.POOLED : BufferAllocator.UNPOOLED;
eventFactory.setBufferAllocator(bufferAllocator);
-
+ eventFactory.setIdleTimeout(idleTimeout);
eventFactory.setSocketReceiveBuffer(socketBufferSize);
eventFactory.setWorkerThreads(workerThreads);
eventFactory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));