You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2017/05/23 06:43:45 UTC
nifi git commit: NIFI-3732 Adding connect with timeout to
StandardCommsSession and SSLCommsSession to avoid blocking
Repository: nifi
Updated Branches:
refs/heads/master ded396f0e -> a8de27e69
NIFI-3732 Adding connect with timeout to StandardCommsSession and SSLCommsSession to avoid blocking
This closes #1842.
Signed-off-by: Koji Kawamura <ij...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a8de27e6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a8de27e6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a8de27e6
Branch: refs/heads/master
Commit: a8de27e69b8c8cf553b7e1d44699a1f68e53c5bd
Parents: ded396f
Author: Bryan Bende <bb...@apache.org>
Authored: Mon May 22 20:51:04 2017 -0400
Committer: Koji Kawamura <ij...@apache.org>
Committed: Tue May 23 15:43:27 2017 +0900
----------------------------------------------------------------------
.../cache/client/DistributedMapCacheClientService.java | 12 +++++++++---
.../cache/client/DistributedSetCacheClientService.java | 12 +++++++++---
.../nifi/distributed/cache/client/SSLCommsSession.java | 10 ++++++++--
.../distributed/cache/client/StandardCommsSession.java | 6 ++++--
4 files changed, 30 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/a8de27e6/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
index 81013f6..f197bac 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
@@ -31,6 +31,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
@@ -100,6 +101,11 @@ public class DistributedMapCacheClientService extends AbstractControllerService
this.configContext = context;
}
+ @OnStopped
+ public void onStopped() throws IOException {
+ close();
+ }
+
@Override
public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
return withCommsSession(new CommsAction<Boolean>() {
@@ -292,14 +298,14 @@ public class DistributedMapCacheClientService extends AbstractControllerService
public CommsSession createCommsSession(final ConfigurationContext context) throws IOException {
final String hostname = context.getProperty(HOSTNAME).getValue();
final int port = context.getProperty(PORT).asInteger();
- final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+ final int timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final CommsSession commsSession;
if (sslContextService == null) {
- commsSession = new StandardCommsSession(hostname, port);
+ commsSession = new StandardCommsSession(hostname, port, timeoutMillis);
} else {
- commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port);
+ commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port, timeoutMillis);
}
commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
http://git-wip-us.apache.org/repos/asf/nifi/blob/a8de27e6/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
index c1fa274..34a0a7c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
@@ -29,6 +29,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
@@ -100,17 +101,22 @@ public class DistributedSetCacheClientService extends AbstractControllerService
this.configContext = context;
}
+ @OnStopped
+ public void onStopped() throws IOException {
+ close();
+ }
+
public CommsSession createCommsSession(final ConfigurationContext context) throws IOException {
final String hostname = context.getProperty(HOSTNAME).getValue();
final int port = context.getProperty(PORT).asInteger();
- final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+ final int timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final CommsSession commsSession;
if (sslContextService == null) {
- commsSession = new StandardCommsSession(hostname, port);
+ commsSession = new StandardCommsSession(hostname, port, timeoutMillis);
} else {
- commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port);
+ commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port, timeoutMillis);
}
commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
http://git-wip-us.apache.org/repos/asf/nifi/blob/a8de27e6/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
index 7808d21..18ca571 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
@@ -19,6 +19,8 @@ package org.apache.nifi.distributed.cache.client;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
@@ -44,8 +46,12 @@ public class SSLCommsSession implements CommsSession {
private int protocolVersion;
- public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port) throws IOException {
- sslSocketChannel = new SSLSocketChannel(sslContext, hostname, port, null, true);
+ public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port, final int timeoutMillis) throws IOException {
+ final SocketChannel socketChannel = SocketChannel.open();
+ socketChannel.socket().connect(new InetSocketAddress(hostname, port), timeoutMillis);
+ socketChannel.configureBlocking(false);
+
+ sslSocketChannel = new SSLSocketChannel(sslContext, socketChannel,true);
in = new SSLSocketChannelInputStream(sslSocketChannel);
bufferedIn = new BufferedInputStream(in);
http://git-wip-us.apache.org/repos/asf/nifi/blob/a8de27e6/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
index 6a8ee45..7545bef 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
@@ -47,9 +47,11 @@ public class StandardCommsSession implements CommsSession {
private int protocolVersion;
- public StandardCommsSession(final String hostname, final int port) throws IOException {
- socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port));
+ public StandardCommsSession(final String hostname, final int port, final int timeoutMillis) throws IOException {
+ socketChannel = SocketChannel.open();
+ socketChannel.socket().connect(new InetSocketAddress(hostname, port), timeoutMillis);
socketChannel.configureBlocking(false);
+
in = new SocketChannelInputStream(socketChannel);
bufferedIn = new InterruptableInputStream(new BufferedInputStream(in));