You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2017/05/23 06:43:45 UTC

nifi git commit: NIFI-3732 Adding connect with timeout to StandardCommsSession and SSLCommsSession to avoid blocking

Repository: nifi
Updated Branches:
  refs/heads/master ded396f0e -> a8de27e69


NIFI-3732 Adding connect with timeout to StandardCommsSession and SSLCommsSession to avoid blocking

This closes #1842.

Signed-off-by: Koji Kawamura <ij...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a8de27e6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a8de27e6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a8de27e6

Branch: refs/heads/master
Commit: a8de27e69b8c8cf553b7e1d44699a1f68e53c5bd
Parents: ded396f
Author: Bryan Bende <bb...@apache.org>
Authored: Mon May 22 20:51:04 2017 -0400
Committer: Koji Kawamura <ij...@apache.org>
Committed: Tue May 23 15:43:27 2017 +0900

----------------------------------------------------------------------
 .../cache/client/DistributedMapCacheClientService.java  | 12 +++++++++---
 .../cache/client/DistributedSetCacheClientService.java  | 12 +++++++++---
 .../nifi/distributed/cache/client/SSLCommsSession.java  | 10 ++++++++--
 .../distributed/cache/client/StandardCommsSession.java  |  6 ++++--
 4 files changed, 30 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a8de27e6/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
index 81013f6..f197bac 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
@@ -31,6 +31,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
@@ -100,6 +101,11 @@ public class DistributedMapCacheClientService extends AbstractControllerService
         this.configContext = context;
     }
 
+    @OnStopped
+    public void onStopped() throws IOException {
+        close();
+    }
+
     @Override
     public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
         return withCommsSession(new CommsAction<Boolean>() {
@@ -292,14 +298,14 @@ public class DistributedMapCacheClientService extends AbstractControllerService
     public CommsSession createCommsSession(final ConfigurationContext context) throws IOException {
         final String hostname = context.getProperty(HOSTNAME).getValue();
         final int port = context.getProperty(PORT).asInteger();
-        final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+        final int timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
         final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
 
         final CommsSession commsSession;
         if (sslContextService == null) {
-            commsSession = new StandardCommsSession(hostname, port);
+            commsSession = new StandardCommsSession(hostname, port, timeoutMillis);
         } else {
-            commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port);
+            commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port, timeoutMillis);
         }
 
         commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/nifi/blob/a8de27e6/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
index c1fa274..34a0a7c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
@@ -29,6 +29,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
@@ -100,17 +101,22 @@ public class DistributedSetCacheClientService extends AbstractControllerService
         this.configContext = context;
     }
 
+    @OnStopped
+    public void onStopped() throws IOException {
+        close();
+    }
+
     public CommsSession createCommsSession(final ConfigurationContext context) throws IOException {
         final String hostname = context.getProperty(HOSTNAME).getValue();
         final int port = context.getProperty(PORT).asInteger();
-        final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+        final int timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
         final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
 
         final CommsSession commsSession;
         if (sslContextService == null) {
-            commsSession = new StandardCommsSession(hostname, port);
+            commsSession = new StandardCommsSession(hostname, port, timeoutMillis);
         } else {
-            commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port);
+            commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port, timeoutMillis);
         }
 
         commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/nifi/blob/a8de27e6/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
index 7808d21..18ca571 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
@@ -19,6 +19,8 @@ package org.apache.nifi.distributed.cache.client;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
 import java.util.concurrent.TimeUnit;
 
 import javax.net.ssl.SSLContext;
@@ -44,8 +46,12 @@ public class SSLCommsSession implements CommsSession {
 
     private int protocolVersion;
 
-    public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port) throws IOException {
-        sslSocketChannel = new SSLSocketChannel(sslContext, hostname, port, null, true);
+    public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port, final int timeoutMillis) throws IOException {
+        final SocketChannel socketChannel = SocketChannel.open();
+        socketChannel.socket().connect(new InetSocketAddress(hostname, port), timeoutMillis);
+        socketChannel.configureBlocking(false);
+
+        sslSocketChannel = new SSLSocketChannel(sslContext, socketChannel,true);
 
         in = new SSLSocketChannelInputStream(sslSocketChannel);
         bufferedIn = new BufferedInputStream(in);

http://git-wip-us.apache.org/repos/asf/nifi/blob/a8de27e6/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
index 6a8ee45..7545bef 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
@@ -47,9 +47,11 @@ public class StandardCommsSession implements CommsSession {
 
     private int protocolVersion;
 
-    public StandardCommsSession(final String hostname, final int port) throws IOException {
-        socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port));
+    public StandardCommsSession(final String hostname, final int port, final int timeoutMillis) throws IOException {
+        socketChannel = SocketChannel.open();
+        socketChannel.socket().connect(new InetSocketAddress(hostname, port), timeoutMillis);
         socketChannel.configureBlocking(false);
+
         in = new SocketChannelInputStream(socketChannel);
         bufferedIn = new InterruptableInputStream(new BufferedInputStream(in));