You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/12/18 22:34:41 UTC
nifi git commit: NIFI-1301: Ensure that when creating site-to-site
connection,
if remote instance is applying backpressure that we do not block indefinitely
waiting for the connection to be made
Repository: nifi
Updated Branches:
refs/heads/master 608287f9f -> bef3fc8b4
NIFI-1301: Ensure that when creating site-to-site connection, if remote instance is applying backpressure that we do not block indefinitely waiting for the connection to be made
Signed-off-by: joewitt <jo...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bef3fc8b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bef3fc8b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bef3fc8b
Branch: refs/heads/master
Commit: bef3fc8b40cec21fd7ce86fde3e812fd348f1341
Parents: 608287f
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Dec 17 15:40:00 2015 -0500
Committer: joewitt <jo...@apache.org>
Committed: Fri Dec 18 16:08:41 2015 -0500
----------------------------------------------------------------------
.../client/socket/EndpointConnectionPool.java | 62 ++++++++++----------
1 file changed, 32 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/bef3fc8b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index 88a34aa..18075db 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -250,26 +250,26 @@ public class EndpointConnectionPool {
EndpointConnection connection;
Peer peer = null;
- logger.debug("{} getting next peer status", this);
- final PeerStatus peerStatus = getNextPeerStatus(direction);
- logger.debug("{} next peer status = {}", this, peerStatus);
- if (peerStatus == null) {
- return null;
- }
+ do {
+ final List<EndpointConnection> addBack = new ArrayList<>();
+ logger.debug("{} getting next peer status", this);
+ final PeerStatus peerStatus = getNextPeerStatus(direction);
+ logger.debug("{} next peer status = {}", this, peerStatus);
+ if (peerStatus == null) {
+ return null;
+ }
- final PeerDescription peerDescription = peerStatus.getPeerDescription();
- BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peerDescription);
- if (connectionQueue == null) {
- connectionQueue = new LinkedBlockingQueue<>();
- BlockingQueue<EndpointConnection> existing = connectionQueueMap.putIfAbsent(peerDescription, connectionQueue);
- if (existing != null) {
- connectionQueue = existing;
+ final PeerDescription peerDescription = peerStatus.getPeerDescription();
+ BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peerDescription);
+ if (connectionQueue == null) {
+ connectionQueue = new LinkedBlockingQueue<>();
+ BlockingQueue<EndpointConnection> existing = connectionQueueMap.putIfAbsent(peerDescription, connectionQueue);
+ if (existing != null) {
+ connectionQueue = existing;
+ }
}
- }
- final List<EndpointConnection> addBack = new ArrayList<>();
- try {
- do {
+ try {
connection = connectionQueue.poll();
logger.debug("{} Connection State for {} = {}", this, clusterUrl, connection);
final String portId = getPortIdentifier(direction);
@@ -387,21 +387,23 @@ public class EndpointConnectionPool {
protocol = connection.getSocketClientProtocol();
}
}
- } while (connection == null || codec == null || commsSession == null || protocol == null);
- } catch (final Throwable t) {
- if (commsSession != null) {
- try {
- commsSession.close();
- } catch (final IOException ioe) {
+ } catch (final Throwable t) {
+ if (commsSession != null) {
+ try {
+ commsSession.close();
+ } catch (final IOException ioe) {
+ }
}
- }
- throw t;
- } finally {
- if (!addBack.isEmpty()) {
- connectionQueue.addAll(addBack);
+ throw t;
+ } finally {
+ if (!addBack.isEmpty()) {
+ connectionQueue.addAll(addBack);
+ addBack.clear();
+ }
}
- }
+
+ } while (connection == null || codec == null || commsSession == null || protocol == null);
activeConnections.add(connection);
return connection;
@@ -773,7 +775,7 @@ public class EndpointConnectionPool {
final StringBuilder distributionDescription = new StringBuilder();
distributionDescription.append("New Weighted Distribution of Nodes:");
for (final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet()) {
- final double percentage = entry.getValue() * 100D / (double) destinations.size();
+ final double percentage = entry.getValue() * 100D / destinations.size();
distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of data");
}
logger.info(distributionDescription.toString());