You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/04/22 05:39:48 UTC

[4/7] incubator-nifi git commit: NIFI-271 checkpoint

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
index 3f0ec4f..e7b6d06 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
@@ -19,27 +19,30 @@ package org.apache.nifi.remote.protocol;
 import java.io.InputStream;
 import java.util.Map;
 
-
 /**
- * Represents a piece of data that is to be sent to or that was received from a NiFi instance.
+ * Represents a piece of data that is to be sent to or that was received from a
+ * NiFi instance.
  */
 public interface DataPacket {
 
     /**
      * The key-value attributes that are to be associated with the data
-     * @return
+     *
+     * @return all attributes
+     */
+    Map<String, String> getAttributes();
+
+    /**
+     * An InputStream from which the content can be read
+     *
+     * @return input stream to the data
      */
-	Map<String, String> getAttributes();
-	
-	/**
-	 * An InputStream from which the content can be read
-	 * @return
-	 */
-	InputStream getData();
+    InputStream getData();
 
-	/**
-	 * The length of the InputStream.
-	 * @return
-	 */
-	long getSize();
+    /**
+     * The length of the InputStream.
+     *
+     * @return length of the inputstream.
+     */
+    long getSize();
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
index 41dc276..016690c 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
@@ -16,46 +16,44 @@
  */
 package org.apache.nifi.remote.protocol.socket;
 
-
 /**
- * Enumeration of Properties that can be used for the Site-to-Site Socket Protocol.
+ * Enumeration of Properties that can be used for the Site-to-Site Socket
+ * Protocol.
  */
 public enum HandshakeProperty {
+
     /**
-     * Boolean value indicating whether or not the contents of a FlowFile should be
-     * GZipped when transferred.
+     * Boolean value indicating whether or not the contents of a FlowFile should
+     * be GZipped when transferred.
      */
     GZIP,
-    
     /**
      * The unique identifier of the port to communicate with
      */
     PORT_IDENTIFIER,
-    
     /**
-     * Indicates the number of milliseconds after the request was made that the client
-     * will wait for a response. If no response has been received by the time this value
-     * expires, the server can move on without attempting to service the request because
-     * the client will have already disconnected.
+     * Indicates the number of milliseconds after the request was made that the
+     * client will wait for a response. If no response has been received by the
+     * time this value expires, the server can move on without attempting to
+     * service the request because the client will have already disconnected.
      */
     REQUEST_EXPIRATION_MILLIS,
-    
     /**
-     * The preferred number of FlowFiles that the server should send to the client
-     * when pulling data. This property was introduced in version 5 of the protocol.
+     * The preferred number of FlowFiles that the server should send to the
+     * client when pulling data. This property was introduced in version 5 of
+     * the protocol.
      */
     BATCH_COUNT,
-    
     /**
-     * The preferred number of bytes that the server should send to the client when
-     * pulling data. This property was introduced in version 5 of the protocol.
+     * The preferred number of bytes that the server should send to the client
+     * when pulling data. This property was introduced in version 5 of the
+     * protocol.
      */
     BATCH_SIZE,
-    
     /**
-     * The preferred amount of time that the server should send data to the client
-     * when pulling data. This property was introduced in version 5 of the protocol.
-     * Value is in milliseconds.
+     * The preferred amount of time that the server should send data to the
+     * client when pulling data. This property was introduced in version 5 of
+     * the protocol. Value is in milliseconds.
      */
     BATCH_DURATION;
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
index eae1940..6ad2ba0 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
@@ -22,28 +22,29 @@ import java.io.IOException;
 import org.apache.nifi.remote.exception.ProtocolException;
 
 public class Response {
+
     private final ResponseCode code;
     private final String message;
-    
+
     private Response(final ResponseCode code, final String explanation) {
         this.code = code;
         this.message = explanation;
     }
-    
+
     public ResponseCode getCode() {
         return code;
     }
-    
+
     public String getMessage() {
         return message;
     }
-    
+
     public static Response read(final DataInputStream in) throws IOException, ProtocolException {
         final ResponseCode code = ResponseCode.readCode(in);
         final String message = code.containsMessage() ? in.readUTF() : null;
         return new Response(code, message);
     }
-    
+
     @Override
     public String toString() {
         return code + ": " + message;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
index 8860e73..0e1359e 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
@@ -23,131 +23,126 @@ import java.io.InputStream;
 
 import org.apache.nifi.remote.exception.ProtocolException;
 
-
 public enum ResponseCode {
+
     RESERVED(0, "Reserved for Future Use", false), // This will likely be used if we ever need to expand the length of
-                                            // ResponseCode, so that we can indicate a 0 followed by some other bytes
-    
+    // ResponseCode, so that we can indicate a 0 followed by some other bytes
+
     // handshaking properties
     PROPERTIES_OK(1, "Properties OK", false),
     UNKNOWN_PROPERTY_NAME(230, "Unknown Property Name", true),
     ILLEGAL_PROPERTY_VALUE(231, "Illegal Property Value", true),
     MISSING_PROPERTY(232, "Missing Property", true),
-    
     // transaction indicators
     CONTINUE_TRANSACTION(10, "Continue Transaction", false),
     FINISH_TRANSACTION(11, "Finish Transaction", false),
-    CONFIRM_TRANSACTION(12, "Confirm Transaction", true),   // "Explanation" of this code is the checksum
+    CONFIRM_TRANSACTION(12, "Confirm Transaction", true), // "Explanation" of this code is the checksum
     TRANSACTION_FINISHED(13, "Transaction Finished", false),
     TRANSACTION_FINISHED_BUT_DESTINATION_FULL(14, "Transaction Finished But Destination is Full", false),
     CANCEL_TRANSACTION(15, "Cancel Transaction", true),
     BAD_CHECKSUM(19, "Bad Checksum", false),
-
     // data availability indicators
     MORE_DATA(20, "More Data Exists", false),
     NO_MORE_DATA(21, "No More Data Exists", false),
-    
     // port state indicators
     UNKNOWN_PORT(200, "Unknown Port", false),
     PORT_NOT_IN_VALID_STATE(201, "Port Not in a Valid State", true),
     PORTS_DESTINATION_FULL(202, "Port's Destination is Full", false),
-    
     // authorization
     UNAUTHORIZED(240, "User Not Authorized", true),
-    
     // error indicators
     ABORT(250, "Abort", true),
     UNRECOGNIZED_RESPONSE_CODE(254, "Unrecognized Response Code", false),
     END_OF_STREAM(255, "End of Stream", false);
-    
+
     private static final ResponseCode[] codeArray = new ResponseCode[256];
-    
+
     static {
-        for ( final ResponseCode responseCode : ResponseCode.values() ) {
+        for (final ResponseCode responseCode : ResponseCode.values()) {
             codeArray[responseCode.getCode()] = responseCode;
         }
     }
-    
+
     private static final byte CODE_SEQUENCE_VALUE_1 = (byte) 'R';
     private static final byte CODE_SEQUENCE_VALUE_2 = (byte) 'C';
     private final int code;
     private final byte[] codeSequence;
     private final String description;
     private final boolean containsMessage;
-    
+
     private ResponseCode(final int code, final String description, final boolean containsMessage) {
-        this.codeSequence = new byte[] {CODE_SEQUENCE_VALUE_1, CODE_SEQUENCE_VALUE_2, (byte) code};
+        this.codeSequence = new byte[]{CODE_SEQUENCE_VALUE_1, CODE_SEQUENCE_VALUE_2, (byte) code};
         this.code = code;
         this.description = description;
         this.containsMessage = containsMessage;
     }
-    
+
     public int getCode() {
         return code;
     }
-    
+
     public byte[] getCodeSequence() {
         return codeSequence;
     }
-    
+
     @Override
     public String toString() {
         return description;
     }
-    
+
     public boolean containsMessage() {
         return containsMessage;
     }
-    
+
     public void writeResponse(final DataOutputStream out) throws IOException {
-        if ( containsMessage() ) {
+        if (containsMessage()) {
             throw new IllegalArgumentException("ResponseCode " + code + " expects an explanation");
         }
-        
+
         out.write(getCodeSequence());
         out.flush();
     }
-    
+
     public void writeResponse(final DataOutputStream out, final String explanation) throws IOException {
-        if ( !containsMessage() ) {
+        if (!containsMessage()) {
             throw new IllegalArgumentException("ResponseCode " + code + " does not expect an explanation");
         }
-        
+
         out.write(getCodeSequence());
         out.writeUTF(explanation);
         out.flush();
     }
-    
+
     static ResponseCode readCode(final InputStream in) throws IOException, ProtocolException {
         final int byte1 = in.read();
-        if ( byte1 < 0 ) {
+        if (byte1 < 0) {
             throw new EOFException();
-        } else if ( byte1 != CODE_SEQUENCE_VALUE_1 ) {
+        } else if (byte1 != CODE_SEQUENCE_VALUE_1) {
             throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode");
         }
-        
+
         final int byte2 = in.read();
-        if ( byte2 < 0 ) {
+        if (byte2 < 0) {
             throw new EOFException();
-        } else if ( byte2 != CODE_SEQUENCE_VALUE_2 ) {
+        } else if (byte2 != CODE_SEQUENCE_VALUE_2) {
             throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode");
         }
 
         final int byte3 = in.read();
-        if ( byte3 < 0 ) {
+        if (byte3 < 0) {
             throw new EOFException();
         }
-        
+
         final ResponseCode responseCode = codeArray[byte3];
         if (responseCode == null) {
             throw new ProtocolException("Received Response Code of " + byte3 + " but do not recognize this code");
         }
         return responseCode;
     }
-    
+
     public static ResponseCode fromSequence(final byte[] value) {
         final int code = value[3] & 0xFF;
         final ResponseCode responseCode = codeArray[code];
         return (responseCode == null) ? UNRECOGNIZED_RESPONSE_CODE : responseCode;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index 83c5305..de845ee 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -58,120 +58,121 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SocketClientProtocol implements ClientProtocol {
+
     private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1);
 
     private RemoteDestination destination;
     private boolean useCompression = false;
-    
+
     private String commsIdentifier;
     private boolean handshakeComplete = false;
-    
+
     private final Logger logger = LoggerFactory.getLogger(SocketClientProtocol.class);
-    
+
     private Response handshakeResponse = null;
     private boolean readyForFileTransfer = false;
     private String transitUriPrefix = null;
     private int timeoutMillis = 30000;
-    
+
     private int batchCount;
     private long batchSize;
     private long batchMillis;
     private EventReporter eventReporter;
 
     private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
-    
+
     public SocketClientProtocol() {
     }
 
     public void setPreferredBatchCount(final int count) {
         this.batchCount = count;
     }
-    
+
     public void setPreferredBatchSize(final long bytes) {
         this.batchSize = bytes;
     }
-    
+
     public void setPreferredBatchDuration(final long millis) {
         this.batchMillis = millis;
     }
-    
+
     public void setEventReporter(final EventReporter eventReporter) {
-    	this.eventReporter = eventReporter;
+        this.eventReporter = eventReporter;
     }
-    
+
     public void setDestination(final RemoteDestination destination) {
         this.destination = destination;
         this.useCompression = destination.isUseCompression();
     }
-    
+
     public void setTimeout(final int timeoutMillis) {
-    	this.timeoutMillis = timeoutMillis;
+        this.timeoutMillis = timeoutMillis;
     }
-    
+
     @Override
     public void handshake(final Peer peer) throws IOException, HandshakeException {
-    	handshake(peer, destination.getIdentifier());
+        handshake(peer, destination.getIdentifier());
     }
-    
+
     public void handshake(final Peer peer, final String destinationId) throws IOException, HandshakeException {
-        if ( handshakeComplete ) {
+        if (handshakeComplete) {
             throw new IllegalStateException("Handshake has already been completed");
         }
         commsIdentifier = UUID.randomUUID().toString();
         logger.debug("{} handshaking with {}", this, peer);
-        
+
         final Map<HandshakeProperty, String> properties = new HashMap<>();
         properties.put(HandshakeProperty.GZIP, String.valueOf(useCompression));
-        
-        if ( destinationId != null ) {
-        	properties.put(HandshakeProperty.PORT_IDENTIFIER, destinationId);
+
+        if (destinationId != null) {
+            properties.put(HandshakeProperty.PORT_IDENTIFIER, destinationId);
         }
-        
-        properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf(timeoutMillis) );
-        
-        if ( versionNegotiator.getVersion() >= 5 ) {
-            if ( batchCount > 0 ) {
+
+        properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf(timeoutMillis));
+
+        if (versionNegotiator.getVersion() >= 5) {
+            if (batchCount > 0) {
                 properties.put(HandshakeProperty.BATCH_COUNT, String.valueOf(batchCount));
             }
-            if ( batchSize > 0L ) {
+            if (batchSize > 0L) {
                 properties.put(HandshakeProperty.BATCH_SIZE, String.valueOf(batchSize));
             }
-            if ( batchMillis > 0L ) {
+            if (batchMillis > 0L) {
                 properties.put(HandshakeProperty.BATCH_DURATION, String.valueOf(batchMillis));
             }
         }
-        
+
         final CommunicationsSession commsSession = peer.getCommunicationsSession();
         commsSession.setTimeout(timeoutMillis);
         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
-        
+
         dos.writeUTF(commsIdentifier);
-        
-        if ( versionNegotiator.getVersion() >= 3 ) {
+
+        if (versionNegotiator.getVersion() >= 3) {
             dos.writeUTF(peer.getUrl());
             transitUriPrefix = peer.getUrl();
-            
-            if ( !transitUriPrefix.endsWith("/") ) {
+
+            if (!transitUriPrefix.endsWith("/")) {
                 transitUriPrefix = transitUriPrefix + "/";
             }
         }
-        
+
         logger.debug("Handshaking with properties {}", properties);
         dos.writeInt(properties.size());
-        for ( final Map.Entry<HandshakeProperty, String> entry : properties.entrySet() ) {
+        for (final Map.Entry<HandshakeProperty, String> entry : properties.entrySet()) {
             dos.writeUTF(entry.getKey().name());
             dos.writeUTF(entry.getValue());
         }
-        
+
         dos.flush();
-        
+
         try {
             handshakeResponse = Response.read(dis);
         } catch (final ProtocolException e) {
             throw new HandshakeException(e);
         }
-        
+
         switch (handshakeResponse.getCode()) {
             case PORT_NOT_IN_VALID_STATE:
             case UNKNOWN_PORT:
@@ -181,71 +182,75 @@ public class SocketClientProtocol implements ClientProtocol {
                 readyForFileTransfer = true;
                 break;
             default:
-                logger.error("{} received unexpected response {} from {} when negotiating Codec", new Object[] {
+                logger.error("{} received unexpected response {} from {} when negotiating Codec", new Object[]{
                     this, handshakeResponse, peer});
                 peer.close();
                 throw new HandshakeException("Received unexpected response " + handshakeResponse);
         }
-        
+
         logger.debug("{} Finished handshake with {}", this, peer);
         handshakeComplete = true;
     }
-    
+
+    @Override
     public boolean isReadyForFileTransfer() {
         return readyForFileTransfer;
     }
-    
+
+    @Override
     public boolean isPortInvalid() {
-        if ( !handshakeComplete ) {
+        if (!handshakeComplete) {
             throw new IllegalStateException("Handshake has not completed successfully");
         }
         return handshakeResponse.getCode() == ResponseCode.PORT_NOT_IN_VALID_STATE;
     }
-    
+
+    @Override
     public boolean isPortUnknown() {
-        if ( !handshakeComplete ) {
+        if (!handshakeComplete) {
             throw new IllegalStateException("Handshake has not completed successfully");
         }
         return handshakeResponse.getCode() == ResponseCode.UNKNOWN_PORT;
     }
-    
+
+    @Override
     public boolean isDestinationFull() {
-        if ( !handshakeComplete ) {
+        if (!handshakeComplete) {
             throw new IllegalStateException("Handshake has not completed successfully");
         }
         return handshakeResponse.getCode() == ResponseCode.PORTS_DESTINATION_FULL;
     }
-    
+
     @Override
     public Set<PeerStatus> getPeerStatuses(final Peer peer) throws IOException {
-        if ( !handshakeComplete ) {
+        if (!handshakeComplete) {
             throw new IllegalStateException("Handshake has not been performed");
         }
-        
+
         logger.debug("{} Get Peer Statuses from {}", this, peer);
         final CommunicationsSession commsSession = peer.getCommunicationsSession();
         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
-        
+
         RequestType.REQUEST_PEER_LIST.writeRequestType(dos);
         dos.flush();
         final int numPeers = dis.readInt();
         final Set<PeerStatus> peers = new HashSet<>(numPeers);
-        for (int i=0; i < numPeers; i++) {
+        for (int i = 0; i < numPeers; i++) {
             final String hostname = dis.readUTF();
             final int port = dis.readInt();
             final boolean secure = dis.readBoolean();
             final int flowFileCount = dis.readInt();
             peers.add(new PeerStatus(new PeerDescription(hostname, port, secure), flowFileCount));
         }
-        
+
         logger.debug("{} Received {} Peer Statuses from {}", this, peers.size(), peer);
         return peers;
     }
-    
+
     @Override
     public FlowFileCodec negotiateCodec(final Peer peer) throws IOException, ProtocolException {
-        if ( !handshakeComplete ) {
+        if (!handshakeComplete) {
             throw new IllegalStateException("Handshake has not been performed");
         }
 
@@ -255,177 +260,174 @@ public class SocketClientProtocol implements ClientProtocol {
         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
 
         RequestType.NEGOTIATE_FLOWFILE_CODEC.writeRequestType(dos);
-        
+
         FlowFileCodec codec = new StandardFlowFileCodec();
         try {
             codec = (FlowFileCodec) RemoteResourceInitiator.initiateResourceNegotiation(codec, dis, dos);
         } catch (HandshakeException e) {
             throw new ProtocolException(e.toString());
         }
-        logger.debug("{} negotiated FlowFileCodec {} with {}", new Object[] {this, codec, commsSession});
+        logger.debug("{} negotiated FlowFileCodec {} with {}", new Object[]{this, codec, commsSession});
 
         return codec;
     }
 
-
     @Override
     public Transaction startTransaction(final Peer peer, final FlowFileCodec codec, final TransferDirection direction) throws IOException, ProtocolException {
-        if ( !handshakeComplete ) {
+        if (!handshakeComplete) {
             throw new IllegalStateException("Handshake has not been performed");
         }
-        if ( !readyForFileTransfer ) {
+        if (!readyForFileTransfer) {
             throw new IllegalStateException("Cannot start transaction; handshake resolution was " + handshakeResponse);
         }
-        
-        return new SocketClientTransaction(versionNegotiator.getVersion(), destination.getIdentifier(), peer, codec, 
-        		direction, useCompression, (int) destination.getYieldPeriod(TimeUnit.MILLISECONDS), eventReporter);
-    }
 
+        return new SocketClientTransaction(versionNegotiator.getVersion(), destination.getIdentifier(), peer, codec,
+                direction, useCompression, (int) destination.getYieldPeriod(TimeUnit.MILLISECONDS), eventReporter);
+    }
 
     @Override
     public int receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
-    	final String userDn = peer.getCommunicationsSession().getUserDn();
-    	final Transaction transaction = startTransaction(peer, codec, TransferDirection.RECEIVE);
-    	
-    	final StopWatch stopWatch = new StopWatch(true);
-    	final Set<FlowFile> flowFilesReceived = new HashSet<>();
-    	long bytesReceived = 0L;
-    	
-    	while (true) {
-    		final long start = System.nanoTime();
-    		final DataPacket dataPacket = transaction.receive();
-    		if ( dataPacket == null ) {
-    		    if ( flowFilesReceived.isEmpty() ) {
-    		        peer.penalize(destination.getIdentifier(), destination.getYieldPeriod(TimeUnit.MILLISECONDS));
-    		    }
-    			break;
-    		}
-    		
-    		FlowFile flowFile = session.create();
-    		flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
-    		flowFile = session.importFrom(dataPacket.getData(), flowFile);
-    		final long receiveNanos = System.nanoTime() - start;
-    		
-			String sourceFlowFileIdentifier = dataPacket.getAttributes().get(CoreAttributes.UUID.key());
-			if ( sourceFlowFileIdentifier == null ) {
-				sourceFlowFileIdentifier = "<Unknown Identifier>";
-			}
-			
-			final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier;
-			session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, TimeUnit.NANOSECONDS.toMillis(receiveNanos));
-
-    		session.transfer(flowFile, Relationship.ANONYMOUS);
-    		bytesReceived += dataPacket.getSize();
-    	}
-
-    	// Confirm that what we received was the correct data.
-    	transaction.confirm();
-    	
-		// Commit the session so that we have persisted the data
-		session.commit();
-
-		transaction.complete();
-		logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
-
-		if ( !flowFilesReceived.isEmpty() ) {
-    		stopWatch.stop();
-    		final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
-    		final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
-    		final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
-    		final String dataSize = FormatUtils.formatDataSize(bytesReceived);
-    		logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] { 
-    				this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate });
-		}
-		
-		return flowFilesReceived.size();
+        final String userDn = peer.getCommunicationsSession().getUserDn();
+        final Transaction transaction = startTransaction(peer, codec, TransferDirection.RECEIVE);
+
+        final StopWatch stopWatch = new StopWatch(true);
+        final Set<FlowFile> flowFilesReceived = new HashSet<>();
+        long bytesReceived = 0L;
+
+        while (true) {
+            final long start = System.nanoTime();
+            final DataPacket dataPacket = transaction.receive();
+            if (dataPacket == null) {
+                if (flowFilesReceived.isEmpty()) {
+                    peer.penalize(destination.getIdentifier(), destination.getYieldPeriod(TimeUnit.MILLISECONDS));
+                }
+                break;
+            }
+
+            FlowFile flowFile = session.create();
+            flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
+            flowFile = session.importFrom(dataPacket.getData(), flowFile);
+            final long receiveNanos = System.nanoTime() - start;
+
+            String sourceFlowFileIdentifier = dataPacket.getAttributes().get(CoreAttributes.UUID.key());
+            if (sourceFlowFileIdentifier == null) {
+                sourceFlowFileIdentifier = "<Unknown Identifier>";
+            }
+
+            final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier;
+            session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host="
+                    + peer.getHost() + ", Remote DN=" + userDn, TimeUnit.NANOSECONDS.toMillis(receiveNanos));
+
+            session.transfer(flowFile, Relationship.ANONYMOUS);
+            bytesReceived += dataPacket.getSize();
+        }
+
+        // Confirm that what we received was the correct data.
+        transaction.confirm();
+
+        // Commit the session so that we have persisted the data
+        session.commit();
+
+        transaction.complete();
+        logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
+
+        if (!flowFilesReceived.isEmpty()) {
+            stopWatch.stop();
+            final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
+            final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
+            final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+            final String dataSize = FormatUtils.formatDataSize(bytesReceived);
+            logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{
+                this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
+        }
+
+        return flowFilesReceived.size();
     }
 
-    
     @Override
     public int transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
-		FlowFile flowFile = session.get();
-		if (flowFile == null) {
-			return 0;
-		}
-
-		try {
-			final String userDn = peer.getCommunicationsSession().getUserDn();
-			final long startSendingNanos = System.nanoTime();
-			final StopWatch stopWatch = new StopWatch(true);
-			long bytesSent = 0L;
-			
-			final Transaction transaction = startTransaction(peer, codec, TransferDirection.SEND);
-			
-			final Set<FlowFile> flowFilesSent = new HashSet<>();
-	        boolean continueTransaction = true;
-	        while (continueTransaction) {
-	        	final long startNanos = System.nanoTime();
-	            // call codec.encode within a session callback so that we have the InputStream to read the FlowFile
-	            final FlowFile toWrap = flowFile;
-	            session.read(flowFile, new InputStreamCallback() {
-	                @Override
-	                public void process(final InputStream in) throws IOException {
-	                    final DataPacket dataPacket = new StandardDataPacket(toWrap.getAttributes(), in, toWrap.getSize());
-	                    transaction.send(dataPacket);
-	                }
-	            });
-	            
-	            final long transferNanos = System.nanoTime() - startNanos;
-	            final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
-	            
-	            flowFilesSent.add(flowFile);
-	            bytesSent += flowFile.getSize();
-	            logger.debug("{} Sent {} to {}", this, flowFile, peer);
-	            
-	            final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key());
-	            session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false);
-	            session.remove(flowFile);
-	            
-	            final long sendingNanos = System.nanoTime() - startSendingNanos;
-	            if ( sendingNanos < BATCH_SEND_NANOS ) { 
-	                flowFile = session.get();
-	            } else {
-	                flowFile = null;
-	            }
-	            
-	            continueTransaction = (flowFile != null);
-	        }
-	        
-	        transaction.confirm();
-	        
-	        // consume input stream entirely, ignoring its contents. If we
-	        // don't do this, the Connection will not be returned to the pool
-	        stopWatch.stop();
-	        final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
-	        final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
-	        final String dataSize = FormatUtils.formatDataSize(bytesSent);
-	        
-	        session.commit();
-	        transaction.complete();
-	        
-	        final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
-	        logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {
-	            this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
-	        
-	        return flowFilesSent.size();
-		} catch (final Exception e) {
-			session.rollback();
-			throw e;
-		}
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return 0;
+        }
+
+        try {
+            final String userDn = peer.getCommunicationsSession().getUserDn();
+            final long startSendingNanos = System.nanoTime();
+            final StopWatch stopWatch = new StopWatch(true);
+            long bytesSent = 0L;
+
+            final Transaction transaction = startTransaction(peer, codec, TransferDirection.SEND);
+
+            final Set<FlowFile> flowFilesSent = new HashSet<>();
+            boolean continueTransaction = true;
+            while (continueTransaction) {
+                final long startNanos = System.nanoTime();
+                // call codec.encode within a session callback so that we have the InputStream to read the FlowFile
+                final FlowFile toWrap = flowFile;
+                session.read(flowFile, new InputStreamCallback() {
+                    @Override
+                    public void process(final InputStream in) throws IOException {
+                        final DataPacket dataPacket = new StandardDataPacket(toWrap.getAttributes(), in, toWrap.getSize());
+                        transaction.send(dataPacket);
+                    }
+                });
+
+                final long transferNanos = System.nanoTime() - startNanos;
+                final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+
+                flowFilesSent.add(flowFile);
+                bytesSent += flowFile.getSize();
+                logger.debug("{} Sent {} to {}", this, flowFile, peer);
+
+                final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key());
+                session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false);
+                session.remove(flowFile);
+
+                final long sendingNanos = System.nanoTime() - startSendingNanos;
+                if (sendingNanos < BATCH_SEND_NANOS) {
+                    flowFile = session.get();
+                } else {
+                    flowFile = null;
+                }
+
+                continueTransaction = (flowFile != null);
+            }
+
+            transaction.confirm();
+
+            // consume input stream entirely, ignoring its contents. If we
+            // don't do this, the Connection will not be returned to the pool
+            stopWatch.stop();
+            final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
+            final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+            final String dataSize = FormatUtils.formatDataSize(bytesSent);
+
+            session.commit();
+            transaction.complete();
+
+            final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
+            logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[]{
+                this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
+
+            return flowFilesSent.size();
+        } catch (final Exception e) {
+            session.rollback();
+            throw e;
+        }
     }
-    
-    
+
     @Override
     public VersionNegotiator getVersionNegotiator() {
         return versionNegotiator;
     }
-    
+
     @Override
     public void shutdown(final Peer peer) throws IOException {
         readyForFileTransfer = false;
         final CommunicationsSession commsSession = peer.getCommunicationsSession();
         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
-        
+
         logger.debug("{} Shutting down with {}", this, peer);
         // Indicate that we would like to have some data
         RequestType.SHUTDOWN.writeRequestType(dos);
@@ -436,7 +438,7 @@ public class SocketClientProtocol implements ClientProtocol {
     public String getResourceName() {
         return "SocketFlowFileProtocol";
     }
-    
+
     @Override
     public String toString() {
         return "SocketClientProtocol[CommsID=" + commsIdentifier + "]";

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
index e69104f..e83ea28 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -45,50 +45,51 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SocketClientTransaction implements Transaction {
-	private static final Logger logger = LoggerFactory.getLogger(SocketClientTransaction.class);
-	
-	private final long creationNanoTime = System.nanoTime();
-	private final CRC32 crc = new CRC32();
-	private final int protocolVersion;
-	private final FlowFileCodec codec;
-	private final DataInputStream dis;
-	private final DataOutputStream dos;
-	private final TransferDirection direction;
-	private final boolean compress;
-	private final Peer peer;
-	private final int penaltyMillis;
-	private final String destinationId;
-	private final EventReporter eventReporter;
-	
-	private boolean dataAvailable = false;
-	private int transfers = 0;
-	private long contentBytes = 0;
-	private TransactionState state;
-	
-	SocketClientTransaction(final int protocolVersion, final String destinationId, final Peer peer, final FlowFileCodec codec, 
-			final TransferDirection direction, final boolean useCompression, final int penaltyMillis, final EventReporter eventReporter) throws IOException {
-		this.protocolVersion = protocolVersion;
-		this.destinationId = destinationId;
-		this.peer = peer;
-		this.codec = codec;
-		this.direction = direction;
-		this.dis = new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream());
-		this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream());
-		this.compress = useCompression;
-		this.state = TransactionState.TRANSACTION_STARTED;
-		this.penaltyMillis = penaltyMillis;
-		this.eventReporter = eventReporter;
-		
-		initialize();
-	}
-	
-	private void initialize() throws IOException {
-	    try {
-            if ( direction == TransferDirection.RECEIVE ) {
+
+    private static final Logger logger = LoggerFactory.getLogger(SocketClientTransaction.class);
+
+    private final long creationNanoTime = System.nanoTime();
+    private final CRC32 crc = new CRC32();
+    private final int protocolVersion;
+    private final FlowFileCodec codec;
+    private final DataInputStream dis;
+    private final DataOutputStream dos;
+    private final TransferDirection direction;
+    private final boolean compress;
+    private final Peer peer;
+    private final int penaltyMillis;
+    private final String destinationId;
+    private final EventReporter eventReporter;
+
+    private boolean dataAvailable = false;
+    private int transfers = 0;
+    private long contentBytes = 0;
+    private TransactionState state;
+
+    SocketClientTransaction(final int protocolVersion, final String destinationId, final Peer peer, final FlowFileCodec codec,
+            final TransferDirection direction, final boolean useCompression, final int penaltyMillis, final EventReporter eventReporter) throws IOException {
+        this.protocolVersion = protocolVersion;
+        this.destinationId = destinationId;
+        this.peer = peer;
+        this.codec = codec;
+        this.direction = direction;
+        this.dis = new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream());
+        this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream());
+        this.compress = useCompression;
+        this.state = TransactionState.TRANSACTION_STARTED;
+        this.penaltyMillis = penaltyMillis;
+        this.eventReporter = eventReporter;
+
+        initialize();
+    }
+
+    private void initialize() throws IOException {
+        try {
+            if (direction == TransferDirection.RECEIVE) {
                 // Indicate that we would like to have some data
                 RequestType.RECEIVE_FLOWFILES.writeRequestType(dos);
                 dos.flush();
-                
+
                 final Response dataAvailableCode = Response.read(dis);
                 switch (dataAvailableCode.getCode()) {
                     case MORE_DATA:
@@ -102,39 +103,38 @@ public class SocketClientTransaction implements Transaction {
                     default:
                         throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
                 }
-    
+
             } else {
                 // Indicate that we would like to have some data
                 RequestType.SEND_FLOWFILES.writeRequestType(dos);
                 dos.flush();
             }
-	    } catch (final Exception e) {
-	        error();
-	        throw e;
-	    }
-	}
-	
-	
-	@Override
-	public DataPacket receive() throws IOException {
-	    try {
-	        try {
-        		if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
-        			throw new IllegalStateException("Cannot receive data from " + peer + " because Transaction State is " + state);
-        		}
-        		
-            	if ( direction == TransferDirection.SEND ) {
-            	    throw new IllegalStateException("Attempting to receive data from " + peer + " but started a SEND Transaction");
-            	}
-            	
-            	// if we already know there's no data, just return null
-            	if ( !dataAvailable ) {
-            	    return null;
-            	}
-        
-            	// if we have already received a packet, check if another is available.
-            	if ( transfers > 0 ) {
-            	    // Determine if Peer will send us data or has no data to send us
+        } catch (final Exception e) {
+            error();
+            throw e;
+        }
+    }
+
+    @Override
+    public DataPacket receive() throws IOException {
+        try {
+            try {
+                if (state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
+                    throw new IllegalStateException("Cannot receive data from " + peer + " because Transaction State is " + state);
+                }
+
+                if (direction == TransferDirection.SEND) {
+                    throw new IllegalStateException("Attempting to receive data from " + peer + " but started a SEND Transaction");
+                }
+
+                // if we already know there's no data, just return null
+                if (!dataAvailable) {
+                    return null;
+                }
+
+                // if we have already received a packet, check if another is available.
+                if (transfers > 0) {
+                    // Determine if Peer will send us data or has no data to send us
                     final Response dataAvailableCode = Response.read(dis);
                     switch (dataAvailableCode.getCode()) {
                         case CONTINUE_TRANSACTION:
@@ -149,170 +149,166 @@ public class SocketClientTransaction implements Transaction {
                             throw new ProtocolException("Got unexpected response from " + peer + " when asking for data: " + dataAvailableCode);
                     }
                 }
-            	
-            	// if no data available, return null
-            	if ( !dataAvailable ) {
-            	    return null;
-            	}
-            	
+
+                // if no data available, return null
+                if (!dataAvailable) {
+                    return null;
+                }
+
                 logger.debug("{} Receiving data from {}", this, peer);
                 final InputStream dataIn = compress ? new CompressionInputStream(dis) : dis;
                 final DataPacket packet = codec.decode(new CheckedInputStream(dataIn, crc));
-                
-                if ( packet == null ) {
+
+                if (packet == null) {
                     this.dataAvailable = false;
                 } else {
-                	transfers++;
-                	contentBytes += packet.getSize();
+                    transfers++;
+                    contentBytes += packet.getSize();
                 }
-                
+
                 this.state = TransactionState.DATA_EXCHANGED;
                 return packet;
-	        } catch (final IOException ioe) {
-	            throw new IOException("Failed to receive data from " + peer + " due to " + ioe, ioe);
-	        }
-	    } catch (final Exception e) {
-	        error();
-	        throw e;
-	    }
-	}
-	
-	
-	@Override
-	public void send(final byte[] content, final Map<String, String> attributes) throws IOException {
-	    send(new StandardDataPacket(attributes, new ByteArrayInputStream(content), content.length));
-	}
-	
-	@Override
-	public void send(final DataPacket dataPacket) throws IOException {
-	    try {
-	        try {
-        		if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
-        			throw new IllegalStateException("Cannot send data to " + peer + " because Transaction State is " + state);
-        		}
-        
-                if ( direction == TransferDirection.RECEIVE ) {
+            } catch (final IOException ioe) {
+                throw new IOException("Failed to receive data from " + peer + " due to " + ioe, ioe);
+            }
+        } catch (final Exception e) {
+            error();
+            throw e;
+        }
+    }
+
+    @Override
+    public void send(final byte[] content, final Map<String, String> attributes) throws IOException {
+        send(new StandardDataPacket(attributes, new ByteArrayInputStream(content), content.length));
+    }
+
+    @Override
+    public void send(final DataPacket dataPacket) throws IOException {
+        try {
+            try {
+                if (state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
+                    throw new IllegalStateException("Cannot send data to " + peer + " because Transaction State is " + state);
+                }
+
+                if (direction == TransferDirection.RECEIVE) {
                     throw new IllegalStateException("Attempting to send data to " + peer + " but started a RECEIVE Transaction");
                 }
-        
-        		if ( transfers > 0 ) {
+
+                if (transfers > 0) {
                     ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
                 }
-        
+
                 logger.debug("{} Sending data to {}", this, peer);
-        
+
                 final OutputStream dataOut = compress ? new CompressionOutputStream(dos) : dos;
-        		final OutputStream out = new CheckedOutputStream(dataOut, crc);
+                final OutputStream out = new CheckedOutputStream(dataOut, crc);
                 codec.encode(dataPacket, out);
-                
+
                 // need to close the CompressionOutputStream in order to force it write out any remaining bytes.
                 // Otherwise, do NOT close it because we don't want to close the underlying stream
                 // (CompressionOutputStream will not close the underlying stream when it's closed)
-                if ( compress ) {
-                	out.close();
+                if (compress) {
+                    out.close();
                 }
-                
+
                 transfers++;
                 contentBytes += dataPacket.getSize();
                 this.state = TransactionState.DATA_EXCHANGED;
-	        } catch (final IOException ioe) {
-	            throw new IOException("Failed to send data to " + peer + " due to " + ioe, ioe);
-	        }
-	    } catch (final Exception e) {
-	        error();
-	        throw e;
-	    }
-	}
-	
-	
-	@Override
-	public void cancel(final String explanation) throws IOException {
-		if ( state == TransactionState.TRANSACTION_CANCELED || state == TransactionState.TRANSACTION_COMPLETED || state == TransactionState.ERROR ) {
-			throw new IllegalStateException("Cannot cancel transaction because state is already " + state);
-		}
-
-		try {
-		    ResponseCode.CANCEL_TRANSACTION.writeResponse(dos, explanation == null ? "<No explanation given>" : explanation);
-		    state = TransactionState.TRANSACTION_CANCELED;
-		} catch (final IOException ioe) {
-		    error();
-		    throw new IOException("Failed to send 'cancel transaction' message to " + peer + " due to " + ioe, ioe);
-		}
-	}
-	
-	
-	@Override
-	public TransactionCompletion complete() throws IOException {
-	    try {
-	        try {
-        		if ( state != TransactionState.TRANSACTION_CONFIRMED ) {
-        			throw new IllegalStateException("Cannot complete transaction with " + peer + " because state is " + state + 
-        					"; Transaction can only be completed when state is " + TransactionState.TRANSACTION_CONFIRMED);
-        		}
-        		
-        		boolean backoff = false;
-        		if ( direction == TransferDirection.RECEIVE ) {
-        		    if ( transfers == 0 ) {
-        		        state = TransactionState.TRANSACTION_COMPLETED;
-        		        return new SocketClientTransactionCompletion(false, 0, 0L, System.nanoTime() - creationNanoTime);
-        		    }
-        		    
+            } catch (final IOException ioe) {
+                throw new IOException("Failed to send data to " + peer + " due to " + ioe, ioe);
+            }
+        } catch (final Exception e) {
+            error();
+            throw e;
+        }
+    }
+
+    @Override
+    public void cancel(final String explanation) throws IOException {
+        if (state == TransactionState.TRANSACTION_CANCELED || state == TransactionState.TRANSACTION_COMPLETED || state == TransactionState.ERROR) {
+            throw new IllegalStateException("Cannot cancel transaction because state is already " + state);
+        }
+
+        try {
+            ResponseCode.CANCEL_TRANSACTION.writeResponse(dos, explanation == null ? "<No explanation given>" : explanation);
+            state = TransactionState.TRANSACTION_CANCELED;
+        } catch (final IOException ioe) {
+            error();
+            throw new IOException("Failed to send 'cancel transaction' message to " + peer + " due to " + ioe, ioe);
+        }
+    }
+
+    @Override
+    public TransactionCompletion complete() throws IOException {
+        try {
+            try {
+                if (state != TransactionState.TRANSACTION_CONFIRMED) {
+                    throw new IllegalStateException("Cannot complete transaction with " + peer + " because state is " + state
+                            + "; Transaction can only be completed when state is " + TransactionState.TRANSACTION_CONFIRMED);
+                }
+
+                boolean backoff = false;
+                if (direction == TransferDirection.RECEIVE) {
+                    if (transfers == 0) {
+                        state = TransactionState.TRANSACTION_COMPLETED;
+                        return new SocketClientTransactionCompletion(false, 0, 0L, System.nanoTime() - creationNanoTime);
+                    }
+
                     // Confirm that we received the data and the peer can now discard it
                     logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
                     ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
-                    
+
                     state = TransactionState.TRANSACTION_COMPLETED;
                 } else {
                     final Response transactionResponse;
                     try {
                         transactionResponse = Response.read(dis);
                     } catch (final IOException e) {
-                        throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. " +
-                                "It is unknown whether or not the peer successfully received/processed the data.", e);
+                        throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. "
+                                + "It is unknown whether or not the peer successfully received/processed the data.", e);
                     }
-                    
+
                     logger.debug("{} Received {} from {}", this, transactionResponse, peer);
-                    if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
+                    if (transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL) {
                         peer.penalize(destinationId, penaltyMillis);
                         backoff = true;
-                    } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
+                    } else if (transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED) {
                         throw new ProtocolException("After sending data to " + peer + ", expected TRANSACTION_FINISHED response but got " + transactionResponse);
                     }
-                    
+
                     state = TransactionState.TRANSACTION_COMPLETED;
                 }
-        		
-        		return new SocketClientTransactionCompletion(backoff, transfers, contentBytes, System.nanoTime() - creationNanoTime);
-	        } catch (final IOException ioe) {
-	            throw new IOException("Failed to complete transaction with " + peer + " due to " + ioe, ioe);
-	        }
-	    } catch (final Exception e) {
-	        error();
-	        throw e;
-	    }
-	}
-	
-	
-	@Override
-	public void confirm() throws IOException {
-	    try {
-	        try {
-    	        if ( state == TransactionState.TRANSACTION_STARTED && !dataAvailable && direction == TransferDirection.RECEIVE ) {
-    	            // client requested to receive data but no data available. no need to confirm.
-    	            state = TransactionState.TRANSACTION_CONFIRMED;
-    	            return;
-    	        }
-    	        
-        		if ( state != TransactionState.DATA_EXCHANGED ) {
-        			throw new IllegalStateException("Cannot confirm Transaction because state is " + state + 
-        					"; Transaction can only be confirmed when state is " + TransactionState.DATA_EXCHANGED );
-        		}
-        
-                if ( direction == TransferDirection.RECEIVE ) {
-                    if ( dataAvailable ) {
+
+                return new SocketClientTransactionCompletion(backoff, transfers, contentBytes, System.nanoTime() - creationNanoTime);
+            } catch (final IOException ioe) {
+                throw new IOException("Failed to complete transaction with " + peer + " due to " + ioe, ioe);
+            }
+        } catch (final Exception e) {
+            error();
+            throw e;
+        }
+    }
+
+    @Override
+    public void confirm() throws IOException {
+        try {
+            try {
+                if (state == TransactionState.TRANSACTION_STARTED && !dataAvailable && direction == TransferDirection.RECEIVE) {
+                    // client requested to receive data but no data available. no need to confirm.
+                    state = TransactionState.TRANSACTION_CONFIRMED;
+                    return;
+                }
+
+                if (state != TransactionState.DATA_EXCHANGED) {
+                    throw new IllegalStateException("Cannot confirm Transaction because state is " + state
+                            + "; Transaction can only be confirmed when state is " + TransactionState.DATA_EXCHANGED);
+                }
+
+                if (direction == TransferDirection.RECEIVE) {
+                    if (dataAvailable) {
                         throw new IllegalStateException("Cannot complete transaction because the sender has already sent more data than client has consumed.");
                     }
-                    
+
                     // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
                     // to peer so that we can verify that the connection is still open. This is a two-phase commit,
                     // which helps to prevent the chances of data duplication. Without doing this, we may commit the
@@ -323,84 +319,88 @@ public class SocketClientTransaction implements Transaction {
                     logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
                     final String calculatedCRC = String.valueOf(crc.getValue());
                     ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC);
-                    
+
                     final Response confirmTransactionResponse;
                     try {
                         confirmTransactionResponse = Response.read(dis);
                     } catch (final IOException ioe) {
                         logger.error("Failed to receive response code from {} when expecting confirmation of transaction", peer);
-                        if ( eventReporter != null ) {
-                        	eventReporter.reportEvent(Severity.ERROR, "Site-to-Site", "Failed to receive response code from " + peer + " when expecting confirmation of transaction");
+                        if (eventReporter != null) {
+                            eventReporter.reportEvent(Severity.ERROR, "Site-to-Site", "Failed to receive response code from " + peer + " when expecting confirmation of transaction");
                         }
                         throw ioe;
                     }
-                    
+
                     logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer);
-                    
+
                     switch (confirmTransactionResponse.getCode()) {
                         case CONFIRM_TRANSACTION:
                             break;
                         case BAD_CHECKSUM:
                             throw new IOException(this + " Received a BadChecksum response from peer " + peer);
                         default:
-                            throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
+                            throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : "
+                                    + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
                     }
-                    
+
                     state = TransactionState.TRANSACTION_CONFIRMED;
                 } else {
                     logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer);
                     ResponseCode.FINISH_TRANSACTION.writeResponse(dos);
-                    
+
                     final String calculatedCRC = String.valueOf(crc.getValue());
-                    
+
                     // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
                     final Response transactionConfirmationResponse = Response.read(dis);
-                    if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) {
+                    if (transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION) {
                         // Confirm checksum and echo back the confirmation.
                         logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer);
                         final String receivedCRC = transactionConfirmationResponse.getMessage();
-                        
+
                         // CRC was not used before version 4
-                        if ( protocolVersion > 3 ) {
-                            if ( !receivedCRC.equals(calculatedCRC) ) {
+                        if (protocolVersion > 3) {
+                            if (!receivedCRC.equals(calculatedCRC)) {
                                 ResponseCode.BAD_CHECKSUM.writeResponse(dos);
-                                throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
+                                throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as "
+                                        + calculatedCRC + " while peer calculated CRC32 Checksum as "
+                                        + receivedCRC + "; canceling transaction and rolling back session");
                             }
                         }
-                        
+
                         ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");
                     } else {
-                        throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse);
+                        throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer "
+                                + peer + " but received " + transactionConfirmationResponse);
                     }
-                    
+
                     state = TransactionState.TRANSACTION_CONFIRMED;
                 }
-	        } catch (final IOException ioe) {
-	            throw new IOException("Failed to confirm transaction with " + peer + " due to " + ioe, ioe);
-	        }
-	    } catch (final Exception e) {
-	        error();
-	        throw e;
-	    }
-	}
-
-	@Override
-	public void error() {
-	    this.state = TransactionState.ERROR;
-	}
-	
-	@Override
-	public TransactionState getState() {
-		return state;
-	}
-
-	@Override
-	public Communicant getCommunicant() {
-	    return peer;
-	}
-	
+            } catch (final IOException ioe) {
+                throw new IOException("Failed to confirm transaction with " + peer + " due to " + ioe, ioe);
+            }
+        } catch (final Exception e) {
+            error();
+            throw e;
+        }
+    }
+
+    @Override
+    public void error() {
+        this.state = TransactionState.ERROR;
+    }
+
+    @Override
+    public TransactionState getState() {
+        return state;
+    }
+
+    @Override
+    public Communicant getCommunicant() {
+        return peer;
+    }
+
     @Override
     public String toString() {
-        return "SocketClientTransaction[Url=" + peer.getUrl() + ", TransferDirection=" + direction + ", State=" + state + "]"; 
+        return "SocketClientTransaction[Url=" + peer.getUrl() + ", TransferDirection=" + direction + ", State=" + state + "]";
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java
index 5eb6c91..bd95013 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java
@@ -26,7 +26,7 @@ public class SocketClientTransactionCompletion implements TransactionCompletion
     private final int dataPacketsTransferred;
     private final long bytesTransferred;
     private final long durationNanos;
-    
+
     public SocketClientTransactionCompletion(final boolean backoff, final int dataPacketsTransferred, final long bytesTransferred, final long durationNanos) {
         this.backoff = backoff;
         this.dataPacketsTransferred = dataPacketsTransferred;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java
index 10352ec..d746abf 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java
@@ -32,43 +32,44 @@ import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.map.ObjectMapper;
 
 public class NiFiRestApiUtil {
+
     public static final int RESPONSE_CODE_OK = 200;
-    
+
     private final SSLContext sslContext;
-    
+
     public NiFiRestApiUtil(final SSLContext sslContext) {
         this.sslContext = sslContext;
     }
-    
+
     private HttpURLConnection getConnection(final String connUrl, final int timeoutMillis) throws IOException {
         final URL url = new URL(connUrl);
         final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
         connection.setConnectTimeout(timeoutMillis);
         connection.setReadTimeout(timeoutMillis);
-        
+
         // special handling for https
         if (sslContext != null && connection instanceof HttpsURLConnection) {
             HttpsURLConnection secureConnection = (HttpsURLConnection) connection;
             secureConnection.setSSLSocketFactory(sslContext.getSocketFactory());
 
             // check the trusted hostname property and override the HostnameVerifier
-            secureConnection.setHostnameVerifier(new OverrideHostnameVerifier(url.getHost(), 
+            secureConnection.setHostnameVerifier(new OverrideHostnameVerifier(url.getHost(),
                     secureConnection.getHostnameVerifier()));
         }
-        
+
         return connection;
     }
-    
+
     public ControllerDTO getController(final String url, final int timeoutMillis) throws IOException {
         final HttpURLConnection connection = getConnection(url, timeoutMillis);
         connection.setRequestMethod("GET");
         final int responseCode = connection.getResponseCode();
-        
+
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
         StreamUtils.copy(connection.getInputStream(), baos);
         final String responseMessage = baos.toString();
-        
-        if ( responseCode == RESPONSE_CODE_OK ) {
+
+        if (responseCode == RESPONSE_CODE_OK) {
             final ObjectMapper mapper = new ObjectMapper();
             final JsonNode jsonNode = mapper.readTree(responseMessage);
             final JsonNode controllerNode = jsonNode.get("controller");
@@ -77,8 +78,9 @@ public class NiFiRestApiUtil {
             throw new IOException("Got HTTP response Code " + responseCode + ": " + connection.getResponseMessage() + " with explanation: " + responseMessage);
         }
     }
-    
+
     private static class OverrideHostnameVerifier implements HostnameVerifier {
+
         private final String trustedHostname;
         private final HostnameVerifier delegate;
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
index 6dab77b..c52b4b7 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
@@ -21,7 +21,8 @@ import java.util.Set;
 import org.apache.nifi.remote.PeerStatus;
 
 public class PeerStatusCache {
-	private final Set<PeerStatus> statuses;
+
+    private final Set<PeerStatus> statuses;
     private final long timestamp;
 
     public PeerStatusCache(final Set<PeerStatus> statuses) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
index bd1b50c..70bb324 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
@@ -25,26 +25,26 @@ import org.apache.nifi.stream.io.MinimumLengthInputStream;
 
 public class StandardDataPacket implements DataPacket {
 
-	private final Map<String, String> attributes;
-	private final InputStream stream;
-	private final long size;
-	
-	public StandardDataPacket(final Map<String, String> attributes, final InputStream stream, final long size) {
-		this.attributes = attributes;
-		this.stream = new MinimumLengthInputStream(new LimitingInputStream(stream, size), size);
-		this.size = size;
-	}
-	
-	public Map<String, String> getAttributes() {
-		return attributes;
-	}
-	
-	public InputStream getData() {
-		return stream;
-	}
-	
-	public long getSize() {
-		return size;
-	}
-	
+    private final Map<String, String> attributes;
+    private final InputStream stream;
+    private final long size;
+
+    public StandardDataPacket(final Map<String, String> attributes, final InputStream stream, final long size) {
+        this.attributes = attributes;
+        this.stream = new MinimumLengthInputStream(new LimitingInputStream(stream, size), size);
+        this.size = size;
+    }
+
+    public Map<String, String> getAttributes() {
+        return attributes;
+    }
+
+    public InputStream getData() {
+        return stream;
+    }
+
+    public long getSize() {
+        return size;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
index c5cca78..8336745 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
@@ -40,11 +40,11 @@ public class TestEndpointConnectionStatePool {
 
         clusterNodeInfo.setNodeInformation(collection);
         final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.RECEIVE);
-        for ( final PeerStatus peerStatus : destinations ) {
+        for (final PeerStatus peerStatus : destinations) {
             System.out.println(peerStatus.getPeerDescription());
         }
     }
-    
+
     @Test
     public void testFormulateDestinationListForOutputHugeDifference() throws IOException {
         final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
@@ -54,14 +54,11 @@ public class TestEndpointConnectionStatePool {
 
         clusterNodeInfo.setNodeInformation(collection);
         final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.RECEIVE);
-        for ( final PeerStatus peerStatus : destinations ) {
+        for (final PeerStatus peerStatus : destinations) {
             System.out.println(peerStatus.getPeerDescription());
         }
     }
-    
-    
-    
-    
+
     @Test
     public void testFormulateDestinationListForInputPorts() throws IOException {
         final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
@@ -74,11 +71,11 @@ public class TestEndpointConnectionStatePool {
 
         clusterNodeInfo.setNodeInformation(collection);
         final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
-        for ( final PeerStatus peerStatus : destinations ) {
+        for (final PeerStatus peerStatus : destinations) {
             System.out.println(peerStatus.getPeerDescription());
         }
     }
-    
+
     @Test
     public void testFormulateDestinationListForInputPortsHugeDifference() throws IOException {
         final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
@@ -88,7 +85,7 @@ public class TestEndpointConnectionStatePool {
 
         clusterNodeInfo.setNodeInformation(collection);
         final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
-        for ( final PeerStatus peerStatus : destinations ) {
+        for (final PeerStatus peerStatus : destinations) {
             System.out.println(peerStatus.getPeerDescription());
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
index b73e44d..155fc95 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
@@ -39,32 +38,32 @@ public class TestSiteToSiteClient {
     @Ignore("For local testing only; not really a unit test but a manual test")
     public void testReceive() throws IOException {
         System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
-        
+
         final SiteToSiteClient client = new SiteToSiteClient.Builder()
-            .url("http://localhost:8080/nifi")
-            .portName("cba")
-            .requestBatchCount(10)
-            .build();
-        
+                .url("http://localhost:8080/nifi")
+                .portName("cba")
+                .requestBatchCount(10)
+                .build();
+
         try {
-            for (int i=0; i < 1000; i++) {
+            for (int i = 0; i < 1000; i++) {
                 final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
                 Assert.assertNotNull(transaction);
-                
+
                 DataPacket packet;
                 while (true) {
                     packet = transaction.receive();
-                    if ( packet == null ) {
+                    if (packet == null) {
                         break;
                     }
 
                     final InputStream in = packet.getData();
                     final long size = packet.getSize();
                     final byte[] buff = new byte[(int) size];
-                    
+
                     StreamUtils.fillBuffer(in, buff);
                 }
-                
+
                 transaction.confirm();
                 transaction.complete();
             }
@@ -72,34 +71,33 @@ public class TestSiteToSiteClient {
             client.close();
         }
     }
-    
-    
+
     @Test
     @Ignore("For local testing only; not really a unit test but a manual test")
     public void testSend() throws IOException {
         System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
-        
+
         final SiteToSiteClient client = new SiteToSiteClient.Builder()
-            .url("http://localhost:8080/nifi")
-            .portName("input")
-            .build();
-    
+                .url("http://localhost:8080/nifi")
+                .portName("input")
+                .build();
+
         try {
             final Transaction transaction = client.createTransaction(TransferDirection.SEND);
             Assert.assertNotNull(transaction);
-            
+
             final Map<String, String> attrs = new HashMap<>();
             attrs.put("site-to-site", "yes, please!");
             final byte[] bytes = "Hello".getBytes();
             final ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
             final DataPacket packet = new StandardDataPacket(attrs, bais, bytes.length);
             transaction.send(packet);
-            
+
             transaction.confirm();
             transaction.complete();
         } finally {
             client.close();
         }
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java
index 172c593..cc24575 100644
--- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java
+++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java
@@ -33,10 +33,6 @@ import org.apache.commons.lang3.builder.ToStringStyle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- *
- * @author none
- */
 public abstract class AbstractChannelReader implements Runnable {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractChannelReader.class);
@@ -91,12 +87,12 @@ public abstract class AbstractChannelReader implements Runnable {
      * Allows a subclass to specifically handle how it reads from the given
      * key's channel into the given buffer.
      *
-     * @param key
-     * @param buffer
+     * @param key of channel to read from
+     * @param buffer to fill
      * @return the number of bytes read in the final read cycle. A value of zero
      * or more indicates the channel is still open but a value of -1 indicates
      * end of stream.
-     * @throws IOException
+     * @throws IOException if reading from channel causes failure
      */
     protected abstract int fillBuffer(SelectionKey key, ByteBuffer buffer) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java
index a413ad2..007034b 100644
--- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java
+++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java
@@ -25,10 +25,6 @@ import java.util.concurrent.LinkedBlockingDeque;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- *
- * @author none
- */
 public class BufferPool implements Runnable {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(BufferPool.class);
@@ -50,9 +46,9 @@ public class BufferPool implements Runnable {
     /**
      * Returns the given buffer to the pool - and clears it.
      *
-     * @param buffer
-     * @param bytesProcessed
-     * @return
+     * @param buffer buffer to return
+     * @param bytesProcessed bytes processed for this buffer being returned
+     * @return true if buffer returned to pool
      */
     public synchronized boolean returnBuffer(ByteBuffer buffer, final int bytesProcessed) {
         totalBytesExtracted += bytesProcessed;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
index 2ae2c07..824f2df 100644
--- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
+++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
@@ -35,10 +35,6 @@ import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- *
- * @author none
- */
 public final class ChannelDispatcher implements Runnable {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ChannelDispatcher.class);
@@ -81,8 +77,8 @@ public final class ChannelDispatcher implements Runnable {
     /*
      * When serverSocketsChannels are registered with the selector, want each invoke of this method to loop through all
      * channels' keys.
-     * 
-     * @throws IOException
+     *
+     * @throws IOException if unable to select keys
      */
     private void selectServerSocketKeys() throws IOException {
         int numSelected = serverSocketSelector.select(timeout);
@@ -121,8 +117,8 @@ public final class ChannelDispatcher implements Runnable {
      * When invoking this method, only want to iterate through the selected keys once. When a key is entered into the selectors
      * selected key set, select will return a positive value. The next select will return 0 if nothing has changed. Note that
      * the selected key set is not manually changed via a remove operation.
-     * 
-     * @throws IOException
+     *
+     * @throws IOException if unable to select keys
      */
     private void selectSocketChannelKeys() throws IOException {
         // once a channel associated with a key in this selector is 'ready', it causes this select to immediately return.
@@ -138,7 +134,7 @@ public final class ChannelDispatcher implements Runnable {
             // there are 2 kinds of channels in this selector, both which have their own readers and are executed in their own
             // threads. We will get here whenever a new SocketChannel is created due to an incoming connection. However,
             // for a DatagramChannel we don't want to create a new reader unless it is a new DatagramChannel. The only
-            // way to tell if it's new is the lack of an attachment. 
+            // way to tell if it's new is the lack of an attachment.
             if (channel instanceof DatagramChannel && socketChannelKey.attachment() == null) {
                 reader = new DatagramChannelReader(UUID.randomUUID().toString(), socketChannelKey, emptyBuffers, factory);
                 socketChannelKey.attach(reader);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
index b0a1cfb..7cbf589 100644
--- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
+++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
@@ -62,7 +62,6 @@ import org.slf4j.LoggerFactory;
  * All ChannelReaders will get throttled by the unavailability of buffers in the
  * provided BufferPool. This is designed to create back pressure.
  *
- * @author none
  */
 public final class ChannelListener {
 
@@ -99,7 +98,7 @@ public final class ChannelListener {
      * @param port - port to bind to
      * @param receiveBufferSize - size of OS receive buffer to request. If less
      * than 0 then will not be set and OS default will win.
-     * @throws IOException
+     * @throws IOException if unable to add socket
      */
     public void addServerSocket(final InetAddress nicIPAddress, final int port, final int receiveBufferSize)
             throws IOException {
@@ -129,7 +128,7 @@ public final class ChannelListener {
      * @param port - the port to listen on
      * @param receiveBufferSize - the number of bytes to request for a receive
      * buffer from OS
-     * @throws IOException
+     * @throws IOException if unable to add channel
      */
     public void addDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize)
             throws IOException {
@@ -156,7 +155,7 @@ public final class ChannelListener {
      * any network interface on the local host.
      * @param sendingPort - the port used by the sender of datagrams. Only
      * datagrams from this port will be received.
-     * @throws IOException
+     * @throws IOException if unable to add channel
      */
     public void addDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize, final String sendingHost,
             final Integer sendingPort) throws IOException {