You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/22 02:04:43 UTC

incubator-nifi git commit: NIFI-282: Refactoring to extract client util

Repository: incubator-nifi
Updated Branches:
  refs/heads/site-to-site-client 2aaed7021 -> 77fd8e5ec


NIFI-282: Refactoring to extract client util


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/77fd8e5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/77fd8e5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/77fd8e5e

Branch: refs/heads/site-to-site-client
Commit: 77fd8e5ec7e86095d6235deae91939db6412eeb1
Parents: 2aaed70
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Jan 21 20:04:36 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Jan 21 20:04:36 2015 -0500

----------------------------------------------------------------------
 .../nifi/stream/io/LimitingInputStream.java     | 111 ++++
 .../stream/io/MinimumLengthInputStream.java     |  93 +++
 .../org/apache/nifi/remote/Transaction.java     |  44 ++
 .../nifi/remote/client/SiteToSiteClient.java    |   7 +-
 .../nifi/remote/client/socket/SocketClient.java |  54 +-
 .../remote/codec/StandardFlowFileCodec.java     |  78 +--
 .../remote/exception/ProtocolException.java     |   4 +-
 .../nifi/remote/protocol/ClientProtocol.java    |  13 +-
 .../protocol/socket/SocketClientProtocol.java   | 618 +++++--------------
 .../socket/SocketClientTransaction.java         | 260 +++++++-
 .../nifi/remote/util/StandardDataPacket.java    |  50 ++
 .../nifi/remote/StandardRemoteGroupPort.java    |   2 +-
 .../socket/SocketFlowFileServerProtocol.java    |  20 +-
 13 files changed, 758 insertions(+), 596 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
new file mode 100644
index 0000000..421d579
--- /dev/null
+++ b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class LimitingInputStream extends InputStream {
+
+    private final InputStream in;
+    private final long limit;
+    private long bytesRead = 0;
+
+    public LimitingInputStream(final InputStream in, final long limit) {
+        this.in = in;
+        this.limit = limit;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (bytesRead >= limit) {
+            return -1;
+        }
+
+        final int val = in.read();
+        if (val > -1) {
+            bytesRead++;
+        }
+        return val;
+    }
+
+    @Override
+    public int read(final byte[] b) throws IOException {
+        if (bytesRead >= limit) {
+            return -1;
+        }
+
+        final int maxToRead = (int) Math.min(b.length, limit - bytesRead);
+
+        final int val = in.read(b, 0, maxToRead);
+        if (val > 0) {
+            bytesRead += val;
+        }
+        return val;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (bytesRead >= limit) {
+            return -1;
+        }
+
+        final int maxToRead = (int) Math.min(len, limit - bytesRead);
+
+        final int val = in.read(b, off, maxToRead);
+        if (val > 0) {
+            bytesRead += val;
+        }
+        return val;
+    }
+
+    @Override
+    public long skip(final long n) throws IOException {
+        final long skipped = in.skip(Math.min(n, limit - bytesRead));
+        bytesRead += skipped;
+        return skipped;
+    }
+
+    @Override
+    public int available() throws IOException {
+        return in.available();
+    }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+    }
+
+    @Override
+    public void mark(int readlimit) {
+        in.mark(readlimit);
+    }
+
+    @Override
+    public boolean markSupported() {
+        return in.markSupported();
+    }
+
+    @Override
+    public void reset() throws IOException {
+        in.reset();
+    }
+
+    public long getLimit() {
+    	return limit;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/MinimumLengthInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/MinimumLengthInputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/MinimumLengthInputStream.java
new file mode 100644
index 0000000..2e93599
--- /dev/null
+++ b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/MinimumLengthInputStream.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.EOFException;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An InputStream that will throw EOFException if the underlying InputStream runs out of data before reaching the
+ * configured minimum amount of data
+ */
+public class MinimumLengthInputStream extends FilterInputStream {
+
+	private final long minLength;
+	private long consumedCount = 0L;
+	
+	public MinimumLengthInputStream(final InputStream in, final long minLength) {
+		super(in);
+		this.minLength = minLength;
+	}
+
+	
+	@Override
+	public int read() throws IOException {
+		final int b = super.read();
+		if ( b < 0 && consumedCount < minLength ) {
+			throw new EOFException();
+		}
+		
+		if ( b >= 0 ) {
+			consumedCount++;
+		}
+		
+		return b;
+	}
+	
+	@Override
+	public int read(byte[] b) throws IOException {
+		return read(b, 0, b.length);
+	}
+	
+	public int read(byte[] b, int off, int len) throws IOException {
+		final int num = super.read(b, off, len);
+		
+		if ( num < 0 && consumedCount < minLength ) {
+			throw new EOFException();
+		}
+		
+		if ( num >= 0 ) {
+			consumedCount += num;
+		}
+
+		return num;
+	}
+	
+	@Override
+	public long skip(final long n) throws IOException {
+		long skipped = super.skip(n);
+		if ( skipped < 1 ) {
+			final int b = super.read();
+			if ( b >= 0 ) {
+				skipped = 1;
+			}
+		}
+		
+		if ( skipped < 0 && consumedCount < minLength ) {
+			throw new EOFException();
+		}
+		
+		if ( skipped >= 0 ) {
+			consumedCount += skipped;
+		}
+		
+		return skipped;
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
new file mode 100644
index 0000000..6c136fc
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.io.IOException;
+
+import org.apache.nifi.remote.protocol.DataPacket;
+
+public interface Transaction {
+
+	void confirm() throws IOException;
+	
+	void complete(boolean applyBackpressure) throws IOException;
+	
+	void cancel() throws IOException;
+	
+	void send(DataPacket dataPacket) throws IOException;
+	
+	DataPacket receive() throws IOException;
+	
+	TransactionState getState() throws IOException;
+	
+	public enum TransactionState {
+		TRANSACTION_STARTED,
+		DATA_EXCHANGED,
+		TRANSACTION_CONFIRMED,
+		TRANSACTION_COMPLETED,
+		TRANSACTION_CANCELED;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index 34cb56a..164a63c 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -19,12 +19,11 @@ package org.apache.nifi.remote.client;
 import java.io.Closeable;
 import java.io.IOException;
 
-import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
 
 public interface SiteToSiteClient extends Closeable {
 
-	void send(DataPacket dataPacket) throws IOException;
-	
-	DataPacket receive() throws IOException;
+	Transaction createTransaction(TransferDirection direction) throws IOException;
 	
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index b81b425..88eb5e8 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -24,6 +24,7 @@ import javax.net.ssl.SSLContext;
 
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.remote.RemoteDestination;
+import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.client.SiteToSiteClient;
 import org.apache.nifi.remote.exception.HandshakeException;
@@ -65,7 +66,7 @@ public class SocketClient implements SiteToSiteClient {
 	
 	
 	@Override
-	public void send(final DataPacket dataPacket) throws IOException {
+	public Transaction createTransaction(final TransferDirection direction) throws IOException {
 		final String portId = getPortIdentifier(TransferDirection.SEND);
 		
 		if ( portId == null ) {
@@ -91,19 +92,58 @@ public class SocketClient implements SiteToSiteClient {
 		
 		final EndpointConnectionState connectionState;
 		try {
-			connectionState = pool.getEndpointConnectionState(remoteDestination, TransferDirection.SEND);
+			connectionState = pool.getEndpointConnectionState(remoteDestination, direction);
 		} catch (final ProtocolException | HandshakeException | PortNotRunningException | UnknownPortException e) {
 			throw new IOException(e);
 		}
 		
+		final Transaction transaction = connectionState.getSocketClientProtocol().startTransaction(
+				connectionState.getPeer(), connectionState.getCodec(), direction);
 		
-	}
+		// Wrap the transaction in a new one that will return the EndpointConnectionState back to the pool whenever
+		// the transaction is either completed or canceled.
+		return new Transaction() {
+			@Override
+			public void confirm() throws IOException {
+				transaction.confirm();
+			}
 
-	@Override
-	public DataPacket receive() throws IOException {
-		// TODO Auto-generated method stub
-		return null;
+			@Override
+			public void complete(final boolean applyBackpressure) throws IOException {
+				try {
+					transaction.complete(applyBackpressure);
+				} finally {
+					pool.offer(connectionState);
+				}
+			}
+
+			@Override
+			public void cancel() throws IOException {
+				try {
+					transaction.cancel();
+				} finally {
+					pool.offer(connectionState);
+				}
+			}
+
+			@Override
+			public void send(final DataPacket dataPacket) throws IOException {
+				transaction.send(dataPacket);
+			}
+
+			@Override
+			public DataPacket receive() throws IOException {
+				return transaction.receive();
+			}
+
+			@Override
+			public TransactionState getState() throws IOException {
+				return transaction.getState();
+			}
+			
+		};
 	}
+
 	
 	@Override
 	public void close() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
index d18a4ee..6fd92de 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
@@ -26,14 +26,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.remote.StandardVersionNegotiator;
 import org.apache.nifi.remote.VersionNegotiator;
 import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.util.StandardDataPacket;
+import org.apache.nifi.stream.io.StreamUtils;
 
 public class StandardFlowFileCodec implements FlowFileCodec {
 	public static final int MAX_NUM_ATTRIBUTES = 25000;
@@ -47,37 +45,26 @@ public class StandardFlowFileCodec implements FlowFileCodec {
     }
     
     @Override
-    public FlowFile encode(final FlowFile flowFile, final ProcessSession session, final OutputStream encodedOut) throws IOException {
+    public void encode(final DataPacket dataPacket, final OutputStream encodedOut) throws IOException {
         final DataOutputStream out = new DataOutputStream(encodedOut);
         
-        final Map<String, String> attributes = flowFile.getAttributes();
+        final Map<String, String> attributes = dataPacket.getAttributes();
         out.writeInt(attributes.size());
         for ( final Map.Entry<String, String> entry : attributes.entrySet() ) {
             writeString(entry.getKey(), out);
             writeString(entry.getValue(), out);
         }
         
-        out.writeLong(flowFile.getSize());
-        
-        session.read(flowFile, new InputStreamCallback() {
-            @Override
-            public void process(final InputStream in) throws IOException {
-                final byte[] buffer = new byte[8192];
-                int len;
-                while ( (len = in.read(buffer)) > 0 ) {
-                    encodedOut.write(buffer, 0, len);
-                }
-                
-                encodedOut.flush();
-            }
-        });
+        out.writeLong(dataPacket.getSize());
         
-        return flowFile;
+        final InputStream in = dataPacket.getData();
+        StreamUtils.copy(in, encodedOut);
+        encodedOut.flush();
     }
 
     
     @Override
-    public FlowFile decode(final InputStream stream, final ProcessSession session) throws IOException, ProtocolException {
+    public DataPacket decode(final InputStream stream) throws IOException, ProtocolException {
         final DataInputStream in = new DataInputStream(stream);
         
         final int numAttributes;
@@ -94,43 +81,16 @@ public class StandardFlowFileCodec implements FlowFileCodec {
         	throw new ProtocolException("FlowFile exceeds maximum number of attributes with a total of " + numAttributes);
         }
         
-        try {
-            final Map<String, String> attributes = new HashMap<>(numAttributes);
-            for (int i=0; i < numAttributes; i++) {
-                final String attrName = readString(in);
-                final String attrValue = readString(in);
-                attributes.put(attrName, attrValue);
-            }
-            
-            final long numBytes = in.readLong();
-            
-            FlowFile flowFile = session.create();
-            flowFile = session.putAllAttributes(flowFile, attributes);
-            flowFile = session.write(flowFile, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException {
-                    int len;
-                    long size = 0;
-                    final byte[] buffer = new byte[8192];
-                    
-                    while ( size < numBytes && (len = in.read(buffer, 0, (int) Math.min(buffer.length, numBytes - size))) > 0 ) {
-                        out.write(buffer, 0, len);
-                        size += len;
-                    }
-
-                    if ( size != numBytes ) {
-                        throw new EOFException("Expected " + numBytes + " bytes but received only " + size);
-                    }
-                }
-            });
-
-            return flowFile;
-        } catch (final EOFException e) {
-        	session.rollback();
-        	
-            // we throw the general IOException here because we did not expect to hit EOFException
-            throw e;
+        final Map<String, String> attributes = new HashMap<>(numAttributes);
+        for (int i=0; i < numAttributes; i++) {
+            final String attrName = readString(in);
+            final String attrValue = readString(in);
+            attributes.put(attrName, attrValue);
         }
+        
+        final long numBytes = in.readLong();
+        
+        return new StandardDataPacket(attributes, stream, numBytes);
     }
 
     private void writeString(final String val, final DataOutputStream out) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
index 0f50b98..e12348a 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
@@ -16,7 +16,9 @@
  */
 package org.apache.nifi.remote.exception;
 
-public class ProtocolException extends Exception {
+import java.io.IOException;
+
+public class ProtocolException extends IOException {
 
     private static final long serialVersionUID = 5763900324505818495L;
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
index 51d3970..befbdaa 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
@@ -23,6 +23,7 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.VersionedRemoteResource;
 import org.apache.nifi.remote.codec.FlowFileCodec;
@@ -50,17 +51,7 @@ public interface ClientProtocol extends VersionedRemoteResource {
     
     
     
-    void startTransaction(Peer peer, TransferDirection direction) throws IOException, ProtocolException;
-    
-    void completeTransaction(boolean applyBackPressure) throws IOException, ProtocolException;
-    
-    void rollbackTransaction();
-    
-    // must be done within a transaction.
-    void transferData(DataPacket dataPacket, FlowFileCodec codec) throws IOException, ProtocolException;
-    
-    // must be done within a transaction.
-    DataPacket receiveData(FlowFileCodec codec) throws IOException, ProtocolException;
+    Transaction startTransaction(Peer peer, FlowFileCodec codec, TransferDirection direction) throws IOException;
     
     
     /**

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index 58d26d4..b4d1e5d 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -20,16 +20,12 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import java.util.zip.CRC32;
-import java.util.zip.CheckedInputStream;
-import java.util.zip.CheckedOutputStream;
 
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -42,18 +38,18 @@ import org.apache.nifi.remote.PeerStatus;
 import org.apache.nifi.remote.RemoteDestination;
 import org.apache.nifi.remote.RemoteResourceInitiator;
 import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.VersionNegotiator;
 import org.apache.nifi.remote.codec.FlowFileCodec;
 import org.apache.nifi.remote.codec.StandardFlowFileCodec;
 import org.apache.nifi.remote.exception.HandshakeException;
 import org.apache.nifi.remote.exception.ProtocolException;
-import org.apache.nifi.remote.io.CompressionInputStream;
-import org.apache.nifi.remote.io.CompressionOutputStream;
 import org.apache.nifi.remote.protocol.ClientProtocol;
 import org.apache.nifi.remote.protocol.CommunicationsSession;
 import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.remote.protocol.RequestType;
+import org.apache.nifi.remote.util.StandardDataPacket;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.StopWatch;
 import org.slf4j.Logger;
@@ -74,6 +70,8 @@ public class SocketClientProtocol implements ClientProtocol {
     private boolean readyForFileTransfer = false;
     private String transitUriPrefix = null;
     private int timeoutMillis = 30000;
+
+    private SocketClientTransaction transaction;
     
     private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
     
@@ -236,11 +234,8 @@ public class SocketClientProtocol implements ClientProtocol {
     }
 
 
-    // TODO: move up to top with member variables
-    private SocketClientTransaction transaction;
-    
     @Override
-    public void startTransaction(final Peer peer, final TransferDirection direction) throws IOException, ProtocolException {
+    public Transaction startTransaction(final Peer peer, final FlowFileCodec codec, final TransferDirection direction) throws IOException, ProtocolException {
         if ( !handshakeComplete ) {
             throw new IllegalStateException("Handshake has not been performed");
         }
@@ -248,204 +243,29 @@ public class SocketClientProtocol implements ClientProtocol {
             throw new IllegalStateException("Cannot start transaction; handshake resolution was " + handshakeResponse);
         }
         
-        transaction = new SocketClientTransaction(peer, direction, useCompression);
-
-        final DataOutputStream dos = transaction.getDataOutputStream();
-        if ( direction == TransferDirection.RECEIVE ) {
-            // Indicate that we would like to have some data
-            RequestType.RECEIVE_FLOWFILES.writeRequestType(dos);
-            dos.flush();
-            
-            final Response dataAvailableCode = Response.read(transaction.getDataInputStream());
-            switch (dataAvailableCode.getCode()) {
-                case MORE_DATA:
-                    logger.debug("{} {} Indicates that data is available", this, peer);
-                    transaction.setDataAvailable(true);
-                    break;
-                case NO_MORE_DATA:
-                    logger.debug("{} No data available from {}", peer);
-                    transaction.setDataAvailable(false);
-                    return;
-                default:
-                    throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
-            }
-
-        } else {
-            // Indicate that we would like to have some data
-            RequestType.SEND_FLOWFILES.writeRequestType(dos);
-            dos.flush();
-        }
+        return new SocketClientTransaction(versionNegotiator.getVersion(), peer, codec, 
+        		direction, useCompression, (int) destination.getYieldPeriod(TimeUnit.MILLISECONDS));
     }
-    
-    @Override
-    public DataPacket receiveData(final FlowFileCodec codec) throws IOException, ProtocolException {
-    	if ( transaction == null ) {
-    		throw new IllegalStateException("Cannot receive data because no transaction has been started");
-    	}
-    	
-    	if ( transaction.getTransferDirection() == TransferDirection.SEND ) {
-    	    throw new IllegalStateException("Attempting to receive data but started a SEND Transaction");
-    	}
 
-    	// if no data available, return null
-    	if ( !transaction.isDataAvailable() ) {
-    	    return null;
-    	}
-    	
-    	final Peer peer = transaction.getPeer();
-        logger.debug("{} Receiving data from {}", this, peer);
-        final DataPacket packet = codec.decode(transaction.createCheckedInputStream());
-        
-        if ( packet != null ) {
-            transaction.incrementTransferCount();
-            
-            // Determine if Peer will send us data or has no data to send us
-            final DataInputStream dis = transaction.getDataInputStream();
-            final Response dataAvailableCode = Response.read(dis);
-            switch (dataAvailableCode.getCode()) {
-                case MORE_DATA:
-                    logger.debug("{} {} Indicates that data is available", this, peer);
-                    transaction.setDataAvailable(true);
-                    break;
-                case NO_MORE_DATA:
-                    logger.debug("{} No data available from {}", peer);
-                    transaction.setDataAvailable(false);
-                    break;
-                default:
-                    throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
-            }
-        }
-        
-        return packet;
-    }
 
     
-    @Override
-    public void transferData(final DataPacket dataPacket, final FlowFileCodec codec) throws IOException, ProtocolException {
-        if ( transaction == null ) {
-            throw new IllegalStateException("Cannot send data because no transaction has been started");
-        }
-
-        if ( transaction.getTransferDirection() == TransferDirection.RECEIVE ) {
-            throw new IllegalStateException("Attempting to send data but started a RECEIVE Transaction");
-        }
-
-        final Peer peer = transaction.getPeer();
-        logger.debug("{} Sending data to {}", this, peer);
-
-        if ( transaction.getTransferCount() > 0 ) {
-            ResponseCode.CONTINUE_TRANSACTION.writeResponse(transaction.getDataOutputStream());
-        }
+    // TODO: Transaction should be pulled out into its own class.
+    //			Flow of execution:
+    //			- start transaction
+    //			- send/receive data
+    //			- confirm contents
+    // 			- complete / rollback
+    //
+    //			- this class should validate transaction state before each step.
+    // We need to confirm transaction to ensure that data is correct. Yes, it is sent via TCP, which should ensure that the
+    // data is correct, but things happen. Humans make mistakes. There could easily be a bug on our end, for example. And this
+    // will ensure that we guard against that. It's a good defensive programming strategy.
+    public void confirmTransaction() throws IOException {
         
-        final CheckedOutputStream checkedOutStream = transaction.createCheckedOutputStream();
-        codec.encode(dataPacket, checkedOutStream);
-        
-        // need to close the CompressionOutputStream in order to force it write out any remaining bytes.
-        // Otherwise, do NOT close it because we don't want to close the underlying stream
-        // (CompressionOutputStream will not close the underlying stream when it's closed)
-        if ( useCompression ) {
-            checkedOutStream.close();
-        }
-        
-        transaction.incrementTransferCount();
     }
     
     
-    @Override
-    public void completeTransaction(final boolean applyBackPressure) throws ProtocolException, IOException {
-        final SocketClientTransaction transaction = this.transaction;
-        this.transaction = null;
-        
-        if ( transaction == null ) {
-            throw new IllegalStateException("Cannot complete transaction because no transaction has been started");
-        }
-        
-        final Peer peer = transaction.getPeer();
-        
-        if ( transaction.getTransferDirection() == TransferDirection.RECEIVE ) {
-            final boolean moreData = transaction.isDataAvailable();
-            if ( moreData ) {
-                throw new IllegalStateException("Cannot complete transaction because the sender has already sent more data than client has consumed.");
-            }
-            
-            // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
-            // to peer so that we can verify that the connection is still open. This is a two-phase commit,
-            // which helps to prevent the chances of data duplication. Without doing this, we may commit the
-            // session and then when we send the response back to the peer, the peer may have timed out and may not
-            // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
-            // Critical Section involved in this transaction so that rather than the Critical Section being the
-            // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
-            logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
-            final String calculatedCRC = transaction.calculateCRC();
-            ResponseCode.CONFIRM_TRANSACTION.writeResponse(transaction.getDataOutputStream(), calculatedCRC);
-            
-            final Response confirmTransactionResponse = Response.read(transaction.getDataInputStream());
-            logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer);
-            
-            switch (confirmTransactionResponse.getCode()) {
-                case CONFIRM_TRANSACTION:
-                    break;
-                case BAD_CHECKSUM:
-                    throw new IOException(this + " Received a BadChecksum response from peer " + peer);
-                default:
-                    throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
-            }
-            
-            if ( applyBackPressure ) {
-                // Confirm that we received the data and the peer can now discard it but that the peer should not
-                // send any more data for a bit
-                logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
-                ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(transaction.getDataOutputStream());
-            } else {
-                // Confirm that we received the data and the peer can now discard it
-                logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
-                ResponseCode.TRANSACTION_FINISHED.writeResponse(transaction.getDataOutputStream());
-            }
-        } else {
-            logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer);
-            ResponseCode.FINISH_TRANSACTION.writeResponse(transaction.getDataOutputStream());
-            
-            final String calculatedCRC = transaction.calculateCRC();
-            
-            // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
-            final Response transactionConfirmationResponse = Response.read(transaction.getDataInputStream());
-            if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) {
-                // Confirm checksum and echo back the confirmation.
-                logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer);
-                final String receivedCRC = transactionConfirmationResponse.getMessage();
-                
-                if ( versionNegotiator.getVersion() > 3 ) {
-                    if ( !receivedCRC.equals(calculatedCRC) ) {
-                        ResponseCode.BAD_CHECKSUM.writeResponse(transaction.getDataOutputStream());
-                        throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
-                    }
-                }
-                
-                ResponseCode.CONFIRM_TRANSACTION.writeResponse(transaction.getDataOutputStream(), "");
-            } else {
-                throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse);
-            }
-        
-            final Response transactionResponse;
-            try {
-                transactionResponse = Response.read(transaction.getDataInputStream());
-            } catch (final IOException e) {
-                throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. " +
-                        "It is unknown whether or not the peer successfully received/processed the data.", e);
-            }
-            
-            logger.debug("{} Received {} from {}", this, transactionResponse, peer);
-            if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
-                peer.penalize(destination.getYieldPeriod(TimeUnit.MILLISECONDS));
-            } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
-                throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
-            }
-        }
-    }
-    
-    
-    @Override
-    public void rollbackTransaction() {
+    public void cancelTransaction() {
         final SocketClientTransaction transaction = this.transaction;
         this.transaction = null;
         
@@ -456,296 +276,134 @@ public class SocketClientProtocol implements ClientProtocol {
         // TODO: IMPLEMENT
     }
     
+    
     @Override
     public void receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
-        if ( !handshakeComplete ) {
-            throw new IllegalStateException("Handshake has not been performed");
-        }
-        if ( !readyForFileTransfer ) {
-            throw new IllegalStateException("Cannot receive files; handshake resolution was " + handshakeResponse);
-        }
-
-        logger.debug("{} Receiving FlowFiles from {}", this, peer);
-        final CommunicationsSession commsSession = peer.getCommunicationsSession();
-        final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
-        final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
-        String userDn = commsSession.getUserDn();
-        if ( userDn == null ) {
-            userDn = "none";
-        }
-        
-        // Indicate that we would like to have some data
-        RequestType.RECEIVE_FLOWFILES.writeRequestType(dos);
-        dos.flush();
-        
-        // Determine if Peer will send us data or has no data to send us
-        final Response dataAvailableCode = Response.read(dis);
-        switch (dataAvailableCode.getCode()) {
-            case MORE_DATA:
-                logger.debug("{} {} Indicates that data is available", this, peer);
-                break;
-            case NO_MORE_DATA:
-            	context.yield();
-                logger.debug("{} No data available from {}", peer);
-                return;
-            default:
-                throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
-        }
-
-        final StopWatch stopWatch = new StopWatch(true);
-        final Set<FlowFile> flowFilesReceived = new HashSet<>();
-        long bytesReceived = 0L;
-        final CRC32 crc = new CRC32();
-        
-        // Peer has data. Decode the bytes into FlowFiles until peer says he's finished sending data.
-        boolean continueTransaction = true;
-        String calculatedCRC = "";
-        while (continueTransaction) {
-            final InputStream flowFileInputStream = useCompression ? new CompressionInputStream(dis) : dis;
-            final CheckedInputStream checkedIn = new CheckedInputStream(flowFileInputStream, crc);
-            
-            final long startNanos = System.nanoTime();
-            
-            final DataPacket dataPacket = codec.decode(checkedIn);
-            FlowFile flowFile = session.create();
-            flowFile = session.importFrom(dataPacket.getData(), flowFile);
-            flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
-            
-            final long transmissionNanos = System.nanoTime() - startNanos;
-            final long transmissionMillis = TimeUnit.MILLISECONDS.convert(transmissionNanos, TimeUnit.NANOSECONDS);
-            
-            final String sourceFlowFileIdentifier = flowFile.getAttribute(CoreAttributes.UUID.key());
-            flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString());
-            
-            final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier;
-            session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transmissionMillis);
-            
-            session.transfer(flowFile, Relationship.ANONYMOUS);
-            bytesReceived += flowFile.getSize();
-            flowFilesReceived.add(flowFile);
-            logger.debug("{} Received {} from {}", this, flowFile, peer);
-            
-            final Response transactionCode = Response.read(dis);
-            switch (transactionCode.getCode()) {
-                case CONTINUE_TRANSACTION:
-                    logger.trace("{} Received ContinueTransaction indicator from {}", this, peer);
-                    break;
-                case FINISH_TRANSACTION:
-                    logger.trace("{} Received FinishTransaction indicator from {}", this, peer);
-                    continueTransaction = false;
-                    calculatedCRC = String.valueOf(checkedIn.getChecksum().getValue());
-                    break;
-                default:
-                    throw new ProtocolException("Received unexpected response from peer: when expecting Continue Transaction or Finish Transaction, received" + transactionCode);
-            }
-        }
-        
-        // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
-        // to peer so that we can verify that the connection is still open. This is a two-phase commit,
-        // which helps to prevent the chances of data duplication. Without doing this, we may commit the
-        // session and then when we send the response back to the peer, the peer may have timed out and may not
-        // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
-        // Critical Section involved in this transaction so that rather than the Critical Section being the
-        // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
-        logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
-        ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC);
-        
-        final Response confirmTransactionResponse = Response.read(dis);
-        logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer);
-        
-        switch (confirmTransactionResponse.getCode()) {
-            case CONFIRM_TRANSACTION:
-                break;
-            case BAD_CHECKSUM:
-                session.rollback();
-                throw new IOException(this + " Received a BadChecksum response from peer " + peer);
-            default:
-                throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
-        }
-        
-        // Commit the session so that we have persisted the data
-        session.commit();
-        
-        if ( context.getAvailableRelationships().isEmpty() ) {
-            // Confirm that we received the data and the peer can now discard it but that the peer should not
-            // send any more data for a bit
-            logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
-            ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos);
-        } else {
-            // Confirm that we received the data and the peer can now discard it
-            logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
-            ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
-        }
-        
-        stopWatch.stop();
-        final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
-        final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
-        final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
-        final String dataSize = FormatUtils.formatDataSize(bytesReceived);
-        logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] {
-            this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
-    }
+    	final String userDn = peer.getCommunicationsSession().getUserDn();
+    	final Transaction transaction = startTransaction(peer, codec, TransferDirection.RECEIVE);
+    	
+    	final StopWatch stopWatch = new StopWatch(true);
+    	final Set<FlowFile> flowFilesReceived = new HashSet<>();
+    	long bytesReceived = 0L;
+    	
+    	while (true) {
+    		final long start = System.nanoTime();
+    		final DataPacket dataPacket = transaction.receive();
+    		if ( dataPacket == null ) {
+    			break;
+    		}
+    		
+    		FlowFile flowFile = session.create();
+    		flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
+    		flowFile = session.importFrom(dataPacket.getData(), flowFile);
+    		final long receiveNanos = System.nanoTime() - start;
+    		
+			String sourceFlowFileIdentifier = dataPacket.getAttributes().get(CoreAttributes.UUID.key());
+			if ( sourceFlowFileIdentifier == null ) {
+				sourceFlowFileIdentifier = "<Unknown Identifier>";
+			}
+			
+			final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier;
+			session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, TimeUnit.NANOSECONDS.toMillis(receiveNanos));
 
-    @Override
-    public void transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
-        if ( !handshakeComplete ) {
-            throw new IllegalStateException("Handshake has not been performed");
-        }
-        if ( !readyForFileTransfer ) {
-            throw new IllegalStateException("Cannot transfer files; handshake resolution was " + handshakeResponse);
-        }
+    		session.transfer(flowFile, Relationship.ANONYMOUS);
+    		bytesReceived += dataPacket.getSize();
+    	}
 
-        FlowFile flowFile = session.get();
-        if ( flowFile == null ) {
-            return;
-        }
+    	// Confirm that what we received was the correct data.
+    	transaction.confirm();
+    	
+		// Commit the session so that we have persisted the data
+		session.commit();
 
-        logger.debug("{} Sending FlowFiles to {}", this, peer);
-        final CommunicationsSession commsSession = peer.getCommunicationsSession();
-        final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
-        final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
-        String userDn = commsSession.getUserDn();
-        if ( userDn == null ) {
-            userDn = "none";
-        }
-        
-        // Indicate that we would like to have some data
-        RequestType.SEND_FLOWFILES.writeRequestType(dos);
-        dos.flush();
-        
-        final StopWatch stopWatch = new StopWatch(true);
-        final CRC32 crc = new CRC32();
-        
-        long bytesSent = 0L;
-        final Set<FlowFile> flowFilesSent = new HashSet<>();
-        boolean continueTransaction = true;
-        String calculatedCRC = "";
-        final long startSendingNanos = System.nanoTime();
-        while (continueTransaction) {
-            final OutputStream flowFileOutputStream = useCompression ? new CompressionOutputStream(dos) : dos;
-            logger.debug("{} Sending {} to {}", this, flowFile, peer);
-            
-            final CheckedOutputStream checkedOutStream = new CheckedOutputStream(flowFileOutputStream, crc);
-            
-            final long startNanos = System.nanoTime();
-            
-            // call codec.encode within a session callback so that we have the InputStream to read the FlowFile
-            final FlowFile toWrap = flowFile;
-            session.read(flowFile, new InputStreamCallback() {
-                @Override
-                public void process(final InputStream in) throws IOException {
-                    final DataPacket dataPacket = new DataPacket() {
-                        @Override
-                        public Map<String, String> getAttributes() {
-                            return toWrap.getAttributes();
-                        }
+		// We want to apply backpressure if the outgoing connections are full. I.e., there are no available relationships.
+		final boolean applyBackpressure = context.getAvailableRelationships().isEmpty();
 
-                        @Override
-                        public InputStream getData() {
-                            return in;
-                        }
+		transaction.complete(applyBackpressure);
+		logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
 
-                        @Override
-                        public long getSize() {
-                            return toWrap.getSize();
-                        }
-                    };
-                    
-                    codec.encode(dataPacket, checkedOutStream);
-                }
-            });
-            
-            final long transferNanos = System.nanoTime() - startNanos;
-            final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
-            
-            // need to close the CompressionOutputStream in order to force it write out any remaining bytes.
-            // Otherwise, do NOT close it because we don't want to close the underlying stream
-            // (CompressionOutputStream will not close the underlying stream when it's closed)
-            if ( useCompression ) {
-                checkedOutStream.close();
-            }
-            
-            flowFilesSent.add(flowFile);
-            bytesSent += flowFile.getSize();
-            logger.debug("{} Sent {} to {}", this, flowFile, peer);
-            
-            final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key());
-            session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false);
-            session.remove(flowFile);
-            
-            final long sendingNanos = System.nanoTime() - startSendingNanos;
-            if ( sendingNanos < BATCH_SEND_NANOS ) { 
-                flowFile = session.get();
-            } else {
-                flowFile = null;
-            }
-            
-            continueTransaction = (flowFile != null);
-            if ( continueTransaction ) {
-                logger.debug("{} Sent CONTINUE_TRANSACTION indicator to {}", this, peer);
-                ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
-            } else {
-                logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer);
-                ResponseCode.FINISH_TRANSACTION.writeResponse(dos);
-                
-                calculatedCRC = String.valueOf( checkedOutStream.getChecksum().getValue() );
-            }
-        }
-        
-        // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
-        final Response transactionConfirmationResponse = Response.read(dis);
-        if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) {
-            // Confirm checksum and echo back the confirmation.
-            logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer);
-            final String receivedCRC = transactionConfirmationResponse.getMessage();
-            
-            if ( versionNegotiator.getVersion() > 3 ) {
-                if ( !receivedCRC.equals(calculatedCRC) ) {
-                    ResponseCode.BAD_CHECKSUM.writeResponse(dos);
-                    session.rollback();
-                    throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
-                }
-            }
-            
-            ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");
-        } else {
-            throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse);
-        }
+		stopWatch.stop();
+		final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
+		final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
+		final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+		final String dataSize = FormatUtils.formatDataSize(bytesReceived);
+		logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] { 
+				this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate });
+    }
 
-        final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
+    
+    @Override
+    public void transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
+		FlowFile flowFile = session.get();
+		if (flowFile == null) {
+			return;
+		}
 
-        final Response transactionResponse;
-        try {
-            transactionResponse = Response.read(dis);
-        } catch (final IOException e) {
-            logger.error("{} Failed to receive a response from {} when expecting a TransactionFinished Indicator." +
-                    " It is unknown whether or not the peer successfully received/processed the data." +
-                    " Therefore, {} will be rolled back, possibly resulting in data duplication of {}", 
-                    this, peer, session, flowFileDescription);
-            session.rollback();
-            throw e;
-        }
-        
-        logger.debug("{} Received {} from {}", this, transactionResponse, peer);
-        if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
-            peer.penalize(destination.getYieldPeriod(TimeUnit.MILLISECONDS));
-        } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
-            throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
-        }
-        
-        // consume input stream entirely, ignoring its contents. If we
-        // don't do this, the Connection will not be returned to the pool
-        stopWatch.stop();
-        final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
-        final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
-        final String dataSize = FormatUtils.formatDataSize(bytesSent);
-        
-        session.commit();
-        
-        logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {
-            this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
+		try {
+			final String userDn = peer.getCommunicationsSession().getUserDn();
+			final long startSendingNanos = System.nanoTime();
+			final StopWatch stopWatch = new StopWatch(true);
+			long bytesSent = 0L;
+			
+			final Transaction transaction = startTransaction(peer, codec, TransferDirection.SEND);
+			
+			final Set<FlowFile> flowFilesSent = new HashSet<>();
+	        boolean continueTransaction = true;
+	        while (continueTransaction) {
+	        	final long startNanos = System.nanoTime();
+	            // call codec.encode within a session callback so that we have the InputStream to read the FlowFile
+	            final FlowFile toWrap = flowFile;
+	            session.read(flowFile, new InputStreamCallback() {
+	                @Override
+	                public void process(final InputStream in) throws IOException {
+	                    final DataPacket dataPacket = new StandardDataPacket(toWrap.getAttributes(), in, toWrap.getSize());
+	                    transaction.send(dataPacket);
+	                }
+	            });
+	            
+	            final long transferNanos = System.nanoTime() - startNanos;
+	            final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+	            
+	            flowFilesSent.add(flowFile);
+	            bytesSent += flowFile.getSize();
+	            logger.debug("{} Sent {} to {}", this, flowFile, peer);
+	            
+	            final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key());
+	            session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false);
+	            session.remove(flowFile);
+	            
+	            final long sendingNanos = System.nanoTime() - startSendingNanos;
+	            if ( sendingNanos < BATCH_SEND_NANOS ) { 
+	                flowFile = session.get();
+	            } else {
+	                flowFile = null;
+	            }
+	            
+	            continueTransaction = (flowFile != null);
+	        }
+	        
+	        transaction.confirm();
+	        
+	        // consume input stream entirely, ignoring its contents. If we
+	        // don't do this, the Connection will not be returned to the pool
+	        stopWatch.stop();
+	        final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
+	        final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+	        final String dataSize = FormatUtils.formatDataSize(bytesSent);
+	        
+	        session.commit();
+	        transaction.complete(false);
+	        
+	        final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
+	        logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {
+	            this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
+		} catch (final Exception e) {
+			session.rollback();
+			throw e;
+		}
     }
-
+    
+    
     @Override
     public VersionNegotiator getVersionNegotiator() {
         return versionNegotiator;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
index 83522a5..129e5aa 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -19,74 +19,272 @@ package org.apache.nifi.remote.protocol.socket;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.zip.CRC32;
 import java.util.zip.CheckedInputStream;
 import java.util.zip.CheckedOutputStream;
 
 import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.protocol.RequestType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class SocketClientTransaction {
-	private final long startTime = System.nanoTime();
-	private final CRC32 crc = new CRC32();
+public class SocketClientTransaction implements Transaction {
+	private static final Logger logger = LoggerFactory.getLogger(SocketClientTransaction.class);
 	
-	private final Peer peer;
 	
+	private final CRC32 crc = new CRC32();
+	private final int protocolVersion;
+	private final FlowFileCodec codec;
 	private final DataInputStream dis;
 	private final DataOutputStream dos;
 	private final TransferDirection direction;
+	private final boolean compress;
+	private final Peer peer;
+	private final int penaltyMillis;
 	
 	private boolean dataAvailable = false;
 	private int transfers = 0;
+	private TransactionState state;
 	
-	SocketClientTransaction(final Peer peer, final TransferDirection direction, final boolean useCompression) throws IOException {
+	SocketClientTransaction(final int protocolVersion, final Peer peer, final FlowFileCodec codec, 
+			final TransferDirection direction, final boolean useCompression, final int penaltyMillis) throws IOException {
+		this.protocolVersion = protocolVersion;
 		this.peer = peer;
+		this.codec = codec;
 		this.direction = direction;
 		this.dis = new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream());
 		this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream());
+		this.compress = useCompression;
+		this.state = TransactionState.TRANSACTION_STARTED;
+		this.penaltyMillis = penaltyMillis;
+		
+		initialize();
 	}
 	
-	int getTransferCount() {
-	    return transfers;
+	// TODO: UPDATE STATE
+	private void initialize() throws IOException {
+        if ( direction == TransferDirection.RECEIVE ) {
+            // Indicate that we would like to have some data
+            RequestType.RECEIVE_FLOWFILES.writeRequestType(dos);
+            dos.flush();
+            
+            final Response dataAvailableCode = Response.read(dis);
+            switch (dataAvailableCode.getCode()) {
+                case MORE_DATA:
+                    logger.debug("{} {} Indicates that data is available", this, peer);
+                    this.dataAvailable = true;
+                    break;
+                case NO_MORE_DATA:
+                    logger.debug("{} No data available from {}", peer);
+                    this.dataAvailable = false;
+                    return;
+                default:
+                    throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+            }
+
+        } else {
+            // Indicate that we would like to have some data
+            RequestType.SEND_FLOWFILES.writeRequestType(dos);
+            dos.flush();
+        }
 	}
 	
-	void incrementTransferCount() {
-	    transfers++;
-	}
 	
-	void setDataAvailable(final boolean available) {
-	    this.dataAvailable = available;
+	// TODO: UPDATE STATE
+	@Override
+	public DataPacket receive() throws IOException {
+		if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
+			throw new IllegalStateException("Cannot receive data because Transaction State is " + state);
+		}
+		
+    	if ( direction == TransferDirection.SEND ) {
+    	    throw new IllegalStateException("Attempting to receive data but started a SEND Transaction");
+    	}
+
+    	// if no data available, return null
+    	if ( !dataAvailable ) {
+    	    return null;
+    	}
+    	
+        logger.debug("{} Receiving data from {}", this, peer);
+        final DataPacket packet = codec.decode(new CheckedInputStream(dis, crc));
+        
+        if ( packet != null ) {
+        	transfers++;
+            
+            // Determine if Peer will send us data or has no data to send us
+            final Response dataAvailableCode = Response.read(dis);
+            switch (dataAvailableCode.getCode()) {
+                case MORE_DATA:
+                    logger.debug("{} {} Indicates that data is available", this, peer);
+                    this.dataAvailable = true;
+                    break;
+                case NO_MORE_DATA:
+                    logger.debug("{} No data available from {}", peer);
+                    this.dataAvailable = false;
+                    break;
+                default:
+                    throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+            }
+        }
+        
+        return packet;
 	}
 	
-	boolean isDataAvailable() {
-	    return dataAvailable;
-	}
 	
-	TransferDirection getTransferDirection() {
-	    return direction;
+	// TODO: UPDATE STATE
+	@Override
+	public void send(DataPacket dataPacket) throws IOException {
+		if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
+			throw new IllegalStateException("Cannot send data because Transaction State is " + state);
+		}
+
+        if ( direction == TransferDirection.RECEIVE ) {
+            throw new IllegalStateException("Attempting to send data but started a RECEIVE Transaction");
+        }
+
+		if ( transfers > 0 ) {
+            ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
+        }
+
+        logger.debug("{} Sending data to {}", this, peer);
+
+		final OutputStream out = new CheckedOutputStream(dos, crc);
+        codec.encode(dataPacket, out);
+        
+        // need to close the CompressionOutputStream in order to force it write out any remaining bytes.
+        // Otherwise, do NOT close it because we don't want to close the underlying stream
+        // (CompressionOutputStream will not close the underlying stream when it's closed)
+        if ( compress ) {
+        	out.close();
+        }
+        
+        transfers++;
 	}
 	
-	DataOutputStream getDataOutputStream() {
-		return dos;
-	}
 	
-	DataInputStream getDataInputStream() {
-	    return dis;
+	// TODO: UPDATE STATE
+	@Override
+	public void cancel() {
+		if ( state == TransactionState.TRANSACTION_CANCELED || state == TransactionState.TRANSACTION_COMPLETED ) {
+			throw new IllegalStateException("Cannot cancel transaction because state is already " + state);
+		}
+		
+		// TODO: implement
 	}
 	
-	CheckedInputStream createCheckedInputStream() {
-	    return new CheckedInputStream(dis, crc);
-	}
 	
-	CheckedOutputStream createCheckedOutputStream() {
-	    return new CheckedOutputStream(dos, crc);
+	// TODO: UPDATE STATE
+	@Override
+	public void complete(boolean applyBackPressure) throws IOException {
+		if ( state != TransactionState.TRANSACTION_CONFIRMED ) {
+			throw new IllegalStateException("Cannot complete transaction because state is " + state + 
+					"; Transaction can only be completed when state is " + TransactionState.TRANSACTION_CONFIRMED);
+		}
+		
+		if ( direction == TransferDirection.RECEIVE ) {
+            if ( applyBackPressure ) {
+                // Confirm that we received the data and the peer can now discard it but that the peer should not
+                // send any more data for a bit
+                logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
+                ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos);
+            } else {
+                // Confirm that we received the data and the peer can now discard it
+                logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
+                ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
+            }
+        } else {
+            final Response transactionResponse;
+            try {
+                transactionResponse = Response.read(dis);
+            } catch (final IOException e) {
+                throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. " +
+                        "It is unknown whether or not the peer successfully received/processed the data.", e);
+            }
+            
+            logger.debug("{} Received {} from {}", this, transactionResponse, peer);
+            if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
+                peer.penalize(penaltyMillis);
+            } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
+                throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
+            }
+        }
 	}
 	
-	Peer getPeer() {
-		return peer;
+	
+	// TODO: UPDATE STATE
+	@Override
+	public void confirm() throws IOException {
+		if ( state != TransactionState.DATA_EXCHANGED ) {
+			throw new IllegalStateException("Cannot confirm Transaction because state is " + state + 
+					"; Transaction can only be confirmed when state is " + TransactionState.DATA_EXCHANGED );
+		}
+
+        if ( direction == TransferDirection.RECEIVE ) {
+            if ( dataAvailable ) {
+                throw new IllegalStateException("Cannot complete transaction because the sender has already sent more data than client has consumed.");
+            }
+            
+            // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
+            // to peer so that we can verify that the connection is still open. This is a two-phase commit,
+            // which helps to prevent the chances of data duplication. Without doing this, we may commit the
+            // session and then when we send the response back to the peer, the peer may have timed out and may not
+            // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
+            // Critical Section involved in this transaction so that rather than the Critical Section being the
+            // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
+            logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
+            final String calculatedCRC = String.valueOf(crc.getValue());
+            ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC);
+            
+            final Response confirmTransactionResponse = Response.read(dis);
+            logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer);
+            
+            switch (confirmTransactionResponse.getCode()) {
+                case CONFIRM_TRANSACTION:
+                    break;
+                case BAD_CHECKSUM:
+                    throw new IOException(this + " Received a BadChecksum response from peer " + peer);
+                default:
+                    throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
+            }
+        } else {
+            logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer);
+            ResponseCode.FINISH_TRANSACTION.writeResponse(dos);
+            
+            final String calculatedCRC = String.valueOf(crc.getValue());
+            
+            // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
+            final Response transactionConfirmationResponse = Response.read(dis);
+            if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) {
+                // Confirm checksum and echo back the confirmation.
+                logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer);
+                final String receivedCRC = transactionConfirmationResponse.getMessage();
+                
+                // CRC was not used before version 4
+                if ( protocolVersion > 3 ) {
+                    if ( !receivedCRC.equals(calculatedCRC) ) {
+                        ResponseCode.BAD_CHECKSUM.writeResponse(dos);
+                        throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
+                    }
+                }
+                
+                ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");
+            } else {
+                throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse);
+            }
+        }
 	}
+
 	
-	String calculateCRC() {
-	    return String.valueOf(crc.getValue());
+	// TODO: UPDATE STATE
+	@Override
+	public TransactionState getState() {
+		return state;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
new file mode 100644
index 0000000..bd1b50c
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.stream.io.LimitingInputStream;
+import org.apache.nifi.stream.io.MinimumLengthInputStream;
+
+public class StandardDataPacket implements DataPacket {
+
+	private final Map<String, String> attributes;
+	private final InputStream stream;
+	private final long size;
+	
+	public StandardDataPacket(final Map<String, String> attributes, final InputStream stream, final long size) {
+		this.attributes = attributes;
+		this.stream = new MinimumLengthInputStream(new LimitingInputStream(stream, size), size);
+		this.size = size;
+	}
+	
+	public Map<String, String> getAttributes() {
+		return attributes;
+	}
+	
+	public InputStream getData() {
+		return stream;
+	}
+	
+	public long getSize() {
+		return size;
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 82d8206..a51cdba 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -157,7 +157,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
             logger.error(message);
             remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
             return;
-        } catch (final ProtocolException | HandshakeException | IOException e) {
+        } catch (final HandshakeException | IOException e) {
             final String message = String.format("%s failed to communicate with %s due to %s", this, url, e.toString());
             logger.error(message);
             if ( logger.isDebugEnabled() ) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index 887429c..d4b9c2f 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@ -40,6 +40,7 @@ import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.PortAuthorizationResult;
 import org.apache.nifi.remote.RemoteResourceFactory;
@@ -53,8 +54,10 @@ import org.apache.nifi.remote.exception.ProtocolException;
 import org.apache.nifi.remote.io.CompressionInputStream;
 import org.apache.nifi.remote.io.CompressionOutputStream;
 import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.remote.protocol.RequestType;
 import org.apache.nifi.remote.protocol.ServerProtocol;
+import org.apache.nifi.remote.util.StandardDataPacket;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.StopWatch;
@@ -304,7 +307,16 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
             final CheckedOutputStream checkedOutputStream = new CheckedOutputStream(flowFileOutputStream, crc);
 
             final StopWatch transferWatch = new StopWatch(true);
-            flowFile = codec.encode(flowFile, session, checkedOutputStream);
+            
+            final FlowFile toSend = flowFile;
+            session.read(flowFile, new InputStreamCallback() {
+				@Override
+				public void process(final InputStream in) throws IOException {
+					final DataPacket dataPacket = new StandardDataPacket(toSend.getAttributes(), in, toSend.getSize());
+					codec.encode(dataPacket, checkedOutputStream);
+				}
+            });
+            
             final long transmissionMillis = transferWatch.getElapsed(TimeUnit.MILLISECONDS);
             
             // need to close the CompressionOutputStream in order to force it write out any remaining bytes.
@@ -427,7 +439,11 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
             final InputStream flowFileInputStream = useGzip ? new CompressionInputStream(dis) : dis;
             final CheckedInputStream checkedInputStream = new CheckedInputStream(flowFileInputStream, crc);
 
-            FlowFile flowFile = codec.decode(checkedInputStream, session);
+            final DataPacket dataPacket = codec.decode(checkedInputStream);
+            FlowFile flowFile = session.create();
+            flowFile = session.importFrom(dataPacket.getData(), flowFile);
+            flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
+            
             final long transferNanos = System.nanoTime() - startNanos;
             final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
             final String sourceSystemFlowFileUuid = flowFile.getAttribute(CoreAttributes.UUID.key());