You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/12/18 22:34:41 UTC

nifi git commit: NIFI-1301: Ensure that when creating site-to-site connection, if remote instance is applying backpressure that we do not block indefinitely waiting for the connection to be made

Repository: nifi
Updated Branches:
  refs/heads/master 608287f9f -> bef3fc8b4


NIFI-1301: Ensure that when creating site-to-site connection, if remote instance is applying backpressure that we do not block indefinitely waiting for the connection to be made

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bef3fc8b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bef3fc8b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bef3fc8b

Branch: refs/heads/master
Commit: bef3fc8b40cec21fd7ce86fde3e812fd348f1341
Parents: 608287f
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Dec 17 15:40:00 2015 -0500
Committer: joewitt <jo...@apache.org>
Committed: Fri Dec 18 16:08:41 2015 -0500

----------------------------------------------------------------------
 .../client/socket/EndpointConnectionPool.java   | 62 ++++++++++----------
 1 file changed, 32 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/bef3fc8b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index 88a34aa..18075db 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -250,26 +250,26 @@ public class EndpointConnectionPool {
         EndpointConnection connection;
         Peer peer = null;
 
-        logger.debug("{} getting next peer status", this);
-        final PeerStatus peerStatus = getNextPeerStatus(direction);
-        logger.debug("{} next peer status = {}", this, peerStatus);
-        if (peerStatus == null) {
-            return null;
-        }
+        do {
+            final List<EndpointConnection> addBack = new ArrayList<>();
+            logger.debug("{} getting next peer status", this);
+            final PeerStatus peerStatus = getNextPeerStatus(direction);
+            logger.debug("{} next peer status = {}", this, peerStatus);
+            if (peerStatus == null) {
+                return null;
+            }
 
-        final PeerDescription peerDescription = peerStatus.getPeerDescription();
-        BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peerDescription);
-        if (connectionQueue == null) {
-            connectionQueue = new LinkedBlockingQueue<>();
-            BlockingQueue<EndpointConnection> existing = connectionQueueMap.putIfAbsent(peerDescription, connectionQueue);
-            if (existing != null) {
-                connectionQueue = existing;
+            final PeerDescription peerDescription = peerStatus.getPeerDescription();
+            BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peerDescription);
+            if (connectionQueue == null) {
+                connectionQueue = new LinkedBlockingQueue<>();
+                BlockingQueue<EndpointConnection> existing = connectionQueueMap.putIfAbsent(peerDescription, connectionQueue);
+                if (existing != null) {
+                    connectionQueue = existing;
+                }
             }
-        }
 
-        final List<EndpointConnection> addBack = new ArrayList<>();
-        try {
-            do {
+            try {
                 connection = connectionQueue.poll();
                 logger.debug("{} Connection State for {} = {}", this, clusterUrl, connection);
                 final String portId = getPortIdentifier(direction);
@@ -387,21 +387,23 @@ public class EndpointConnectionPool {
                         protocol = connection.getSocketClientProtocol();
                     }
                 }
-            } while (connection == null || codec == null || commsSession == null || protocol == null);
-        } catch (final Throwable t) {
-            if (commsSession != null) {
-                try {
-                    commsSession.close();
-                } catch (final IOException ioe) {
+            } catch (final Throwable t) {
+                if (commsSession != null) {
+                    try {
+                        commsSession.close();
+                    } catch (final IOException ioe) {
+                    }
                 }
-            }
 
-            throw t;
-        } finally {
-            if (!addBack.isEmpty()) {
-                connectionQueue.addAll(addBack);
+                throw t;
+            } finally {
+                if (!addBack.isEmpty()) {
+                    connectionQueue.addAll(addBack);
+                    addBack.clear();
+                }
             }
-        }
+
+        } while (connection == null || codec == null || commsSession == null || protocol == null);
 
         activeConnections.add(connection);
         return connection;
@@ -773,7 +775,7 @@ public class EndpointConnectionPool {
         final StringBuilder distributionDescription = new StringBuilder();
         distributionDescription.append("New Weighted Distribution of Nodes:");
         for (final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet()) {
-            final double percentage = entry.getValue() * 100D / (double) destinations.size();
+            final double percentage = entry.getValue() * 100D / destinations.size();
             distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of data");
         }
         logger.info(distributionDescription.toString());