You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/16 22:49:24 UTC

incubator-nifi git commit: NIFI-282: Fixed bug that caused load distribution across nodes in cluster not to work

Repository: incubator-nifi
Updated Branches:
  refs/heads/nifi-site-to-site-client 2f60ddc03 -> a7405b915


NIFI-282: Fixed bug that caused load distribution across nodes in cluster not to work


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a7405b91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a7405b91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a7405b91

Branch: refs/heads/nifi-site-to-site-client
Commit: a7405b915d059483135d706a7ce6a7b848b4c175
Parents: 2f60ddc
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Feb 16 16:49:15 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Feb 16 16:49:15 2015 -0500

----------------------------------------------------------------------
 .../main/java/org/apache/nifi/remote/Peer.java  |   8 +-
 .../org/apache/nifi/remote/PeerDescription.java |  79 ++++++++++++
 .../java/org/apache/nifi/remote/PeerStatus.java |  32 ++---
 .../client/socket/EndpointConnectionPool.java   | 127 ++++++++++---------
 .../protocol/socket/SocketClientProtocol.java   |   5 +-
 .../socket/TestEndpointConnectionStatePool.java |   8 +-
 .../client/socket/TestSiteToSiteClient.java     |  41 +++---
 .../nifi/remote/SocketRemoteSiteListener.java   |   3 +-
 8 files changed, 198 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a7405b91/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
index 3534f95..2428078 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
@@ -25,6 +25,7 @@ import org.apache.nifi.remote.protocol.CommunicationsSession;
 
 public class Peer implements Communicant {
 
+    private final PeerDescription description;
     private final CommunicationsSession commsSession;
     private final String url;
     private final String clusterUrl;
@@ -34,7 +35,8 @@ public class Peer implements Communicant {
     private final Map<String, Long> penaltyExpirationMap = new HashMap<>();
     private boolean closed = false;
 
-    public Peer(final CommunicationsSession commsSession, final String peerUrl, final String clusterUrl) {
+    public Peer(final PeerDescription description, final CommunicationsSession commsSession, final String peerUrl, final String clusterUrl) {
+        this.description = description;
         this.commsSession = commsSession;
         this.url = peerUrl;
         this.clusterUrl = clusterUrl;
@@ -48,6 +50,10 @@ public class Peer implements Communicant {
         }
     }
 
+    public PeerDescription getDescription() {
+        return description;
+    }
+    
     @Override
     public String getUrl() {
         return url;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a7405b91/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java
new file mode 100644
index 0000000..0e8e498
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+public class PeerDescription {
+    private final String hostname;
+    private final int port;
+    private final boolean secure;
+    
+    public PeerDescription(final String hostname, final int port, final boolean secure) {
+        this.hostname = hostname;
+        this.port = port;
+        this.secure = secure;
+    }
+
+    public String getHostname() {
+        return hostname;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public boolean isSecure() {
+        return secure;
+    }
+
+    @Override
+    public String toString() {
+        return "PeerDescription[hostname=" + hostname + ", port=" + port + ", secure=" + secure + "]";
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((hostname == null) ? 0 : hostname.hashCode());
+        result = prime * result + port;
+        return result;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        
+        final PeerDescription other = (PeerDescription) obj;
+        if (hostname == null) {
+            if (other.hostname != null) {
+                return false;
+            }
+        } else if (!hostname.equals(other.hostname)) {
+            return false;
+        }
+        
+        return port == other.port;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a7405b91/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
index d1cb076..b68ac33 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
@@ -17,43 +17,31 @@
 package org.apache.nifi.remote;
 
 public class PeerStatus {
-
-    private final String hostname;
-    private final int port;
-    private final boolean secure;
+    private final PeerDescription description;
     private final int numFlowFiles;
 
-    public PeerStatus(final String hostname, final int port, final boolean secure, final int numFlowFiles) {
-        this.hostname = hostname;
-        this.port = port;
-        this.secure = secure;
+    public PeerStatus(final PeerDescription description, final int numFlowFiles) {
+        this.description = description;
         this.numFlowFiles = numFlowFiles;
     }
 
-    public String getHostname() {
-        return hostname;
-    }
-
-    public int getPort() {
-        return port;
+    public PeerDescription getPeerDescription() {
+        return description;
     }
-
-    public boolean isSecure() {
-        return secure;
-    }
-
+    
     public int getFlowFileCount() {
         return numFlowFiles;
     }
 
     @Override
     public String toString() {
-        return "PeerStatus[hostname=" + hostname + ",port=" + port + ",secure=" + secure + ",flowFileCount=" + numFlowFiles + "]";
+        return "PeerStatus[hostname=" + description.getHostname() + ",port=" + description.getPort() + 
+                ",secure=" + description.isSecure() + ",flowFileCount=" + numFlowFiles + "]";
     }
 
     @Override
     public int hashCode() {
-        return 9824372 + hostname.hashCode() + port;
+        return 9824372 + description.getHostname().hashCode() + description.getPort() * 41;
     }
 
     @Override
@@ -67,6 +55,6 @@ public class PeerStatus {
         }
 
         final PeerStatus other = (PeerStatus) obj;
-        return port == other.port && hostname.equals(other.hostname);
+        return description.equals(other.getPeerDescription());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a7405b91/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index f9a8a38..450daec 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -61,6 +61,7 @@ import javax.security.cert.CertificateNotYetValidException;
 
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerDescription;
 import org.apache.nifi.remote.PeerStatus;
 import org.apache.nifi.remote.RemoteDestination;
 import org.apache.nifi.remote.RemoteResourceInitiator;
@@ -97,8 +98,8 @@ public class EndpointConnectionPool {
 
 	private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionPool.class);
 	
-	private final BlockingQueue<EndpointConnection> connectionQueue = new LinkedBlockingQueue<>();
-    private final ConcurrentMap<PeerStatus, Long> peerTimeoutExpirations = new ConcurrentHashMap<>();
+	private final ConcurrentMap<PeerDescription, BlockingQueue<EndpointConnection>> connectionQueueMap = new ConcurrentHashMap<>();
+    private final ConcurrentMap<PeerDescription, Long> peerTimeoutExpirations = new ConcurrentHashMap<>();
     private final URI clusterUrl;
     private final String apiUri;
     
@@ -227,6 +228,23 @@ public class EndpointConnectionPool {
         SocketClientProtocol protocol = null;
         EndpointConnection connection;
         Peer peer = null;
+
+        logger.debug("{} getting next peer status", this);
+        final PeerStatus peerStatus = getNextPeerStatus(direction);
+        logger.debug("{} next peer status = {}", this, peerStatus);
+        if ( peerStatus == null ) {
+            return null;
+        }
+
+        final PeerDescription peerDescription = peerStatus.getPeerDescription();
+        BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peerStatus);
+        if ( connectionQueue == null ) {
+            connectionQueue = new LinkedBlockingQueue<>();
+            BlockingQueue<EndpointConnection> existing = connectionQueueMap.putIfAbsent(peerDescription, connectionQueue);
+            if ( existing != null ) {
+                connectionQueue = existing;
+            }
+        }
         
         final List<EndpointConnection> addBack = new ArrayList<>();
         try {
@@ -254,19 +272,12 @@ public class EndpointConnectionPool {
                     protocol = new SocketClientProtocol();
                     protocol.setDestination(new IdEnrichedRemoteDestination(remoteDestination, portId));
 
-                    logger.debug("{} getting next peer status", this);
-                    final PeerStatus peerStatus = getNextPeerStatus(direction);
-                    logger.debug("{} next peer status = {}", this, peerStatus);
-                    if ( peerStatus == null ) {
-                        return null;
-                    }
-
                     final long penalizationMillis = remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS);
                     try {
                         logger.debug("{} Establishing site-to-site connection with {}", this, peerStatus);
                         commsSession = establishSiteToSiteConnection(peerStatus);
                     } catch (final IOException ioe) {
-                        penalize(peerStatus, penalizationMillis);
+                        penalize(peerStatus.getPeerDescription(), penalizationMillis);
                         throw ioe;
                     }
                     
@@ -283,8 +294,8 @@ public class EndpointConnectionPool {
                         }
                     }
                 
-                    final String peerUrl = "nifi://" + peerStatus.getHostname() + ":" + peerStatus.getPort();
-                    peer = new Peer(commsSession, peerUrl, clusterUrl.toString());
+                    final String peerUrl = "nifi://" + peerDescription.getHostname() + ":" + peerDescription.getPort();
+                    peer = new Peer(peerDescription, commsSession, peerUrl, clusterUrl.toString());
     
                     // set properties based on config
                     if ( config != null ) {
@@ -371,6 +382,11 @@ public class EndpointConnectionPool {
     		return false;
     	}
     	
+    	final BlockingQueue<EndpointConnection> connectionQueue = connectionQueueMap.get(peer.getDescription());
+    	if ( connectionQueue == null ) {
+    	    return false;
+    	}
+    	
     	activeConnections.remove(endpointConnection);
     	if ( shutdown ) {
     	    terminate(endpointConnection);
@@ -381,14 +397,14 @@ public class EndpointConnectionPool {
     	}
     }
     
-    private void penalize(final PeerStatus status, final long penalizationMillis) {
-        Long expiration = peerTimeoutExpirations.get(status);
+    private void penalize(final PeerDescription peerDescription, final long penalizationMillis) {
+        Long expiration = peerTimeoutExpirations.get(peerDescription);
         if ( expiration == null ) {
             expiration = Long.valueOf(0L);
         }
         
         final long newExpiration = Math.max(expiration, System.currentTimeMillis() + penalizationMillis);
-        peerTimeoutExpirations.put(status, Long.valueOf(newExpiration));
+        peerTimeoutExpirations.put(peerDescription, Long.valueOf(newExpiration));
     }
     
     /**
@@ -396,19 +412,7 @@ public class EndpointConnectionPool {
      * @param peer
      */
     public void penalize(final Peer peer, final long penalizationMillis) {
-        String host;
-        int port;
-        try {
-            final URI uri = new URI(peer.getUrl());
-            host = uri.getHost();
-            port = uri.getPort();
-        } catch (final URISyntaxException e) {
-            host = peer.getHost();
-            port = -1;
-        }
-        
-        final PeerStatus status = new PeerStatus(host, port, true, 1);
-        penalize(status, penalizationMillis);
+        penalize(peer.getDescription(), penalizationMillis);
     }
     
     private void cleanup(final SocketClientProtocol protocol, final Peer peer) {
@@ -509,7 +513,8 @@ public class EndpointConnectionPool {
         final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
         final List<NodeInformation> nodeInfos = new ArrayList<>();
         for ( final PeerStatus peerStatus : statuses ) {
-            final NodeInformation nodeInfo = new NodeInformation(peerStatus.getHostname(), peerStatus.getPort(), 0, peerStatus.isSecure(), peerStatus.getFlowFileCount());
+            final PeerDescription description = peerStatus.getPeerDescription();
+            final NodeInformation nodeInfo = new NodeInformation(description.getHostname(), description.getPort(), 0, description.isSecure(), peerStatus.getFlowFileCount());
             nodeInfos.add(nodeInfo);
         }
         clusterNodeInfo.setNodeInformation(nodeInfos);
@@ -526,7 +531,7 @@ public class EndpointConnectionPool {
         if (cache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis()) {
             final Set<PeerStatus> equalizedSet = new HashSet<>(cache.getStatuses().size());
             for (final PeerStatus status : cache.getStatuses()) {
-                final PeerStatus equalizedStatus = new PeerStatus(status.getHostname(), status.getPort(), status.isSecure(), 1);
+                final PeerStatus equalizedStatus = new PeerStatus(status.getPeerDescription(), 1);
                 equalizedSet.add(equalizedStatus);
             }
 
@@ -543,8 +548,9 @@ public class EndpointConnectionPool {
             throw new IOException("Remote instance of NiFi is not configured to allow site-to-site communications");
         }
     	
+        final PeerDescription clusterPeerDescription = new PeerDescription(hostname, port, clusterUrl.toString().startsWith("https://"));
     	final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port);
-        final Peer peer = new Peer(commsSession, "nifi://" + hostname + ":" + port, clusterUrl.toString());
+        final Peer peer = new Peer(clusterPeerDescription, commsSession, "nifi://" + hostname + ":" + port, clusterUrl.toString());
         final SocketClientProtocol clientProtocol = new SocketClientProtocol();
         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
@@ -602,7 +608,8 @@ public class EndpointConnectionPool {
              final OutputStream out = new BufferedOutputStream(fos)) {
 
             for (final PeerStatus status : statuses) {
-                final String line = status.getHostname() + ":" + status.getPort() + ":" + status.isSecure() + "\n";
+                final PeerDescription description = status.getPeerDescription();
+                final String line = description.getHostname() + ":" + description.getPort() + ":" + description.isSecure() + "\n";
                 out.write(line.getBytes(StandardCharsets.UTF_8));
             }
 
@@ -631,7 +638,7 @@ public class EndpointConnectionPool {
                 final int port = Integer.parseInt(splits[1]);
                 final boolean secure = Boolean.parseBoolean(splits[2]);
 
-                statuses.add(new PeerStatus(hostname, port, secure, 1));
+                statuses.add(new PeerStatus(new PeerDescription(hostname, port, secure), 1));
             }
         }
 
@@ -640,7 +647,8 @@ public class EndpointConnectionPool {
     
     
     private CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException {
-    	return establishSiteToSiteConnection(peerStatus.getHostname(), peerStatus.getPort());
+        final PeerDescription description = peerStatus.getPeerDescription();
+    	return establishSiteToSiteConnection(description.getHostname(), description.getPort());
     }
     
     private CommunicationsSession establishSiteToSiteConnection(final String hostname, final int port) throws IOException {
@@ -720,7 +728,8 @@ public class EndpointConnectionPool {
                     final int index = n % destinations.size();
                     PeerStatus status = destinations.get(index);
                     if ( status == null ) {
-                        status = new PeerStatus(nodeInfo.getHostname(), nodeInfo.getSiteToSitePort(), nodeInfo.isSiteToSiteSecure(), nodeInfo.getTotalFlowFiles());
+                        final PeerDescription description = new PeerDescription(nodeInfo.getHostname(), nodeInfo.getSiteToSitePort(), nodeInfo.isSiteToSiteSecure());
+                        status = new PeerStatus(description, nodeInfo.getTotalFlowFiles());
                         destinations.set(index, status);
                         break;
                     } else {
@@ -744,27 +753,29 @@ public class EndpointConnectionPool {
     
     
     private void cleanupExpiredSockets() {
-        final List<EndpointConnection> connections = new ArrayList<>();
-        
-        EndpointConnection connection;
-        while ((connection = connectionQueue.poll()) != null) {
-            // If the socket has not been used in 10 seconds, shut it down.
-            final long lastUsed = connection.getLastTimeUsed();
-            if ( lastUsed < System.currentTimeMillis() - idleExpirationMillis ) {
-                try {
-                    connection.getSocketClientProtocol().shutdown(connection.getPeer());
-                } catch (final Exception e) {
-                    logger.debug("Failed to shut down {} using {} due to {}", 
-                        new Object[] {connection.getSocketClientProtocol(), connection.getPeer(), e} );
+        for ( final BlockingQueue<EndpointConnection> connectionQueue : connectionQueueMap.values()) {
+            final List<EndpointConnection> connections = new ArrayList<>();
+            
+            EndpointConnection connection;
+            while ((connection = connectionQueue.poll()) != null) {
+                // If the socket has not been used in 10 seconds, shut it down.
+                final long lastUsed = connection.getLastTimeUsed();
+                if ( lastUsed < System.currentTimeMillis() - idleExpirationMillis ) {
+                    try {
+                        connection.getSocketClientProtocol().shutdown(connection.getPeer());
+                    } catch (final Exception e) {
+                        logger.debug("Failed to shut down {} using {} due to {}", 
+                            new Object[] {connection.getSocketClientProtocol(), connection.getPeer(), e} );
+                    }
+                    
+                    terminate(connection);
+                } else {
+                    connections.add(connection);
                 }
-                
-                terminate(connection);
-            } else {
-                connections.add(connection);
             }
+            
+            connectionQueue.addAll(connections);
         }
-        
-        connectionQueue.addAll(connections);
     }
     
     public void shutdown() {
@@ -775,10 +786,12 @@ public class EndpointConnectionPool {
        for ( final EndpointConnection conn : activeConnections ) {
            conn.getPeer().getCommunicationsSession().interrupt();
         }
-        
-        EndpointConnection state;
-        while ( (state = connectionQueue.poll()) != null)  {
-            cleanup(state.getSocketClientProtocol(), state.getPeer());
+
+        for ( final BlockingQueue<EndpointConnection> connectionQueue : connectionQueueMap.values() ) {
+            EndpointConnection state;
+            while ( (state = connectionQueue.poll()) != null)  {
+                cleanup(state.getSocketClientProtocol(), state.getPeer());
+            }
         }
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a7405b91/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index 390f4fc..c3275ea 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -34,6 +34,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerDescription;
 import org.apache.nifi.remote.PeerStatus;
 import org.apache.nifi.remote.RemoteDestination;
 import org.apache.nifi.remote.RemoteResourceInitiator;
@@ -117,7 +118,7 @@ public class SocketClientProtocol implements ClientProtocol {
         properties.put(HandshakeProperty.GZIP, String.valueOf(useCompression));
         
         if ( destinationId != null ) {
-        	properties.put(HandshakeProperty.PORT_IDENTIFIER, destination.getIdentifier());
+        	properties.put(HandshakeProperty.PORT_IDENTIFIER, destinationId);
         }
         
         properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf(timeoutMillis) );
@@ -229,7 +230,7 @@ public class SocketClientProtocol implements ClientProtocol {
             final int port = dis.readInt();
             final boolean secure = dis.readBoolean();
             final int flowFileCount = dis.readInt();
-            peers.add(new PeerStatus(hostname, port, secure, flowFileCount));
+            peers.add(new PeerStatus(new PeerDescription(hostname, port, secure), flowFileCount));
         }
         
         logger.debug("{} Received {} Peer Statuses from {}", this, peers.size(), peer);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a7405b91/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
index 275e40c..cb7af08 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
@@ -41,7 +41,7 @@ public class TestEndpointConnectionStatePool {
         clusterNodeInfo.setNodeInformation(collection);
         final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
         for ( final PeerStatus peerStatus : destinations ) {
-            System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
+            System.out.println(peerStatus.getPeerDescription());
         }
     }
     
@@ -55,7 +55,7 @@ public class TestEndpointConnectionStatePool {
         clusterNodeInfo.setNodeInformation(collection);
         final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
         for ( final PeerStatus peerStatus : destinations ) {
-            System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
+            System.out.println(peerStatus.getPeerDescription());
         }
     }
     
@@ -75,7 +75,7 @@ public class TestEndpointConnectionStatePool {
         clusterNodeInfo.setNodeInformation(collection);
         final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
         for ( final PeerStatus peerStatus : destinations ) {
-            System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
+            System.out.println(peerStatus.getPeerDescription());
         }
     }
     
@@ -89,7 +89,7 @@ public class TestEndpointConnectionStatePool {
         clusterNodeInfo.setNodeInformation(collection);
         final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
         for ( final PeerStatus peerStatus : destinations ) {
-            System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
+            System.out.println(peerStatus.getPeerDescription());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a7405b91/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
index bb16a34..0f48b03 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
@@ -43,26 +43,31 @@ public class TestSiteToSiteClient {
         final SiteToSiteClient client = new SiteToSiteClient.Builder()
             .url("http://localhost:8080/nifi")
             .portName("cba")
-            .requestBatchCount(1)
+            .requestBatchCount(10)
             .build();
         
         try {
-            final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
-            Assert.assertNotNull(transaction);
-            
-            final DataPacket packet = transaction.receive();
-            Assert.assertNotNull(packet);
-            
-            final InputStream in = packet.getData();
-            final long size = packet.getSize();
-            final byte[] buff = new byte[(int) size];
-            
-            StreamUtils.fillBuffer(in, buff);
-            
-            Assert.assertNull(transaction.receive());
-            
-            transaction.confirm();
-            transaction.complete();
+            for (int i=0; i < 1000; i++) {
+                final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
+                Assert.assertNotNull(transaction);
+                
+                DataPacket packet;
+                while (true) {
+                    packet = transaction.receive();
+                    if ( packet == null ) {
+                        break;
+                    }
+
+                    final InputStream in = packet.getData();
+                    final long size = packet.getSize();
+                    final byte[] buff = new byte[(int) size];
+                    
+                    StreamUtils.fillBuffer(in, buff);
+                }
+                
+                transaction.confirm();
+                transaction.complete();
+            }
         } finally {
             client.close();
         }
@@ -70,7 +75,7 @@ public class TestSiteToSiteClient {
     
     
     @Test
-    //@Ignore("For local testing only; not really a unit test but a manual test")
+    @Ignore("For local testing only; not really a unit test but a manual test")
     public void testSend() throws IOException {
         System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
         

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a7405b91/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index 8a4839b..493d1fe 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -206,7 +206,8 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
                             	protocol.setRootProcessGroup(rootGroup.get());
                           	    protocol.setNodeInformant(nodeInformant);
                             	
-                            	peer = new Peer(commsSession, peerUri, "nifi://localhost:" + getPort());
+                          	    final PeerDescription description = new PeerDescription("localhost", getPort(), sslContext != null);
+                            	peer = new Peer(description, commsSession, peerUri, "nifi://localhost:" + getPort());
                             	LOG.debug("Handshaking....");
                             	protocol.handshake(peer);