You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/21 13:35:24 UTC

incubator-nifi git commit: NIFI-4: Refactoring protocols to be used with DataPacket instead of FlowFile as a more generic data model

Repository: incubator-nifi
Updated Branches:
  refs/heads/site-to-site-client c174d3a60 -> 2aaed7021


NIFI-4: Refactoring protocols to be used with DataPacket instead of FlowFile as a more generic data model


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/2aaed702
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/2aaed702
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/2aaed702

Branch: refs/heads/site-to-site-client
Commit: 2aaed7021d0ed0b6cc81415edbef19c95d8ea68c
Parents: c174d3a
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Jan 21 07:35:09 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Jan 21 07:35:09 2015 -0500

----------------------------------------------------------------------
 .../apache/nifi/remote/client/Transaction.java  |  21 --
 .../apache/nifi/remote/codec/FlowFileCodec.java |  32 +--
 .../nifi/remote/protocol/ClientProtocol.java    |  10 +-
 .../protocol/socket/SocketClientProtocol.java   | 236 +++++++++++++++++--
 .../socket/SocketClientTransaction.java         |  54 +++--
 5 files changed, 272 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2aaed702/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java
deleted file mode 100644
index bae6e51..0000000
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.client;
-
-public interface Transaction {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2aaed702/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
index b4206b3..1380e1b 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
@@ -21,11 +21,10 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
 
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.remote.VersionedRemoteResource;
 import org.apache.nifi.remote.exception.ProtocolException;
 import org.apache.nifi.remote.exception.TransmissionDisabledException;
+import org.apache.nifi.remote.protocol.DataPacket;
 
 /**
  * <p>
@@ -44,36 +43,29 @@ public interface FlowFileCodec extends VersionedRemoteResource {
     public List<Integer> getSupportedVersions();
 
     /**
-     * Encodes a FlowFile and its content as a single stream of data and writes
-     * that stream to the output. If checksum is not null, it will be calculated
-     * as the stream is read
+     * Encodes a DataPacket and its content as a single stream of data and writes
+     * that stream to the output.
      *
-     * @param flowFile the FlowFile to encode
-     * @param session a session that can be used to transactionally create and
-     * transfer flow files
+     * @param dataPacket the data to serialize
      * @param outStream the stream to write the data to
      *
-     * @return the updated FlowFile
-     *
-     * @throws IOException
+     * @throws IOException if there is a communications issue
+     * @throws TransmissionDisabledException if a user terminates the connection
      */
-    FlowFile encode(FlowFile flowFile, ProcessSession session, OutputStream outStream) throws IOException, TransmissionDisabledException;
+    void encode(DataPacket dataPacket, OutputStream outStream) throws IOException, TransmissionDisabledException;
 
     /**
      * Decodes the contents of the InputStream, interpreting the data to
-     * determine the next FlowFile's attributes and content, as well as their
-     * destinations. If not null, checksum will be used to calculate the
-     * checksum as the data is read.
+     * determine the next DataPacket's attributes and content.
      *
-     * @param stream an InputStream containing FlowFiles' contents, attributes,
-     * and destinations
-     * @param session
+     * @param stream an InputStream containing the DataPacket's content and attributes
      *
-     * @return the FlowFile that was created, or <code>null</code> if the stream
+     * @return the DataPacket that was created, or <code>null</code> if the stream
      * was out of data
      *
      * @throws IOException
      * @throws ProtocolException if the input is malformed
+     * @throws TransmissionDisabledException if a user terminates the connection
      */
-    FlowFile decode(InputStream stream, ProcessSession session) throws IOException, ProtocolException, TransmissionDisabledException;
+    DataPacket decode(InputStream stream) throws IOException, ProtocolException, TransmissionDisabledException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2aaed702/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
index d817425..51d3970 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
@@ -50,15 +50,17 @@ public interface ClientProtocol extends VersionedRemoteResource {
     
     
     
-    void startTransaction(Peer peer, TransferDirection direction) throws IOException;
+    void startTransaction(Peer peer, TransferDirection direction) throws IOException, ProtocolException;
     
-    void completeTransaction();
+    void completeTransaction(boolean applyBackPressure) throws IOException, ProtocolException;
     
     void rollbackTransaction();
     
-    void transferData(Peer peer, DataPacket dataPacket, FlowFileCodec codec) throws IOException, ProtocolException;
+    // Must be called within a transaction, i.e. after startTransaction and before completeTransaction or rollbackTransaction.
+    void transferData(DataPacket dataPacket, FlowFileCodec codec) throws IOException, ProtocolException;
     
-    DataPacket receiveData(Peer peer, FlowFileCodec codec) throws IOException, ProtocolException;
+    // Must be called within a transaction, i.e. after startTransaction and before completeTransaction or rollbackTransaction.
+    DataPacket receiveData(FlowFileCodec codec) throws IOException, ProtocolException;
     
     
     /**

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2aaed702/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index 6b0c94b..58d26d4 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -36,6 +36,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.PeerStatus;
 import org.apache.nifi.remote.RemoteDestination;
@@ -43,7 +44,6 @@ import org.apache.nifi.remote.RemoteResourceInitiator;
 import org.apache.nifi.remote.StandardVersionNegotiator;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.VersionNegotiator;
-import org.apache.nifi.remote.client.Transaction;
 import org.apache.nifi.remote.codec.FlowFileCodec;
 import org.apache.nifi.remote.codec.StandardFlowFileCodec;
 import org.apache.nifi.remote.exception.HandshakeException;
@@ -240,7 +240,7 @@ public class SocketClientProtocol implements ClientProtocol {
     private SocketClientTransaction transaction;
     
     @Override
-    public void startTransaction(final Peer peer, final TransferDirection direction) throws IOException {
+    public void startTransaction(final Peer peer, final TransferDirection direction) throws IOException, ProtocolException {
         if ( !handshakeComplete ) {
             throw new IllegalStateException("Handshake has not been performed");
         }
@@ -255,6 +255,21 @@ public class SocketClientProtocol implements ClientProtocol {
             // Indicate that we would like to have some data
             RequestType.RECEIVE_FLOWFILES.writeRequestType(dos);
             dos.flush();
+            
+            final Response dataAvailableCode = Response.read(transaction.getDataInputStream());
+            switch (dataAvailableCode.getCode()) {
+                case MORE_DATA:
+                    logger.debug("{} {} Indicates that data is available", this, peer);
+                    transaction.setDataAvailable(true);
+                    break;
+                case NO_MORE_DATA:
+                    logger.debug("{} No data available from {}", peer);
+                    transaction.setDataAvailable(false);
+                    return;
+                default:
+                    throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+            }
+
         } else {
             // Indicate that we would like to have some data
             RequestType.SEND_FLOWFILES.writeRequestType(dos);
@@ -268,34 +283,180 @@ public class SocketClientProtocol implements ClientProtocol {
     		throw new IllegalStateException("Cannot receive data because no transaction has been started");
     	}
     	
+    	if ( transaction.getTransferDirection() == TransferDirection.SEND ) {
+    	    throw new IllegalStateException("Attempting to receive data but started a SEND Transaction");
+    	}
+
+    	// if no data available, return null
+    	if ( !transaction.isDataAvailable() ) {
+    	    return null;
+    	}
+    	
     	final Peer peer = transaction.getPeer();
-        logger.debug("{} Receiving FlowFiles from {}", this, peer);
-        final CommunicationsSession commsSession = peer.getCommunicationsSession();
-        final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
-        final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
-        String userDn = commsSession.getUserDn();
-        if ( userDn == null ) {
-            userDn = "none";
+        logger.debug("{} Receiving data from {}", this, peer);
+        final DataPacket packet = codec.decode(transaction.createCheckedInputStream());
+        
+        if ( packet != null ) {
+            transaction.incrementTransferCount();
+            
+            // Determine if Peer will send us data or has no data to send us
+            final DataInputStream dis = transaction.getDataInputStream();
+            final Response dataAvailableCode = Response.read(dis);
+            switch (dataAvailableCode.getCode()) {
+                case MORE_DATA:
+                    logger.debug("{} {} Indicates that data is available", this, peer);
+                    transaction.setDataAvailable(true);
+                    break;
+                case NO_MORE_DATA:
+                    logger.debug("{} No data available from {}", peer);
+                    transaction.setDataAvailable(false);
+                    break;
+                default:
+                    throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+            }
         }
         
-        // Determine if Peer will send us data or has no data to send us
-        final Response dataAvailableCode = Response.read(dis);
-        switch (dataAvailableCode.getCode()) {
-            case MORE_DATA:
-                logger.debug("{} {} Indicates that data is available", this, peer);
-                break;
-            case NO_MORE_DATA:
-                logger.debug("{} No data available from {}", peer);
-                return null;
-            default:
-                throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+        return packet;
+    }
+
+    
+    @Override
+    public void transferData(final DataPacket dataPacket, final FlowFileCodec codec) throws IOException, ProtocolException {
+        if ( transaction == null ) {
+            throw new IllegalStateException("Cannot send data because no transaction has been started");
+        }
+
+        if ( transaction.getTransferDirection() == TransferDirection.RECEIVE ) {
+            throw new IllegalStateException("Attempting to send data but started a RECEIVE Transaction");
+        }
+
+        final Peer peer = transaction.getPeer();
+        logger.debug("{} Sending data to {}", this, peer);
+
+        if ( transaction.getTransferCount() > 0 ) {
+            ResponseCode.CONTINUE_TRANSACTION.writeResponse(transaction.getDataOutputStream());
         }
         
+        final CheckedOutputStream checkedOutStream = transaction.createCheckedOutputStream();
+        codec.encode(dataPacket, checkedOutStream);
         
+        // need to close the CompressionOutputStream in order to force it to write out any remaining bytes.
+        // Otherwise, do NOT close it because we don't want to close the underlying stream
+        // (CompressionOutputStream will not close the underlying stream when it's closed)
+        if ( useCompression ) {
+            checkedOutStream.close();
+        }
+        
+        transaction.incrementTransferCount();
     }
     
     
     @Override
+    public void completeTransaction(final boolean applyBackPressure) throws ProtocolException, IOException {
+        final SocketClientTransaction transaction = this.transaction;
+        this.transaction = null;
+        
+        if ( transaction == null ) {
+            throw new IllegalStateException("Cannot complete transaction because no transaction has been started");
+        }
+        
+        final Peer peer = transaction.getPeer();
+        
+        if ( transaction.getTransferDirection() == TransferDirection.RECEIVE ) {
+            final boolean moreData = transaction.isDataAvailable();
+            if ( moreData ) {
+                throw new IllegalStateException("Cannot complete transaction because the sender has already sent more data than client has consumed.");
+            }
+            
+            // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
+            // to peer so that we can verify that the connection is still open. This is a two-phase commit,
+            // which helps to reduce the chance of data duplication. Without doing this, we may commit the
+            // session and then when we send the response back to the peer, the peer may have timed out and may not
+            // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
+            // Critical Section involved in this transaction so that rather than the Critical Section being the
+            // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
+            logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
+            final String calculatedCRC = transaction.calculateCRC();
+            ResponseCode.CONFIRM_TRANSACTION.writeResponse(transaction.getDataOutputStream(), calculatedCRC);
+            
+            final Response confirmTransactionResponse = Response.read(transaction.getDataInputStream());
+            logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer);
+            
+            switch (confirmTransactionResponse.getCode()) {
+                case CONFIRM_TRANSACTION:
+                    break;
+                case BAD_CHECKSUM:
+                    throw new IOException(this + " Received a BadChecksum response from peer " + peer);
+                default:
+                    throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
+            }
+            
+            if ( applyBackPressure ) {
+                // Confirm that we received the data and the peer can now discard it but that the peer should not
+                // send any more data for a bit
+                logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
+                ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(transaction.getDataOutputStream());
+            } else {
+                // Confirm that we received the data and the peer can now discard it
+                logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
+                ResponseCode.TRANSACTION_FINISHED.writeResponse(transaction.getDataOutputStream());
+            }
+        } else {
+            logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer);
+            ResponseCode.FINISH_TRANSACTION.writeResponse(transaction.getDataOutputStream());
+            
+            final String calculatedCRC = transaction.calculateCRC();
+            
+            // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
+            final Response transactionConfirmationResponse = Response.read(transaction.getDataInputStream());
+            if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) {
+                // Confirm checksum and echo back the confirmation.
+                logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer);
+                final String receivedCRC = transactionConfirmationResponse.getMessage();
+                
+                if ( versionNegotiator.getVersion() > 3 ) {
+                    if ( !receivedCRC.equals(calculatedCRC) ) {
+                        ResponseCode.BAD_CHECKSUM.writeResponse(transaction.getDataOutputStream());
+                        throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
+                    }
+                }
+                
+                ResponseCode.CONFIRM_TRANSACTION.writeResponse(transaction.getDataOutputStream(), "");
+            } else {
+                throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse);
+            }
+        
+            final Response transactionResponse;
+            try {
+                transactionResponse = Response.read(transaction.getDataInputStream());
+            } catch (final IOException e) {
+                throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. " +
+                        "It is unknown whether or not the peer successfully received/processed the data.", e);
+            }
+            
+            logger.debug("{} Received {} from {}", this, transactionResponse, peer);
+            if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
+                peer.penalize(destination.getYieldPeriod(TimeUnit.MILLISECONDS));
+            } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
+                throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
+            }
+        }
+    }
+    
+    
+    @Override
+    public void rollbackTransaction() {
+        final SocketClientTransaction transaction = this.transaction;
+        this.transaction = null;
+        
+        if ( transaction == null ) {
+            throw new IllegalStateException("Cannot rollback transaction because no transaction has been started");
+        }
+        
+        // TODO: IMPLEMENT
+    }
+    
+    @Override
     public void receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
         if ( !handshakeComplete ) {
             throw new IllegalStateException("Handshake has not been performed");
@@ -344,7 +505,12 @@ public class SocketClientProtocol implements ClientProtocol {
             final CheckedInputStream checkedIn = new CheckedInputStream(flowFileInputStream, crc);
             
             final long startNanos = System.nanoTime();
-            FlowFile flowFile = codec.decode(checkedIn, session);
+            
+            final DataPacket dataPacket = codec.decode(checkedIn);
+            FlowFile flowFile = session.create();
+            flowFile = session.importFrom(dataPacket.getData(), flowFile);
+            flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
+            
             final long transmissionNanos = System.nanoTime() - startNanos;
             final long transmissionMillis = TimeUnit.MILLISECONDS.convert(transmissionNanos, TimeUnit.NANOSECONDS);
             
@@ -462,7 +628,33 @@ public class SocketClientProtocol implements ClientProtocol {
             final CheckedOutputStream checkedOutStream = new CheckedOutputStream(flowFileOutputStream, crc);
             
             final long startNanos = System.nanoTime();
-            flowFile = codec.encode(flowFile, session, checkedOutStream);
+            
+            // call codec.encode within a session callback so that we have the InputStream to read the FlowFile
+            final FlowFile toWrap = flowFile;
+            session.read(flowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    final DataPacket dataPacket = new DataPacket() {
+                        @Override
+                        public Map<String, String> getAttributes() {
+                            return toWrap.getAttributes();
+                        }
+
+                        @Override
+                        public InputStream getData() {
+                            return in;
+                        }
+
+                        @Override
+                        public long getSize() {
+                            return toWrap.getSize();
+                        }
+                    };
+                    
+                    codec.encode(dataPacket, checkedOutStream);
+                }
+            });
+            
             final long transferNanos = System.nanoTime() - startNanos;
             final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
             

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2aaed702/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
index 0c4ce05..83522a5 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -19,48 +19,74 @@ package org.apache.nifi.remote.protocol.socket;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.zip.CRC32;
 import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
 
 import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.client.Transaction;
-import org.apache.nifi.remote.io.CompressionInputStream;
 
-public class SocketClientTransaction implements Transaction {
+public class SocketClientTransaction {
 	private final long startTime = System.nanoTime();
-	private long bytesReceived = 0L;
-	private CRC32 crc = new CRC32();
+	private final CRC32 crc = new CRC32();
 	
 	private final Peer peer;
-	private final TransferDirection direction;
 	
 	private final DataInputStream dis;
 	private final DataOutputStream dos;
-	private final CheckedInputStream checkedInputStream;
+	private final TransferDirection direction;
+	
+	private boolean dataAvailable = false;
+	private int transfers = 0;
 	
 	SocketClientTransaction(final Peer peer, final TransferDirection direction, final boolean useCompression) throws IOException {
 		this.peer = peer;
 		this.direction = direction;
-		
 		this.dis = new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream());
 		this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream());
-		
-		final InputStream dataInputStream = useCompression ? new CompressionInputStream(dis) : dis;
-        checkedInputStream = new CheckedInputStream(dataInputStream, crc);
 	}
 	
-	CheckedInputStream getCheckedInputStream() {
-		return checkedInputStream;
+	int getTransferCount() {
+	    return transfers;
+	}
+	
+	void incrementTransferCount() {
+	    transfers++;
+	}
+	
+	void setDataAvailable(final boolean available) {
+	    this.dataAvailable = available;
+	}
+	
+	boolean isDataAvailable() {
+	    return dataAvailable;
+	}
+	
+	TransferDirection getTransferDirection() {
+	    return direction;
 	}
 	
 	DataOutputStream getDataOutputStream() {
 		return dos;
 	}
 	
+	DataInputStream getDataInputStream() {
+	    return dis;
+	}
+	
+	CheckedInputStream createCheckedInputStream() {
+	    return new CheckedInputStream(dis, crc);
+	}
+	
+	CheckedOutputStream createCheckedOutputStream() {
+	    return new CheckedOutputStream(dos, crc);
+	}
+	
 	Peer getPeer() {
 		return peer;
 	}
 	
+	String calculateCRC() {
+	    return String.valueOf(crc.getValue());
+	}
 }