You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/03/14 16:16:36 UTC
[nifi] branch main updated: NIFI-9645 - Updated PutSplunk to allow idle connection timeouts
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 713e2fd NIFI-9645 - Updated PutSplunk to allow idle connection timeouts
713e2fd is described below
commit 713e2fd03cc1f7084dc9f5ab14636a6986390257
Author: Nathan Gough <th...@gmail.com>
AuthorDate: Fri Mar 4 21:46:42 2022 -0500
NIFI-9645 - Updated PutSplunk to allow idle connection timeouts
This closes #5841
Signed-off-by: David Handermann <ex...@apache.org>
---
.../util/put/AbstractPutEventProcessor.java | 7 ++--
.../netty/CloseContextIdleStateHandler.java | 38 ++++++++++++++++++++++
.../transport/netty/NettyEventSenderFactory.java | 10 ++++++
.../netty/channel/StandardChannelInitializer.java | 13 ++++++++
4 files changed, 66 insertions(+), 2 deletions(-)
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-put/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-put/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
index 567f8fd..d9b8577 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-put/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-put/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
@@ -78,10 +78,11 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
.build();
public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor
.Builder().name("Idle Connection Expiration")
- .description("The amount of time a connection should be held open without being used before closing the connection.")
+ .description("The amount of time a connection should be held open without being used before closing the connection. A value of 0 seconds will disable this feature.")
.required(true)
- .defaultValue("5 seconds")
+ .defaultValue("15 seconds")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
// Putting these properties here so sub-classes don't have to redefine them, but they are
@@ -249,7 +250,9 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
factory.setShutdownQuietPeriod(Duration.ZERO); // Quiet period not necessary since sending threads will have completed before shutting down event sender
final int timeout = context.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+ final int idleTimeout = context.getProperty(IDLE_EXPIRATION).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
factory.setTimeout(Duration.ofMillis(timeout));
+ factory.setIdleTimeout(Duration.ofSeconds(idleTimeout));
final PropertyValue sslContextServiceProperty = context.getProperty(SSL_CONTEXT_SERVICE);
if (sslContextServiceProperty.isSet()) {
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/CloseContextIdleStateHandler.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/CloseContextIdleStateHandler.java
new file mode 100644
index 0000000..c452a32
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/CloseContextIdleStateHandler.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+
+/**
+ * Idle State Handler closes channel context when state indicates idle communications
+ */
+public class CloseContextIdleStateHandler extends ChannelDuplexHandler {
+
+ @Override
+ public void userEventTriggered(final ChannelHandlerContext context, final Object event) {
+ if (event instanceof IdleStateEvent) {
+ final IdleStateEvent idleStateEvent = (IdleStateEvent) event;
+ if (idleStateEvent.state() == IdleState.ALL_IDLE) {
+ context.close();
+ }
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
index b2f34d4..9030297 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
@@ -61,6 +61,8 @@ public class NettyEventSenderFactory<T> extends EventLoopGroupFactory implements
private Duration timeout = Duration.ofSeconds(30);
+ private Duration idleTimeout = Duration.ofSeconds(0);
+
private int maxConnections = Runtime.getRuntime().availableProcessors() * 2;
private Supplier<List<ChannelHandler>> handlerSupplier = () -> Collections.emptyList();
@@ -116,6 +118,13 @@ public class NettyEventSenderFactory<T> extends EventLoopGroupFactory implements
}
/**
+ * Set the idle timeout period for outgoing client connections
+ */
+ public void setIdleTimeout(final Duration idleTimeout) {
+ this.idleTimeout = Objects.requireNonNull(idleTimeout, "Timeout required");
+ }
+
+ /**
* Set shutdown quiet period
*
* @param quietPeriod shutdown quiet period
@@ -205,6 +214,7 @@ public class NettyEventSenderFactory<T> extends EventLoopGroupFactory implements
? new StandardChannelInitializer<>(handlerSupplier)
: new ClientSslStandardChannelInitializer<>(handlerSupplier, sslContext);
channelInitializer.setWriteTimeout(timeout);
+ channelInitializer.setIdleTimeout(idleTimeout);
return channelInitializer;
}
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/StandardChannelInitializer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/StandardChannelInitializer.java
index bbfb148..c3fcf81 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/StandardChannelInitializer.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/StandardChannelInitializer.java
@@ -20,7 +20,9 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
+import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
+import org.apache.nifi.event.transport.netty.CloseContextIdleStateHandler;
import java.time.Duration;
import java.util.List;
@@ -37,6 +39,8 @@ public class StandardChannelInitializer<T extends Channel> extends ChannelInitia
private Duration writeTimeout = Duration.ofSeconds(30);
+ private Duration idleTimeout = Duration.ofSeconds(0);
+
/**
* Standard Channel Initializer with handlers
*
@@ -55,10 +59,19 @@ public class StandardChannelInitializer<T extends Channel> extends ChannelInitia
this.writeTimeout = Objects.requireNonNull(writeTimeout);
}
+ /**
+ * Set the idle timeout period for outgoing client connections
+ */
+ public void setIdleTimeout(final Duration idleTimeout) {
+ this.idleTimeout = Objects.requireNonNull(idleTimeout);
+ }
+
@Override
protected void initChannel(Channel channel) {
final ChannelPipeline pipeline = channel.pipeline();
+ pipeline.addFirst(new IdleStateHandler(idleTimeout.getSeconds(), idleTimeout.getSeconds(), idleTimeout.getSeconds(), TimeUnit.SECONDS));
pipeline.addLast(new WriteTimeoutHandler(writeTimeout.toMillis(), TimeUnit.MILLISECONDS));
+ pipeline.addLast(new CloseContextIdleStateHandler());
handlerSupplier.get().forEach(pipeline::addLast);
}
}