You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/03/14 16:16:36 UTC

[nifi] branch main updated: NIFI-9645 - Updated PutSplunk to allow idle connection timeouts

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 713e2fd  NIFI-9645 - Updated PutSplunk to allow idle connection timeouts
713e2fd is described below

commit 713e2fd03cc1f7084dc9f5ab14636a6986390257
Author: Nathan Gough <th...@gmail.com>
AuthorDate: Fri Mar 4 21:46:42 2022 -0500

    NIFI-9645 - Updated PutSplunk to allow idle connection timeouts
    
    This closes #5841
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../util/put/AbstractPutEventProcessor.java        |  7 ++--
 .../netty/CloseContextIdleStateHandler.java        | 38 ++++++++++++++++++++++
 .../transport/netty/NettyEventSenderFactory.java   | 10 ++++++
 .../netty/channel/StandardChannelInitializer.java  | 13 ++++++++
 4 files changed, 66 insertions(+), 2 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-put/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-put/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
index 567f8fd..d9b8577 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-put/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-put/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
@@ -78,10 +78,11 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
             .build();
     public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor
             .Builder().name("Idle Connection Expiration")
-            .description("The amount of time a connection should be held open without being used before closing the connection.")
+            .description("The amount of time a connection should be held open without being used before closing the connection. A value of 0 seconds will disable this feature.")
             .required(true)
-            .defaultValue("5 seconds")
+            .defaultValue("15 seconds")
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
     // Putting these properties here so sub-classes don't have to redefine them, but they are
@@ -249,7 +250,9 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
         factory.setShutdownQuietPeriod(Duration.ZERO); // Quiet period not necessary since sending threads will have completed before shutting down event sender
 
         final int timeout = context.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        final int idleTimeout = context.getProperty(IDLE_EXPIRATION).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
         factory.setTimeout(Duration.ofMillis(timeout));
+        factory.setIdleTimeout(Duration.ofSeconds(idleTimeout));
 
         final PropertyValue sslContextServiceProperty = context.getProperty(SSL_CONTEXT_SERVICE);
         if (sslContextServiceProperty.isSet()) {
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/CloseContextIdleStateHandler.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/CloseContextIdleStateHandler.java
new file mode 100644
index 0000000..c452a32
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/CloseContextIdleStateHandler.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+
+/**
+ * Idle State Handler closes channel context when state indicates idle communications
+ */
+public class CloseContextIdleStateHandler extends ChannelDuplexHandler {
+
+    @Override
+    public void userEventTriggered(final ChannelHandlerContext context, final Object event) {
+        if (event instanceof IdleStateEvent) {
+            final IdleStateEvent idleStateEvent = (IdleStateEvent) event;
+            if (idleStateEvent.state() == IdleState.ALL_IDLE) {
+                context.close();
+            }
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
index b2f34d4..9030297 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
@@ -61,6 +61,8 @@ public class NettyEventSenderFactory<T> extends EventLoopGroupFactory implements
 
     private Duration timeout = Duration.ofSeconds(30);
 
+    private Duration idleTimeout = Duration.ofSeconds(0);
+
     private int maxConnections = Runtime.getRuntime().availableProcessors() * 2;
 
     private Supplier<List<ChannelHandler>> handlerSupplier = () -> Collections.emptyList();
@@ -116,6 +118,13 @@ public class NettyEventSenderFactory<T> extends EventLoopGroupFactory implements
     }
 
     /**
+     * Set the idle timeout period for outgoing client connections
+     */
+    public void setIdleTimeout(final Duration idleTimeout) {
+        this.idleTimeout = Objects.requireNonNull(idleTimeout, "Timeout required");
+    }
+
+    /**
      * Set shutdown quiet period
      *
      * @param quietPeriod shutdown quiet period
@@ -205,6 +214,7 @@ public class NettyEventSenderFactory<T> extends EventLoopGroupFactory implements
                 ? new StandardChannelInitializer<>(handlerSupplier)
                 : new ClientSslStandardChannelInitializer<>(handlerSupplier, sslContext);
         channelInitializer.setWriteTimeout(timeout);
+        channelInitializer.setIdleTimeout(idleTimeout);
         return channelInitializer;
     }
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/StandardChannelInitializer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/StandardChannelInitializer.java
index bbfb148..c3fcf81 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/StandardChannelInitializer.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/StandardChannelInitializer.java
@@ -20,7 +20,9 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
+import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.handler.timeout.WriteTimeoutHandler;
+import org.apache.nifi.event.transport.netty.CloseContextIdleStateHandler;
 
 import java.time.Duration;
 import java.util.List;
@@ -37,6 +39,8 @@ public class StandardChannelInitializer<T extends Channel> extends ChannelInitia
 
     private Duration writeTimeout = Duration.ofSeconds(30);
 
+    private Duration idleTimeout = Duration.ofSeconds(0);
+
     /**
      * Standard Channel Initializer with handlers
      *
@@ -55,10 +59,19 @@ public class StandardChannelInitializer<T extends Channel> extends ChannelInitia
         this.writeTimeout = Objects.requireNonNull(writeTimeout);
     }
 
+    /**
+     * Set the idle timeout period for outgoing client connections
+     */
+    public void setIdleTimeout(final Duration idleTimeout) {
+        this.idleTimeout = Objects.requireNonNull(idleTimeout);
+    }
+
     @Override
     protected void initChannel(Channel channel) {
         final ChannelPipeline pipeline = channel.pipeline();
+        pipeline.addFirst(new IdleStateHandler(idleTimeout.getSeconds(), idleTimeout.getSeconds(), idleTimeout.getSeconds(), TimeUnit.SECONDS));
         pipeline.addLast(new WriteTimeoutHandler(writeTimeout.toMillis(), TimeUnit.MILLISECONDS));
+        pipeline.addLast(new CloseContextIdleStateHandler());
         handlerSupplier.get().forEach(pipeline::addLast);
     }
 }