You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/02/26 04:54:50 UTC

[47/51] [abbrv] incubator-nifi git commit: NIFI-383: Ensure that we always clean up sockets

NIFI-383: Ensure that we always clean up sockets


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/ca23ad8e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/ca23ad8e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/ca23ad8e

Branch: refs/heads/NIFI-353
Commit: ca23ad8eaa236407daff926d49c73985ed16a41e
Parents: 5aef55b
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Feb 25 14:06:58 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Feb 25 14:06:58 2015 -0500

----------------------------------------------------------------------
 .../client/socket/EndpointConnectionPool.java   |  5 ----
 .../SocketChannelCommunicationsSession.java     | 22 +++++++++++++-
 .../SSLSocketChannelCommunicationsSession.java  | 22 +++++++++++++-
 .../io/socket/SocketChannelInputStream.java     |  2 ++
 .../remote/io/socket/ssl/SSLSocketChannel.java  |  2 ++
 .../nifi/remote/StandardRemoteGroupPort.java    | 30 +++++++++++++-------
 6 files changed, 66 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ca23ad8e/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index 42428f6..885f357 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -377,11 +377,6 @@ public class EndpointConnectionPool {
     		return false;
     	}
     	
-    	final String url = peer.getUrl();
-    	if ( url == null ) {
-    		return false;
-    	}
-    	
     	final BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peer.getDescription());
     	if ( connectionQueue == null ) {
     	    return false;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ca23ad8e/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
index 0822b6a..8065f57 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
@@ -64,7 +64,27 @@ public class SocketChannelCommunicationsSession extends AbstractCommunicationsSe
 
     @Override
     public void close() throws IOException {
-        channel.close();
+        IOException suppressed = null;
+        
+        try {
+            request.consume();
+        } catch (final IOException ioe) {
+            suppressed = ioe;
+        }
+        
+        try {
+            channel.close();
+        } catch (final IOException ioe) {
+            if ( suppressed != null ) {
+                ioe.addSuppressed(suppressed);
+            }
+            
+            throw ioe;
+        }
+        
+        if ( suppressed != null ) {
+            throw suppressed;
+        }
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ca23ad8e/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
index dca1d84..50e9162 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
@@ -54,7 +54,27 @@ public class SSLSocketChannelCommunicationsSession extends AbstractCommunication
 
     @Override
     public void close() throws IOException {
-        channel.close();
+        IOException suppressed = null;
+        
+        try {
+            request.consume();
+        } catch (final IOException ioe) {
+            suppressed = ioe;
+        }
+        
+        try {
+            channel.close();
+        } catch (final IOException ioe) {
+            if ( suppressed != null ) {
+                ioe.addSuppressed(suppressed);
+            }
+            
+            throw ioe;
+        }
+        
+        if ( suppressed != null ) {
+            throw suppressed;
+        }
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ca23ad8e/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
index f68c874..0ad0b74 100644
--- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
+++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
@@ -45,6 +45,8 @@ public class SocketChannelInputStream extends InputStream {
     }
     
     public void consume() throws IOException {
+        channel.shutdownInput();
+        
         final byte[] b = new byte[4096];
         final ByteBuffer buffer = ByteBuffer.wrap(b);
         int bytesRead;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ca23ad8e/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
index 7c74b20..249ad48 100644
--- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
+++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
@@ -259,6 +259,8 @@ public class SSLSocketChannel implements Closeable {
     }
 
     public void consume() throws IOException {
+        channel.shutdownInput();
+        
         final byte[] b = new byte[4096];
         final ByteBuffer buffer = ByteBuffer.wrap(b);
         int readCount;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ca23ad8e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index da9d027..740e405 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -150,6 +150,18 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
         
         String url = getRemoteProcessGroup().getTargetUri().toString();
         
+        // If we are sending data, we need to ensure that we have at least 1 FlowFile to send. Otherwise,
+        // we don't want to create a transaction at all.
+        final FlowFile firstFlowFile;
+        if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) {
+            firstFlowFile = session.get();
+            if ( firstFlowFile == null ) {
+                return;
+            }
+        } else {
+            firstFlowFile = null;
+        }
+        
         final SiteToSiteClient client = clientRef.get();
         final Transaction transaction;
         try {
@@ -187,7 +199,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
 
         try {
             if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) {
-                transferFlowFiles(transaction, context, session);
+                transferFlowFiles(transaction, context, session, firstFlowFile);
             } else {
                 final int numReceived = receiveFlowFiles(transaction, context, session);
                 if ( numReceived == 0 ) {
@@ -196,14 +208,15 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
             }
 
             session.commit();
-        } catch (final Exception e) {
-            final String message = String.format("%s failed to communicate with remote NiFi instance due to %s", this, e.toString());
-            logger.error("{} failed to communicate with remote NiFi instance due to {}", this, e.toString());
+        } catch (final Throwable t) {
+            final String message = String.format("%s failed to communicate with remote NiFi instance due to %s", this, t.toString());
+            logger.error("{} failed to communicate with remote NiFi instance due to {}", this, t.toString());
             if ( logger.isDebugEnabled() ) {
-                logger.error("", e);
+                logger.error("", t);
             }
             
             remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
+            transaction.error();
             session.rollback();
         }
     }
@@ -216,11 +229,8 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
     }
     
     
-    private int transferFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session) throws IOException, ProtocolException {
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return 0;
-        }
+    private int transferFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session, FlowFile firstFlowFile) throws IOException, ProtocolException {
+        FlowFile flowFile = firstFlowFile;
 
         try {
             final String userDn = transaction.getCommunicant().getDistinguishedName();