You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/10/07 19:30:59 UTC

[nifi] branch main updated: NIFI-10604 Added Idle Connection Timeout property to ListenTCP

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ff1dadcf6 NIFI-10604 Added Idle Connection Timeout property to ListenTCP
9ff1dadcf6 is described below

commit 9ff1dadcf658e363b4e14a550b1651572b3faa97
Author: Nathan Gough <th...@gmail.com>
AuthorDate: Thu Oct 6 19:26:03 2022 -0400

    NIFI-10604 Added Idle Connection Timeout property to ListenTCP
    
    This closes #6492
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../transport/netty/NettyEventServerFactory.java   | 32 ++++++++++++++++------
 .../netty/channel/StandardChannelInitializer.java  |  2 +-
 .../apache/nifi/processors/standard/ListenTCP.java | 15 +++++++++-
 3 files changed, 39 insertions(+), 10 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java
index bddbccf6b4..0f333b3106 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java
@@ -20,8 +20,10 @@ import io.netty.bootstrap.AbstractBootstrap;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.FixedRecvByteBufAllocator;
@@ -70,6 +72,8 @@ public class NettyEventServerFactory extends EventLoopGroupFactory implements Ev
 
     private Duration shutdownTimeout = ShutdownTimeout.DEFAULT.getDuration();
 
+    private Duration idleTimeout = null;
+
     private BufferAllocator bufferAllocator = BufferAllocator.POOLED;
 
     public NettyEventServerFactory(final InetAddress address, final int port, final TransportProtocol protocol) {
@@ -150,6 +154,14 @@ public class NettyEventServerFactory extends EventLoopGroupFactory implements Ev
         this.bufferAllocator = Objects.requireNonNull(bufferAllocator, "Buffer Allocator required");
     }
 
+    /**
+     * Set an idle timeout for connections, which closes a connection when there have been no read or writes for the given timeout period.
+     * Has no default - idle connections will not be closed by the server unless this timeout is set to non-zero.
+     */
+    public void setIdleTimeout(final Duration timeout) {
+        this.idleTimeout = Objects.requireNonNull(timeout, "Timeout value required");
+    }
+
     /**
      * Get Event Server with Channel bound to configured address and port number
      *
@@ -181,22 +193,26 @@ public class NettyEventServerFactory extends EventLoopGroupFactory implements Ev
         if (TransportProtocol.UDP.equals(protocol)) {
             final Bootstrap bootstrap = new Bootstrap();
             bootstrap.channel(NioDatagramChannel.class);
-            bootstrap.handler(new StandardChannelInitializer<>(handlerSupplier));
-
+            bootstrap.handler(getChannelInitializer());
             return bootstrap;
         } else {
             final ServerBootstrap bootstrap = new ServerBootstrap();
             bootstrap.channel(NioServerSocketChannel.class);
-            if (sslContext == null) {
-                bootstrap.childHandler(new StandardChannelInitializer<>(handlerSupplier));
-            } else {
-                bootstrap.childHandler(new ServerSslHandlerChannelInitializer<>(handlerSupplier, sslContext, clientAuth));
-            }
-
+            bootstrap.childHandler(getChannelInitializer());
             return bootstrap;
         }
     }
 
+    private ChannelInitializer getChannelInitializer() {
+        final StandardChannelInitializer<Channel> channelInitializer = sslContext == null
+                ? new StandardChannelInitializer<>(handlerSupplier)
+                : new ServerSslHandlerChannelInitializer<>(handlerSupplier, sslContext, clientAuth);
+        if (idleTimeout != null) {
+            channelInitializer.setIdleTimeout(idleTimeout);
+        }
+        return channelInitializer;
+    }
+
     private EventServer getBoundEventServer(final AbstractBootstrap<?, ?> bootstrap, final EventLoopGroup group) {
         final ChannelFuture bindFuture = bootstrap.bind(address, port);
         try {
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/StandardChannelInitializer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/StandardChannelInitializer.java
index c3fcf81494..8622fbfad5 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/StandardChannelInitializer.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/StandardChannelInitializer.java
@@ -60,7 +60,7 @@ public class StandardChannelInitializer<T extends Channel> extends ChannelInitia
     }
 
     /**
-     * Set the idle timeout period for outgoing client connections
+     * Set the idle timeout period for channel connections, monitoring both read and write times
      */
     public void setIdleTimeout(final Duration idleTimeout) {
         this.idleTimeout = Objects.requireNonNull(idleTimeout);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
index 4a4ccf0ec0..86e3b0c358 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
@@ -58,6 +58,7 @@ import javax.net.ssl.SSLContext;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.charset.Charset;
+import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -69,6 +70,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 @SupportsBatching
@@ -133,6 +135,15 @@ public class ListenTCP extends AbstractProcessor {
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
+    protected static final PropertyDescriptor IDLE_CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("idle-timeout")
+            .displayName("Idle Connection Timeout")
+            .description("The amount of time a client's connection will remain open if no data is received. The default of 0 seconds will leave connections open until they are closed by the client.")
+            .required(true)
+            .defaultValue("0 seconds")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("Messages received successfully will be sent out this relationship.")
@@ -163,6 +174,7 @@ public class ListenTCP extends AbstractProcessor {
         descriptors.add(ListenerProperties.WORKER_THREADS);
         descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
         descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
+        descriptors.add(IDLE_CONNECTION_TIMEOUT);
         // Deprecated
         descriptors.add(MAX_RECV_THREAD_POOL_SIZE);
         descriptors.add(POOL_RECV_BUFFERS);
@@ -180,6 +192,7 @@ public class ListenTCP extends AbstractProcessor {
         int workerThreads = context.getProperty(ListenerProperties.WORKER_THREADS).asInteger();
         int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
         int socketBufferSize = context.getProperty(ListenerProperties.MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        Duration idleTimeout = Duration.ofSeconds(context.getProperty(IDLE_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.SECONDS));
         final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
         final InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface);
         final Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
@@ -203,7 +216,7 @@ public class ListenTCP extends AbstractProcessor {
         final boolean poolReceiveBuffers = context.getProperty(POOL_RECV_BUFFERS).asBoolean();
         final BufferAllocator bufferAllocator = poolReceiveBuffers ? BufferAllocator.POOLED : BufferAllocator.UNPOOLED;
         eventFactory.setBufferAllocator(bufferAllocator);
-
+        eventFactory.setIdleTimeout(idleTimeout);
         eventFactory.setSocketReceiveBuffer(socketBufferSize);
         eventFactory.setWorkerThreads(workerThreads);
         eventFactory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));