You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/02/26 04:54:50 UTC
[47/51] [abbrv] incubator-nifi git commit: NIFI-383: Ensure that we always clean up sockets
NIFI-383: Ensure that we always clean up sockets
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/ca23ad8e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/ca23ad8e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/ca23ad8e
Branch: refs/heads/NIFI-353
Commit: ca23ad8eaa236407daff926d49c73985ed16a41e
Parents: 5aef55b
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Feb 25 14:06:58 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Feb 25 14:06:58 2015 -0500
----------------------------------------------------------------------
.../client/socket/EndpointConnectionPool.java | 5 ----
.../SocketChannelCommunicationsSession.java | 22 +++++++++++++-
.../SSLSocketChannelCommunicationsSession.java | 22 +++++++++++++-
.../io/socket/SocketChannelInputStream.java | 2 ++
.../remote/io/socket/ssl/SSLSocketChannel.java | 2 ++
.../nifi/remote/StandardRemoteGroupPort.java | 30 +++++++++++++-------
6 files changed, 66 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ca23ad8e/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index 42428f6..885f357 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -377,11 +377,6 @@ public class EndpointConnectionPool {
return false;
}
- final String url = peer.getUrl();
- if ( url == null ) {
- return false;
- }
-
final BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peer.getDescription());
if ( connectionQueue == null ) {
return false;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ca23ad8e/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
index 0822b6a..8065f57 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
@@ -64,7 +64,27 @@ public class SocketChannelCommunicationsSession extends AbstractCommunicationsSe
@Override
public void close() throws IOException {
- channel.close();
+ IOException suppressed = null;
+
+ try {
+ request.consume();
+ } catch (final IOException ioe) {
+ suppressed = ioe;
+ }
+
+ try {
+ channel.close();
+ } catch (final IOException ioe) {
+ if ( suppressed != null ) {
+ ioe.addSuppressed(suppressed);
+ }
+
+ throw ioe;
+ }
+
+ if ( suppressed != null ) {
+ throw suppressed;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ca23ad8e/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
index dca1d84..50e9162 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
@@ -54,7 +54,27 @@ public class SSLSocketChannelCommunicationsSession extends AbstractCommunication
@Override
public void close() throws IOException {
- channel.close();
+ IOException suppressed = null;
+
+ try {
+ request.consume();
+ } catch (final IOException ioe) {
+ suppressed = ioe;
+ }
+
+ try {
+ channel.close();
+ } catch (final IOException ioe) {
+ if ( suppressed != null ) {
+ ioe.addSuppressed(suppressed);
+ }
+
+ throw ioe;
+ }
+
+ if ( suppressed != null ) {
+ throw suppressed;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ca23ad8e/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
index f68c874..0ad0b74 100644
--- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
+++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
@@ -45,6 +45,8 @@ public class SocketChannelInputStream extends InputStream {
}
public void consume() throws IOException {
+ channel.shutdownInput();
+
final byte[] b = new byte[4096];
final ByteBuffer buffer = ByteBuffer.wrap(b);
int bytesRead;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ca23ad8e/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
index 7c74b20..249ad48 100644
--- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
+++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
@@ -259,6 +259,8 @@ public class SSLSocketChannel implements Closeable {
}
public void consume() throws IOException {
+ channel.shutdownInput();
+
final byte[] b = new byte[4096];
final ByteBuffer buffer = ByteBuffer.wrap(b);
int readCount;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ca23ad8e/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index da9d027..740e405 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -150,6 +150,18 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
String url = getRemoteProcessGroup().getTargetUri().toString();
+ // If we are sending data, we need to ensure that we have at least 1 FlowFile to send. Otherwise,
+ // we don't want to create a transaction at all.
+ final FlowFile firstFlowFile;
+ if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) {
+ firstFlowFile = session.get();
+ if ( firstFlowFile == null ) {
+ return;
+ }
+ } else {
+ firstFlowFile = null;
+ }
+
final SiteToSiteClient client = clientRef.get();
final Transaction transaction;
try {
@@ -187,7 +199,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
try {
if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) {
- transferFlowFiles(transaction, context, session);
+ transferFlowFiles(transaction, context, session, firstFlowFile);
} else {
final int numReceived = receiveFlowFiles(transaction, context, session);
if ( numReceived == 0 ) {
@@ -196,14 +208,15 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
}
session.commit();
- } catch (final Exception e) {
- final String message = String.format("%s failed to communicate with remote NiFi instance due to %s", this, e.toString());
- logger.error("{} failed to communicate with remote NiFi instance due to {}", this, e.toString());
+ } catch (final Throwable t) {
+ final String message = String.format("%s failed to communicate with remote NiFi instance due to %s", this, t.toString());
+ logger.error("{} failed to communicate with remote NiFi instance due to {}", this, t.toString());
if ( logger.isDebugEnabled() ) {
- logger.error("", e);
+ logger.error("", t);
}
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
+ transaction.error();
session.rollback();
}
}
@@ -216,11 +229,8 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
}
- private int transferFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session) throws IOException, ProtocolException {
- FlowFile flowFile = session.get();
- if (flowFile == null) {
- return 0;
- }
+ private int transferFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session, FlowFile firstFlowFile) throws IOException, ProtocolException {
+ FlowFile flowFile = firstFlowFile;
try {
final String userDn = transaction.getCommunicant().getDistinguishedName();