You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/23 20:57:06 UTC

[13/29] incubator-nifi git commit: Refactored client and add javadocs

Refactored client and add javadocs


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/d1e058cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/d1e058cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/d1e058cd

Branch: refs/heads/develop
Commit: d1e058cde7b011a4daa0d574d392569460fc70ba
Parents: 4ab5c30
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Feb 12 08:16:55 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Feb 12 08:16:55 2015 -0500

----------------------------------------------------------------------
 .../org/apache/nifi/remote/Communicant.java     |  47 +++
 .../main/java/org/apache/nifi/remote/Peer.java  |  19 +-
 .../nifi/remote/RemoteResourceInitiator.java    |   9 +
 .../org/apache/nifi/remote/Transaction.java     |  27 +-
 .../nifi/remote/TransactionCompletion.java      |  63 +++
 .../nifi/remote/client/SiteToSiteClient.java    |  31 +-
 .../remote/client/SiteToSiteClientConfig.java   |  21 +-
 .../client/socket/EndpointConnectionPool.java   | 113 ++++--
 .../nifi/remote/client/socket/SocketClient.java |  20 +-
 .../protocol/socket/SocketClientProtocol.java   |   7 +-
 .../socket/SocketClientTransaction.java         | 401 ++++++++++---------
 .../SocketClientTransactionCompletion.java      |  57 +++
 .../client/socket/TestSiteToSiteClient.java     |  16 +-
 .../nifi/remote/SocketRemoteSiteListener.java   |   8 +-
 .../nifi/remote/StandardRemoteGroupPort.java    | 263 +++++++-----
 15 files changed, 720 insertions(+), 382 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java
new file mode 100644
index 0000000..ac2d498
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+/**
+ * Represents the remote entity that the client is communicating with
+ */
+public interface Communicant {
+    /**
+     * Returns the NiFi site-to-site URL for the remote NiFi instance
+     * @return
+     */
+    String getUrl();
+    
+    /**
+     * The Host of the remote NiFi instance
+     * @return
+     */
+    String getHost();
+    
+    /**
+     * The Port that the remote NiFi instance is listening on for site-to-site communications
+     * @return
+     */
+    int getPort();
+    
+    /**
+     * The distinguished name that the remote NiFi instance has provided in its certificate if
+     * using secure communications, or <code>null</code> if the Distinguished Name is unknown
+     * @return
+     */
+    String getDistinguishedName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
index dda5ae3..3534f95 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
@@ -23,12 +23,13 @@ import java.util.Map;
 
 import org.apache.nifi.remote.protocol.CommunicationsSession;
 
-public class Peer {
+public class Peer implements Communicant {
 
     private final CommunicationsSession commsSession;
     private final String url;
     private final String clusterUrl;
     private final String host;
+    private final int port;
     
     private final Map<String, Long> penaltyExpirationMap = new HashMap<>();
     private boolean closed = false;
@@ -39,12 +40,15 @@ public class Peer {
         this.clusterUrl = clusterUrl;
 
         try {
-            this.host = new URI(peerUrl).getHost();
+            final URI uri = new URI(peerUrl);
+            this.port = uri.getPort();
+            this.host = uri.getHost();
         } catch (final Exception e) {
             throw new IllegalArgumentException("Invalid URL: " + peerUrl);
         }
     }
 
+    @Override
     public String getUrl() {
         return url;
     }
@@ -92,6 +96,7 @@ public class Peer {
         return closed;
     }
 
+    @Override
     public String getHost() {
         return host;
     }
@@ -127,4 +132,14 @@ public class Peer {
         sb.append("]");
         return sb.toString();
     }
+
+    @Override
+    public int getPort() {
+        return port;
+    }
+
+    @Override
+    public String getDistinguishedName() {
+        return commsSession.getUserDn();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java
index 8eb5d8d..f469724 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java
@@ -21,26 +21,33 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.nifi.remote.exception.HandshakeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RemoteResourceInitiator {
 	public static final int RESOURCE_OK = 20;
 	public static final int DIFFERENT_RESOURCE_VERSION = 21;
 	public static final int ABORT = 255;
 
+	private static final Logger logger = LoggerFactory.getLogger(RemoteResourceInitiator.class);
 	
 	public static VersionedRemoteResource initiateResourceNegotiation(final VersionedRemoteResource resource, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
         // Write the classname of the RemoteStreamCodec, followed by its version
+	    logger.debug("Negotiating resource; proposal is {}", resource);
     	dos.writeUTF(resource.getResourceName());
     	final VersionNegotiator negotiator = resource.getVersionNegotiator();
     	dos.writeInt(negotiator.getVersion());
     	dos.flush();
         
         // wait for response from server.
+    	logger.debug("Receiving response from remote instance");
         final int statusCode = dis.read();
         switch (statusCode) {
             case RESOURCE_OK:	// server accepted our proposal of codec name/version
+                logger.debug("Response was RESOURCE_OK");
                 return resource;
             case DIFFERENT_RESOURCE_VERSION:	// server accepted our proposal of codec name but not the version
+                logger.debug("Response was DIFFERENT_RESOURCE_VERSION");
                 // Get server's preferred version
             	final int newVersion = dis.readInt();
                 
@@ -56,8 +63,10 @@ public class RemoteResourceInitiator {
                 // Attempt negotiation of resource based on our new preferred version.
                 return initiateResourceNegotiation(resource, dis, dos);
             case ABORT:
+                logger.debug("Response was ABORT");
             	throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF());
             default:
+                logger.debug("Response was {}; unable to negotiate codec", statusCode);
                 return null;	// Unable to negotiate codec
         }
 	}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
index 9fb6147..51bf244 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
@@ -121,30 +121,16 @@ public interface Transaction {
 	void confirm() throws IOException;
 	
 	/**
-	 * <p>
-	 * Completes the transaction and indicates to both the sender and receiver that the data transfer was
-	 * successful. If receiving data, this method can also optionally request that the sender back off sending
-	 * data for a short period of time. This is used, for instance, to apply backpressure or to notify the sender
-	 * that the receiver is not ready to receive data and made not service another request in the short term.
-	 * </p>
-	 * 
-	 * @param requestBackoff if <code>true</code> and the TransferDirection is RECEIVE, indicates to sender that it
-	 * should back off sending data for a short period of time. If <code>false</code> or if the TransferDirection of
-	 * this Transaction is SEND, then this argument is ignored.
-	 * 
-	 * @throws IOException
-	 */
-	void complete(boolean requestBackoff) throws IOException;
-	
-	/**
      * <p>
      * Completes the transaction and indicates to both the sender and receiver that the data transfer was
      * successful.
      * </p>
      * 
      * @throws IOException
+     * 
+     * @return a TransactionCompletion that contains details about the Transaction
      */
-	void complete() throws IOException;
+	TransactionCompletion complete() throws IOException;
 	
 	/**
 	 * <p>
@@ -174,6 +160,13 @@ public interface Transaction {
 	 */
 	TransactionState getState() throws IOException;
 	
+	/**
+	 * Returns a Communicant that represents the other side of this Transaction (i.e.,
+	 * the remote NiFi instance)
+	 * @return
+	 */
+	Communicant getCommunicant();
+	
 	
 	public enum TransactionState {
 	    /**

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java
new file mode 100644
index 0000000..be5f73a
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.io.InputStream;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.remote.protocol.DataPacket;
+
+
+/**
+ * A TransactionCompletion provides information about a {@link Transaction} that has completed successfully.
+ */
+public interface TransactionCompletion {
+    
+    /**
+     * When a sending to a NiFi instance, the server may accept the content sent to it
+     * but indicate that its queues are full and that the client should backoff sending
+     * data for a bit. This method returns <code>true</code> if the server did in fact
+     * request that, <code>false</code> otherwise.
+     * @return
+     */
+    boolean isBackoff();
+    
+    /**
+     * Returns the number of Data Packets that were sent to or received from the remote
+     * NiFi instance in the Transaction
+     * @return
+     */
+    int getDataPacketsTransferred();
+    
+    /**
+     * Returns the number of bytes of DataPacket content that were sent to or received from 
+     * the remote NiFI instance in the Transaction. Note that this is different than the number
+     * of bytes actually transferred between the client and server, as it does not take into
+     * account the attributes or protocol-specific information that is exchanged but rather
+     * takes into account only the data in the {@link InputStream} of the {@link DataPacket}
+     * @return
+     */
+    long getBytesTransferred();
+    
+    /**
+     * Returns the amount of time that the Transaction took, from the time that the Transaction
+     * was created to the time that the Transaction was completed.
+     * @param timeUnit
+     * @return
+     */
+    long getDuration(TimeUnit timeUnit);
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index 47568fd..0591b5a 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -126,6 +126,7 @@ public interface SiteToSiteClient extends Closeable {
 		private String url;
 		private long timeoutNanos = TimeUnit.SECONDS.toNanos(30);
 		private long penalizationNanos = TimeUnit.SECONDS.toNanos(3);
+		private long idleExpirationNanos = TimeUnit.SECONDS.toNanos(30L);
 		private SSLContext sslContext;
 		private EventReporter eventReporter;
 		private File peerPersistenceFile;
@@ -163,6 +164,19 @@ public interface SiteToSiteClient extends Closeable {
 		}
 		
 		/**
+		 * Specifies the amount of time that a connection can remain idle in the connection pool before it
+		 * is "expired" and shutdown. The default value is 30 seconds.
+		 *  
+		 * @param timeout
+		 * @param unit
+		 * @return
+		 */
+		public Builder idleExpiration(final long timeout, final TimeUnit unit) {
+		    this.idleExpirationNanos = unit.toNanos(timeout);
+		    return this;
+		}
+		
+		/**
 		 * If there is a problem communicating with a node (i.e., any node in the remote NiFi cluster
 		 * or the remote instance of NiFi if it is standalone), specifies how long the client should
 		 * wait before attempting to communicate with that node again. While a particular node is penalized,
@@ -327,6 +341,11 @@ public interface SiteToSiteClient extends Closeable {
 				}
 				
 				@Override
+				public long getIdleConnectionExpiration(final TimeUnit timeUnit) {
+				    return Builder.this.getIdleConnectionExpiration(timeUnit);
+				}
+				
+				@Override
 				public SSLContext getSslContext() {
 					return Builder.this.getSslContext();
 				}
@@ -384,12 +403,22 @@ public interface SiteToSiteClient extends Closeable {
 		}
 
 		/**
-		 * Returns the communications timeout in nanoseconds
+		 * Returns the communications timeout
 		 * @return
 		 */
 		public long getTimeout(final TimeUnit timeUnit) {
 			return timeUnit.convert(timeoutNanos, TimeUnit.NANOSECONDS);
 		}
+		
+		/**
+		 * Returns the amount of of time that a connection can remain idle in the connection
+		 * pool before being shutdown
+		 * @param timeUnit
+		 * @return
+		 */
+		public long getIdleConnectionExpiration(final TimeUnit timeUnit) {
+		    return timeUnit.convert(idleExpirationNanos, TimeUnit.NANOSECONDS);
+		}
 
 		/**
 		 * Returns the amount of time that a particular node will be ignored after a

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
index 37c48f8..d03ab3c 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
@@ -37,6 +37,14 @@ public interface SiteToSiteClientConfig {
 	 * @return
 	 */
 	long getTimeout(final TimeUnit timeUnit);
+	
+	/**
+	 * Returns the amount of time that a connection can remain idle before it is
+	 * "expired" and shut down
+	 * @param timeUnit
+	 * @return
+	 */
+	long getIdleConnectionExpiration(TimeUnit timeUnit);
 
 	/**
 	 * Returns the amount of time that a particular node will be ignored after a
@@ -53,12 +61,6 @@ public interface SiteToSiteClientConfig {
 	SSLContext getSslContext();
 	
 	/**
-	 * Returns the EventReporter that is to be used by clients to report events
-	 * @return
-	 */
-	EventReporter getEventReporter();
-
-	/**
 	 * Returns the file that is to be used for persisting the nodes of a remote cluster, if any.
 	 * @return
 	 */
@@ -111,4 +113,11 @@ public interface SiteToSiteClientConfig {
      * @return
      */
 	int getPreferredBatchCount();
+	
+	/**
+     * Returns the EventReporter that is to be used by clients to report events
+     * @return
+     */
+    EventReporter getEventReporter();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index 6869cca..43bc8e5 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -107,12 +107,13 @@ public class EndpointConnectionPool {
     private volatile List<PeerStatus> peerStatuses;
     private volatile long peerRefreshTime = 0L;
     private volatile PeerStatusCache peerStatusCache;
-    private final Set<CommunicationsSession> activeCommsChannels = new HashSet<>();
+    private final Set<EndpointConnection> activeConnections = Collections.synchronizedSet(new HashSet<EndpointConnection>());
 
     private final File peersFile;
     private final EventReporter eventReporter;
     private final SSLContext sslContext;
     private final ScheduledExecutorService taskExecutor;
+    private final int idleExpirationMillis;
     
     private final ReadWriteLock listeningPortRWLock = new ReentrantReadWriteLock();
     private final Lock remoteInfoReadLock = listeningPortRWLock.readLock();
@@ -124,12 +125,18 @@ public class EndpointConnectionPool {
     private final Map<String, String> outputPortMap = new HashMap<>();	// map output port name to identifier
     
     private volatile int commsTimeout;
-
-    public EndpointConnectionPool(final String clusterUrl, final int commsTimeoutMillis, final EventReporter eventReporter, final File persistenceFile) {
-    	this(clusterUrl, commsTimeoutMillis, null, eventReporter, persistenceFile);
+    private volatile boolean shutdown = false;
+    
+    
+    public EndpointConnectionPool(final String clusterUrl, final int commsTimeoutMillis, final int idleExpirationMillis, 
+            final EventReporter eventReporter, final File persistenceFile) 
+    {
+    	this(clusterUrl, commsTimeoutMillis, idleExpirationMillis, null, eventReporter, persistenceFile);
     }
     
-    public EndpointConnectionPool(final String clusterUrl, final int commsTimeoutMillis, final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile) {
+    public EndpointConnectionPool(final String clusterUrl, final int commsTimeoutMillis, final int idleExpirationMillis,
+            final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile) 
+    {
     	try {
     		this.clusterUrl = new URI(clusterUrl);
     	} catch (final URISyntaxException e) {
@@ -147,6 +154,7 @@ public class EndpointConnectionPool {
     	this.peersFile = persistenceFile;
     	this.eventReporter = eventReporter;
     	this.commsTimeout = commsTimeoutMillis;
+    	this.idleExpirationMillis = idleExpirationMillis;
     	
     	Set<PeerStatus> recoveredStatuses;
     	if ( persistenceFile != null && persistenceFile.exists() ) {
@@ -225,19 +233,21 @@ public class EndpointConnectionPool {
                 
                 // if we can't get an existing Connection, create one
                 if ( connection == null ) {
-                    logger.debug("No Connection available for Port {}; creating new Connection", remoteDestination.getIdentifier());
+                    logger.debug("{} No Connection available for Port {}; creating new Connection", this, remoteDestination.getIdentifier());
                     protocol = new SocketClientProtocol();
                     protocol.setDestination(remoteDestination);
 
+                    logger.debug("{} getting next peer status", this);
                     final PeerStatus peerStatus = getNextPeerStatus(direction);
+                    logger.debug("{} next peer status = {}", this, peerStatus);
                     if ( peerStatus == null ) {
                         return null;
                     }
 
                     try {
+                        logger.debug("{} Establishing site-to-site connection with {}", this, peerStatus);
                         commsSession = establishSiteToSiteConnection(peerStatus);
                     } catch (final IOException ioe) {
-                        // TODO: penalize peer status
                         penalize(peerStatus, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
                         throw ioe;
                     }
@@ -245,6 +255,7 @@ public class EndpointConnectionPool {
                     final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
                     final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
                     try {
+                        logger.debug("{} Negotiating protocol", this);
                         RemoteResourceInitiator.initiateResourceNegotiation(protocol, dis, dos);
                     } catch (final HandshakeException e) {
                         try {
@@ -267,6 +278,7 @@ public class EndpointConnectionPool {
                     
                     // perform handshake
                     try {
+                        logger.debug("{} performing handshake", this);
                         protocol.handshake(peer);
                         
                         // handle error cases
@@ -286,7 +298,9 @@ public class EndpointConnectionPool {
                         }
                         
                         // negotiate the FlowFileCodec to use
+                        logger.debug("{} negotiating codec", this);
                         codec = protocol.negotiateCodec(peer);
+                        logger.debug("{} negotiated codec is {}", this, codec);
                     } catch (final PortNotRunningException | UnknownPortException e) {
                     	throw e;
                     } catch (final Exception e) {
@@ -323,6 +337,7 @@ public class EndpointConnectionPool {
             }
         }
         
+        activeConnections.add(connection);
         return connection;
     }
     
@@ -338,7 +353,14 @@ public class EndpointConnectionPool {
     		return false;
     	}
     	
-    	return connectionQueue.offer(endpointConnection);
+    	activeConnections.remove(endpointConnection);
+    	if ( shutdown ) {
+    	    terminate(endpointConnection);
+    	    return false;
+    	} else {
+    	    endpointConnection.setLastTimeUsed();
+    	    return connectionQueue.offer(endpointConnection);
+    	}
     }
     
     private void penalize(final PeerStatus status, final long penalizationMillis) {
@@ -393,27 +415,36 @@ public class EndpointConnectionPool {
         }
     }
     
+    private boolean isPeerRefreshNeeded(final List<PeerStatus> peerList) {
+        return (peerList == null || peerList.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD);
+    }
+    
     private PeerStatus getNextPeerStatus(final TransferDirection direction) {
         List<PeerStatus> peerList = peerStatuses;
-        if ( (peerList == null || peerList.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD) ) {
+        if ( isPeerRefreshNeeded(peerList) ) {
             peerRefreshLock.lock();
             try {
-                try {
-                    peerList = createPeerStatusList(direction);
-                } catch (final Exception e) {
-                    final String message = String.format("%s Failed to update list of peers due to %s", this, e.toString());
-                    logger.warn(message);
-                    if ( logger.isDebugEnabled() ) {
-                        logger.warn("", e);
+                // now that we have the lock, check again that we need to refresh (because another thread
+                // could have been refreshing while we were waiting for the lock).
+                peerList = peerStatuses;
+                if (isPeerRefreshNeeded(peerList)) {
+                    try {
+                        peerList = createPeerStatusList(direction);
+                    } catch (final Exception e) {
+                        final String message = String.format("%s Failed to update list of peers due to %s", this, e.toString());
+                        logger.warn(message);
+                        if ( logger.isDebugEnabled() ) {
+                            logger.warn("", e);
+                        }
+                        
+                        if ( eventReporter != null ) {
+                        	eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
+                        }
                     }
                     
-                    if ( eventReporter != null ) {
-                    	eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
-                    }
+                    this.peerStatuses = peerList;
+                    peerRefreshTime = System.currentTimeMillis();
                 }
-                
-                this.peerStatuses = peerList;
-                peerRefreshTime = System.currentTimeMillis();
             } finally {
                 peerRefreshLock.unlock();
             }
@@ -488,7 +519,10 @@ public class EndpointConnectionPool {
 
     private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException, HandshakeException, UnknownPortException, PortNotRunningException {
     	final String hostname = clusterUrl.getHost();
-        final int port = getSiteToSitePort();
+        final Integer port = getSiteToSitePort();
+        if ( port == null ) {
+            throw new IOException("Remote instance of NiFi is not configured to allow site-to-site communications");
+        }
     	
     	final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port);
         final Peer peer = new Peer(commsSession, "nifi://" + hostname + ":" + port, clusterUrl.toString());
@@ -667,7 +701,7 @@ public class EndpointConnectionPool {
         distributionDescription.append("New Weighted Distribution of Nodes:");
         for ( final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet() ) {
             final double percentage = entry.getValue() * 100D / (double) destinations.size();
-            distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of FlowFiles");
+            distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of data");
         }
         logger.info(distributionDescription.toString());
 
@@ -677,35 +711,36 @@ public class EndpointConnectionPool {
     
     
     private void cleanupExpiredSockets() {
-        final List<EndpointConnection> states = new ArrayList<>();
+        final List<EndpointConnection> connections = new ArrayList<>();
         
-        EndpointConnection state;
-        while ((state = connectionQueue.poll()) != null) {
+        EndpointConnection connection;
+        while ((connection = connectionQueue.poll()) != null) {
             // If the socket has not been used in 10 seconds, shut it down.
-            final long lastUsed = state.getLastTimeUsed();
-            if ( lastUsed < System.currentTimeMillis() - 10000L ) {
+            final long lastUsed = connection.getLastTimeUsed();
+            if ( lastUsed < System.currentTimeMillis() - idleExpirationMillis ) {
                 try {
-                    state.getSocketClientProtocol().shutdown(state.getPeer());
+                    connection.getSocketClientProtocol().shutdown(connection.getPeer());
                 } catch (final Exception e) {
                     logger.debug("Failed to shut down {} using {} due to {}", 
-                        new Object[] {state.getSocketClientProtocol(), state.getPeer(), e} );
+                        new Object[] {connection.getSocketClientProtocol(), connection.getPeer(), e} );
                 }
                 
-                cleanup(state.getSocketClientProtocol(), state.getPeer());
+                terminate(connection);
             } else {
-                states.add(state);
+                connections.add(connection);
             }
         }
         
-        connectionQueue.addAll(states);
+        connectionQueue.addAll(connections);
     }
     
     public void shutdown() {
+        shutdown = true;
     	taskExecutor.shutdown();
     	peerTimeoutExpirations.clear();
-            
-        for ( final CommunicationsSession commsSession : activeCommsChannels ) {
-            commsSession.interrupt();
+        
+       for ( final EndpointConnection conn : activeConnections ) {
+           conn.getPeer().getCommunicationsSession().interrupt();
         }
         
         EndpointConnection state;
@@ -714,8 +749,8 @@ public class EndpointConnectionPool {
         }
     }
     
-    public void terminate(final EndpointConnection state) {
-        cleanup(state.getSocketClientProtocol(), state.getPeer());
+    public void terminate(final EndpointConnection connection) {
+        cleanup(connection.getSocketClientProtocol(), connection.getPeer());
     }
     
     private void refreshPeers() {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index 6fa934b..aae19b3 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -19,8 +19,10 @@ package org.apache.nifi.remote.client.socket;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.nifi.remote.Communicant;
 import org.apache.nifi.remote.RemoteDestination;
 import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransactionCompletion;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.client.SiteToSiteClient;
 import org.apache.nifi.remote.client.SiteToSiteClientConfig;
@@ -40,7 +42,8 @@ public class SocketClient implements SiteToSiteClient {
 	private volatile String portIdentifier;
 	
 	public SocketClient(final SiteToSiteClientConfig config) {
-		pool = new EndpointConnectionPool(config.getUrl(), (int) config.getTimeout(TimeUnit.MILLISECONDS), 
+		pool = new EndpointConnectionPool(config.getUrl(), (int) config.getTimeout(TimeUnit.MILLISECONDS),
+		        (int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS),
 				config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile());
 		
 		this.config = config;
@@ -130,14 +133,9 @@ public class SocketClient implements SiteToSiteClient {
 			}
 
 			@Override
-			public void complete() throws IOException {
-			    complete(false);
-			}
-			
-			@Override
-			public void complete(final boolean requestBackoff) throws IOException {
+			public TransactionCompletion complete() throws IOException {
 				try {
-					transaction.complete(requestBackoff);
+					return transaction.complete();
 				} finally {
 				    final EndpointConnection state = connectionStateRef.get();
 				    if ( state != null ) {
@@ -187,7 +185,11 @@ public class SocketClient implements SiteToSiteClient {
 			public TransactionState getState() throws IOException {
 				return transaction.getState();
 			}
-			
+
+			@Override
+			public Communicant getCommunicant() {
+			    return transaction.getCommunicant();
+			}
 		};
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index e321663..390f4fc 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -317,10 +317,7 @@ public class SocketClientProtocol implements ClientProtocol {
 		// Commit the session so that we have persisted the data
 		session.commit();
 
-		// We want to apply backpressure if the outgoing connections are full. I.e., there are no available relationships.
-		final boolean applyBackpressure = context.getAvailableRelationships().isEmpty();
-
-		transaction.complete(applyBackpressure);
+		transaction.complete();
 		logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
 
 		if ( !flowFilesReceived.isEmpty() ) {
@@ -397,7 +394,7 @@ public class SocketClientProtocol implements ClientProtocol {
 	        final String dataSize = FormatUtils.formatDataSize(bytesSent);
 	        
 	        session.commit();
-	        transaction.complete(false);
+	        transaction.complete();
 	        
 	        final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
 	        logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
index cf8f9b2..b2fffed 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -25,8 +25,10 @@ import java.util.zip.CRC32;
 import java.util.zip.CheckedInputStream;
 import java.util.zip.CheckedOutputStream;
 
+import org.apache.nifi.remote.Communicant;
 import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransactionCompletion;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.codec.FlowFileCodec;
 import org.apache.nifi.remote.exception.ProtocolException;
@@ -40,7 +42,7 @@ import org.slf4j.LoggerFactory;
 public class SocketClientTransaction implements Transaction {
 	private static final Logger logger = LoggerFactory.getLogger(SocketClientTransaction.class);
 	
-	
+	private final long creationNanoTime = System.nanoTime();
 	private final CRC32 crc = new CRC32();
 	private final int protocolVersion;
 	private final FlowFileCodec codec;
@@ -54,6 +56,7 @@ public class SocketClientTransaction implements Transaction {
 	
 	private boolean dataAvailable = false;
 	private int transfers = 0;
+	private long contentBytes = 0;
 	private TransactionState state;
 	
 	SocketClientTransaction(final int protocolVersion, final String destinationId, final Peer peer, final FlowFileCodec codec, 
@@ -108,54 +111,59 @@ public class SocketClientTransaction implements Transaction {
 	@Override
 	public DataPacket receive() throws IOException {
 	    try {
-    		if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
-    			throw new IllegalStateException("Cannot receive data because Transaction State is " + state);
-    		}
-    		
-        	if ( direction == TransferDirection.SEND ) {
-        	    throw new IllegalStateException("Attempting to receive data but started a SEND Transaction");
-        	}
-        	
-        	// if we already know there's no data, just return null
-        	if ( !dataAvailable ) {
-        	    return null;
-        	}
-    
-        	// if we have already received a packet, check if another is available.
-        	if ( transfers > 0 ) {
-        	    // Determine if Peer will send us data or has no data to send us
-                final Response dataAvailableCode = Response.read(dis);
-                switch (dataAvailableCode.getCode()) {
-                    case CONTINUE_TRANSACTION:
-                        logger.debug("{} {} Indicates Transaction should continue", this, peer);
-                        this.dataAvailable = true;
-                        break;
-                    case FINISH_TRANSACTION:
-                        logger.debug("{} {} Indicates Transaction should finish", peer);
-                        this.dataAvailable = false;
-                        break;
-                    default:
-                        throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+	        try {
+        		if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
+        			throw new IllegalStateException("Cannot receive data because Transaction State is " + state);
+        		}
+        		
+            	if ( direction == TransferDirection.SEND ) {
+            	    throw new IllegalStateException("Attempting to receive data but started a SEND Transaction");
+            	}
+            	
+            	// if we already know there's no data, just return null
+            	if ( !dataAvailable ) {
+            	    return null;
+            	}
+        
+            	// if we have already received a packet, check if another is available.
+            	if ( transfers > 0 ) {
+            	    // Determine if Peer will send us data or has no data to send us
+                    final Response dataAvailableCode = Response.read(dis);
+                    switch (dataAvailableCode.getCode()) {
+                        case CONTINUE_TRANSACTION:
+                            logger.debug("{} {} Indicates Transaction should continue", this, peer);
+                            this.dataAvailable = true;
+                            break;
+                        case FINISH_TRANSACTION:
+                            logger.debug("{} {} Indicates Transaction should finish", peer);
+                            this.dataAvailable = false;
+                            break;
+                        default:
+                            throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+                    }
                 }
-            }
-        	
-        	// if no data available, return null
-        	if ( !dataAvailable ) {
-        	    return null;
-        	}
-        	
-            logger.debug("{} Receiving data from {}", this, peer);
-            final InputStream dataIn = compress ? new CompressionInputStream(dis) : dis;
-            final DataPacket packet = codec.decode(new CheckedInputStream(dataIn, crc));
-            
-            if ( packet == null ) {
-                this.dataAvailable = false;
-            } else {
-            	transfers++;
-            }
-            
-            this.state = TransactionState.DATA_EXCHANGED;
-            return packet;
+            	
+            	// if no data available, return null
+            	if ( !dataAvailable ) {
+            	    return null;
+            	}
+            	
+                logger.debug("{} Receiving data from {}", this, peer);
+                final InputStream dataIn = compress ? new CompressionInputStream(dis) : dis;
+                final DataPacket packet = codec.decode(new CheckedInputStream(dataIn, crc));
+                
+                if ( packet == null ) {
+                    this.dataAvailable = false;
+                } else {
+                	transfers++;
+                	contentBytes += packet.getSize();
+                }
+                
+                this.state = TransactionState.DATA_EXCHANGED;
+                return packet;
+	        } catch (final IOException ioe) {
+	            throw new IOException("Failed to receive data from " + peer + " due to " + ioe, ioe);
+	        }
 	    } catch (final Exception e) {
 	        error();
 	        throw e;
@@ -164,35 +172,40 @@ public class SocketClientTransaction implements Transaction {
 	
 	
 	@Override
-	public void send(DataPacket dataPacket) throws IOException {
+	public void send(final DataPacket dataPacket) throws IOException {
 	    try {
-    		if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
-    			throw new IllegalStateException("Cannot send data because Transaction State is " + state);
-    		}
-    
-            if ( direction == TransferDirection.RECEIVE ) {
-                throw new IllegalStateException("Attempting to send data but started a RECEIVE Transaction");
-            }
-    
-    		if ( transfers > 0 ) {
-                ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
-            }
-    
-            logger.debug("{} Sending data to {}", this, peer);
-    
-            final OutputStream dataOut = compress ? new CompressionOutputStream(dos) : dos;
-    		final OutputStream out = new CheckedOutputStream(dataOut, crc);
-            codec.encode(dataPacket, out);
-            
-            // need to close the CompressionOutputStream in order to force it write out any remaining bytes.
-            // Otherwise, do NOT close it because we don't want to close the underlying stream
-            // (CompressionOutputStream will not close the underlying stream when it's closed)
-            if ( compress ) {
-            	out.close();
-            }
-            
-            transfers++;
-            this.state = TransactionState.DATA_EXCHANGED;
+	        try {
+        		if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
+        			throw new IllegalStateException("Cannot send data because Transaction State is " + state);
+        		}
+        
+                if ( direction == TransferDirection.RECEIVE ) {
+                    throw new IllegalStateException("Attempting to send data but started a RECEIVE Transaction");
+                }
+        
+        		if ( transfers > 0 ) {
+                    ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
+                }
+        
+                logger.debug("{} Sending data to {}", this, peer);
+        
+                final OutputStream dataOut = compress ? new CompressionOutputStream(dos) : dos;
+        		final OutputStream out = new CheckedOutputStream(dataOut, crc);
+                codec.encode(dataPacket, out);
+                
+                // need to close the CompressionOutputStream in order to force it write out any remaining bytes.
+                // Otherwise, do NOT close it because we don't want to close the underlying stream
+                // (CompressionOutputStream will not close the underlying stream when it's closed)
+                if ( compress ) {
+                	out.close();
+                }
+                
+                transfers++;
+                contentBytes += dataPacket.getSize();
+                this.state = TransactionState.DATA_EXCHANGED;
+	        } catch (final IOException ioe) {
+	            throw new IOException("Failed to send data to " + peer + " due to " + ioe, ioe);
+	        }
 	    } catch (final Exception e) {
 	        error();
 	        throw e;
@@ -211,59 +224,56 @@ public class SocketClientTransaction implements Transaction {
 		    state = TransactionState.TRANSACTION_CANCELED;
 		} catch (final IOException ioe) {
 		    error();
-		    throw ioe;
+		    throw new IOException("Failed to send 'cancel transaction' message to " + peer + " due to " + ioe, ioe);
 		}
 	}
 	
-	@Override
-	public void complete() throws IOException {
-	    complete(false);
-	}
 	
 	@Override
-	public void complete(boolean requestBackoff) throws IOException {
+	public TransactionCompletion complete() throws IOException {
 	    try {
-    		if ( state != TransactionState.TRANSACTION_CONFIRMED ) {
-    			throw new IllegalStateException("Cannot complete transaction because state is " + state + 
-    					"; Transaction can only be completed when state is " + TransactionState.TRANSACTION_CONFIRMED);
-    		}
-    		
-    		if ( direction == TransferDirection.RECEIVE ) {
-    		    if ( transfers == 0 ) {
-    		        state = TransactionState.TRANSACTION_COMPLETED;
-    		        return;
-    		    }
-    		    
-                if ( requestBackoff ) {
-                    // Confirm that we received the data and the peer can now discard it but that the peer should not
-                    // send any more data for a bit
-                    logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
-                    ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos);
-                } else {
+	        try {
+        		if ( state != TransactionState.TRANSACTION_CONFIRMED ) {
+        			throw new IllegalStateException("Cannot complete transaction because state is " + state + 
+        					"; Transaction can only be completed when state is " + TransactionState.TRANSACTION_CONFIRMED);
+        		}
+        		
+        		boolean backoff = false;
+        		if ( direction == TransferDirection.RECEIVE ) {
+        		    if ( transfers == 0 ) {
+        		        state = TransactionState.TRANSACTION_COMPLETED;
+        		        return new SocketClientTransactionCompletion(false, 0, 0L, System.nanoTime() - creationNanoTime);
+        		    }
+        		    
                     // Confirm that we received the data and the peer can now discard it
                     logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
                     ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
+                    
+                    state = TransactionState.TRANSACTION_COMPLETED;
+                } else {
+                    final Response transactionResponse;
+                    try {
+                        transactionResponse = Response.read(dis);
+                    } catch (final IOException e) {
+                        throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. " +
+                                "It is unknown whether or not the peer successfully received/processed the data.", e);
+                    }
+                    
+                    logger.debug("{} Received {} from {}", this, transactionResponse, peer);
+                    if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
+                        peer.penalize(destinationId, penaltyMillis);
+                        backoff = true;
+                    } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
+                        throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
+                    }
+                    
+                    state = TransactionState.TRANSACTION_COMPLETED;
                 }
-                
-                state = TransactionState.TRANSACTION_COMPLETED;
-            } else {
-                final Response transactionResponse;
-                try {
-                    transactionResponse = Response.read(dis);
-                } catch (final IOException e) {
-                    throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. " +
-                            "It is unknown whether or not the peer successfully received/processed the data.", e);
-                }
-                
-                logger.debug("{} Received {} from {}", this, transactionResponse, peer);
-                if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
-                    peer.penalize(destinationId, penaltyMillis);
-                } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
-                    throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
-                }
-                
-                state = TransactionState.TRANSACTION_COMPLETED;
-            }
+        		
+        		return new SocketClientTransactionCompletion(backoff, transfers, contentBytes, System.nanoTime() - creationNanoTime);
+	        } catch (final IOException ioe) {
+	            throw new IOException("Failed to complete transaction with " + peer + " due to " + ioe, ioe);
+	        }
 	    } catch (final Exception e) {
 	        error();
 	        throw e;
@@ -274,81 +284,85 @@ public class SocketClientTransaction implements Transaction {
 	@Override
 	public void confirm() throws IOException {
 	    try {
-	        if ( state == TransactionState.TRANSACTION_STARTED && !dataAvailable && direction == TransferDirection.RECEIVE ) {
-	            // client requested to receive data but no data available. no need to confirm.
-	            state = TransactionState.TRANSACTION_CONFIRMED;
-	            return;
-	        }
-	        
-    		if ( state != TransactionState.DATA_EXCHANGED ) {
-    			throw new IllegalStateException("Cannot confirm Transaction because state is " + state + 
-    					"; Transaction can only be confirmed when state is " + TransactionState.DATA_EXCHANGED );
-    		}
-    
-            if ( direction == TransferDirection.RECEIVE ) {
-                if ( dataAvailable ) {
-                    throw new IllegalStateException("Cannot complete transaction because the sender has already sent more data than client has consumed.");
-                }
-                
-                // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
-                // to peer so that we can verify that the connection is still open. This is a two-phase commit,
-                // which helps to prevent the chances of data duplication. Without doing this, we may commit the
-                // session and then when we send the response back to the peer, the peer may have timed out and may not
-                // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
-                // Critical Section involved in this transaction so that rather than the Critical Section being the
-                // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
-                logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
-                final String calculatedCRC = String.valueOf(crc.getValue());
-                ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC);
-                
-                final Response confirmTransactionResponse;
-                try {
-                    confirmTransactionResponse = Response.read(dis);
-                } catch (final IOException ioe) {
-                    logger.error("Failed to receive response code from {} when expected confirmation of transaction", peer);
-                    throw ioe;
-                }
-                
-                logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer);
-                
-                switch (confirmTransactionResponse.getCode()) {
-                    case CONFIRM_TRANSACTION:
-                        break;
-                    case BAD_CHECKSUM:
-                        throw new IOException(this + " Received a BadChecksum response from peer " + peer);
-                    default:
-                        throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
-                }
-                
-                state = TransactionState.TRANSACTION_CONFIRMED;
-            } else {
-                logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer);
-                ResponseCode.FINISH_TRANSACTION.writeResponse(dos);
-                
-                final String calculatedCRC = String.valueOf(crc.getValue());
-                
-                // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
-                final Response transactionConfirmationResponse = Response.read(dis);
-                if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) {
-                    // Confirm checksum and echo back the confirmation.
-                    logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer);
-                    final String receivedCRC = transactionConfirmationResponse.getMessage();
+	        try {
+    	        if ( state == TransactionState.TRANSACTION_STARTED && !dataAvailable && direction == TransferDirection.RECEIVE ) {
+    	            // client requested to receive data but no data available. no need to confirm.
+    	            state = TransactionState.TRANSACTION_CONFIRMED;
+    	            return;
+    	        }
+    	        
+        		if ( state != TransactionState.DATA_EXCHANGED ) {
+        			throw new IllegalStateException("Cannot confirm Transaction because state is " + state + 
+        					"; Transaction can only be confirmed when state is " + TransactionState.DATA_EXCHANGED );
+        		}
+        
+                if ( direction == TransferDirection.RECEIVE ) {
+                    if ( dataAvailable ) {
+                        throw new IllegalStateException("Cannot complete transaction because the sender has already sent more data than client has consumed.");
+                    }
                     
-                    // CRC was not used before version 4
-                    if ( protocolVersion > 3 ) {
-                        if ( !receivedCRC.equals(calculatedCRC) ) {
-                            ResponseCode.BAD_CHECKSUM.writeResponse(dos);
-                            throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
-                        }
+                    // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
+                    // to peer so that we can verify that the connection is still open. This is a two-phase commit,
+                    // which helps to prevent the chances of data duplication. Without doing this, we may commit the
+                    // session and then when we send the response back to the peer, the peer may have timed out and may not
+                    // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
+                    // Critical Section involved in this transaction so that rather than the Critical Section being the
+                    // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
+                    logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
+                    final String calculatedCRC = String.valueOf(crc.getValue());
+                    ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC);
+                    
+                    final Response confirmTransactionResponse;
+                    try {
+                        confirmTransactionResponse = Response.read(dis);
+                    } catch (final IOException ioe) {
+                        logger.error("Failed to receive response code from {} when expected confirmation of transaction", peer);
+                        throw ioe;
                     }
                     
-                    ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");
+                    logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer);
+                    
+                    switch (confirmTransactionResponse.getCode()) {
+                        case CONFIRM_TRANSACTION:
+                            break;
+                        case BAD_CHECKSUM:
+                            throw new IOException(this + " Received a BadChecksum response from peer " + peer);
+                        default:
+                            throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
+                    }
+                    
+                    state = TransactionState.TRANSACTION_CONFIRMED;
                 } else {
-                    throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse);
+                    logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer);
+                    ResponseCode.FINISH_TRANSACTION.writeResponse(dos);
+                    
+                    final String calculatedCRC = String.valueOf(crc.getValue());
+                    
+                    // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
+                    final Response transactionConfirmationResponse = Response.read(dis);
+                    if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) {
+                        // Confirm checksum and echo back the confirmation.
+                        logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer);
+                        final String receivedCRC = transactionConfirmationResponse.getMessage();
+                        
+                        // CRC was not used before version 4
+                        if ( protocolVersion > 3 ) {
+                            if ( !receivedCRC.equals(calculatedCRC) ) {
+                                ResponseCode.BAD_CHECKSUM.writeResponse(dos);
+                                throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
+                            }
+                        }
+                        
+                        ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");
+                    } else {
+                        throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse);
+                    }
+                    
+                    state = TransactionState.TRANSACTION_CONFIRMED;
                 }
-                
-                state = TransactionState.TRANSACTION_CONFIRMED;
-            }
+	        } catch (final IOException ioe) {
+	            throw new IOException("Failed to confirm transaction with " + peer + " due to " + ioe, ioe);
+	        }
 	    } catch (final Exception e) {
 	        error();
 	        throw e;
@@ -365,4 +379,13 @@ public class SocketClientTransaction implements Transaction {
 		return state;
 	}
 
+	@Override
+	public Communicant getCommunicant() {
+	    return peer;
+	}
+	
+    @Override
+    public String toString() {
+        return "SocketClientTransaction[Url=" + peer.getUrl() + ", TransferDirection=" + direction + ", State=" + state + "]"; 
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java
new file mode 100644
index 0000000..5eb6c91
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.remote.TransactionCompletion;
+
+public class SocketClientTransactionCompletion implements TransactionCompletion {
+
+    private final boolean backoff;
+    private final int dataPacketsTransferred;
+    private final long bytesTransferred;
+    private final long durationNanos;
+    
+    public SocketClientTransactionCompletion(final boolean backoff, final int dataPacketsTransferred, final long bytesTransferred, final long durationNanos) {
+        this.backoff = backoff;
+        this.dataPacketsTransferred = dataPacketsTransferred;
+        this.bytesTransferred = bytesTransferred;
+        this.durationNanos = durationNanos;
+    }
+
+    @Override
+    public boolean isBackoff() {
+        return backoff;
+    }
+
+    @Override
+    public int getDataPacketsTransferred() {
+        return dataPacketsTransferred;
+    }
+
+    @Override
+    public long getBytesTransferred() {
+        return bytesTransferred;
+    }
+
+    @Override
+    public long getDuration(final TimeUnit timeUnit) {
+        return timeUnit.convert(durationNanos, TimeUnit.NANOSECONDS);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
index a744905..2fd90f8 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
@@ -35,13 +36,13 @@ import org.junit.Test;
 public class TestSiteToSiteClient {
 
     @Test
-    @Ignore("For local testing only; not really a unit test but a manual test")
+    //@Ignore("For local testing only; not really a unit test but a manual test")
     public void testReceive() throws IOException {
         System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
         
         final SiteToSiteClient client = new SiteToSiteClient.Builder()
             .url("http://localhost:8080/nifi")
-            .portName("out")
+            .portName("cba")
             .requestBatchCount(1)
             .build();
         
@@ -62,7 +63,7 @@ public class TestSiteToSiteClient {
             Assert.assertNull(transaction.receive());
             
             transaction.confirm();
-            transaction.complete(false);
+            transaction.complete();
         } finally {
             client.close();
         }
@@ -70,13 +71,14 @@ public class TestSiteToSiteClient {
     
     
     @Test
-    @Ignore("For local testing only; not really a unit test but a manual test")
+    //@Ignore("For local testing only; not really a unit test but a manual test")
     public void testSend() throws IOException {
         System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
         
         final SiteToSiteClient client = new SiteToSiteClient.Builder()
-            .url("http://localhost:8080/nifi")
-            .portName("in")
+            .url("http://10.0.64.63:8080/nifi")
+            .portName("input")
+            .nodePenalizationPeriod(10, TimeUnit.MILLISECONDS)
             .build();
         
         try {
@@ -91,7 +93,7 @@ public class TestSiteToSiteClient {
             transaction.send(packet);
 
             transaction.confirm();
-            transaction.complete(false);
+            transaction.complete();
         } finally {
             client.close();
         }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index 3295956..8a4839b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -22,6 +22,7 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
@@ -130,7 +131,9 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
                     final Thread thread = new Thread(new Runnable() {
                         @Override
                         public void run() {
-                            String hostname = socket.getInetAddress().getHostName();
+                            LOG.debug("{} Determining URL of connection", this);
+                            final InetAddress inetAddress = socket.getInetAddress();
+                            String hostname = inetAddress.getHostName();
                             final int slashIndex = hostname.indexOf("/");
                             if ( slashIndex == 0 ) {
                                 hostname = hostname.substring(1);
@@ -140,6 +143,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
 
                             final int port = socket.getPort();
                             final String peerUri = "nifi://" + hostname + ":" + port;
+                            LOG.debug("{} Connection URL is {}", this, peerUri);
                             
                             final CommunicationsSession commsSession;
                             final String dn;
@@ -154,6 +158,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
                                     dn = sslSocketChannel.getDn();
                                     commsSession.setUserDn(dn);
                                 } else {
+                                    LOG.trace("{} Channel is not secure", this);
                                     commsSession = new SocketChannelCommunicationsSession(socketChannel, peerUri);
                                     dn = null;
                                 }
@@ -306,6 +311,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
                         }
                     });
                     thread.setName("Site-to-Site Worker Thread-" + (threadCount++));
+                    LOG.debug("Handing connection to {}", thread);
                     thread.start();
                 }
             }