You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/21 13:35:24 UTC
incubator-nifi git commit: NIFI-4: Refactoring protocols to be used
with DataPacket instead of FlowFile as a more generic data model
Repository: incubator-nifi
Updated Branches:
refs/heads/site-to-site-client c174d3a60 -> 2aaed7021
NIFI-4: Refactoring protocols to be used with DataPacket instead of FlowFile as a more generic data model
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/2aaed702
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/2aaed702
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/2aaed702
Branch: refs/heads/site-to-site-client
Commit: 2aaed7021d0ed0b6cc81415edbef19c95d8ea68c
Parents: c174d3a
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Jan 21 07:35:09 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Jan 21 07:35:09 2015 -0500
----------------------------------------------------------------------
.../apache/nifi/remote/client/Transaction.java | 21 --
.../apache/nifi/remote/codec/FlowFileCodec.java | 32 +--
.../nifi/remote/protocol/ClientProtocol.java | 10 +-
.../protocol/socket/SocketClientProtocol.java | 236 +++++++++++++++++--
.../socket/SocketClientTransaction.java | 54 +++--
5 files changed, 272 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2aaed702/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java
deleted file mode 100644
index bae6e51..0000000
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.client;
-
-public interface Transaction {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2aaed702/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
index b4206b3..1380e1b 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
@@ -21,11 +21,10 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.remote.VersionedRemoteResource;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
+import org.apache.nifi.remote.protocol.DataPacket;
/**
* <p>
@@ -44,36 +43,29 @@ public interface FlowFileCodec extends VersionedRemoteResource {
public List<Integer> getSupportedVersions();
/**
- * Encodes a FlowFile and its content as a single stream of data and writes
- * that stream to the output. If checksum is not null, it will be calculated
- * as the stream is read
+ * Encodes a DataPacket and its content as a single stream of data and writes
+ * that stream to the output.
*
- * @param flowFile the FlowFile to encode
- * @param session a session that can be used to transactionally create and
- * transfer flow files
+ * @param dataPacket the data to serialize
* @param outStream the stream to write the data to
*
- * @return the updated FlowFile
- *
- * @throws IOException
+ * @throws IOException if there is a communications issue
+ * @throws TransmissionDisabledException if a user terminates the connection
*/
- FlowFile encode(FlowFile flowFile, ProcessSession session, OutputStream outStream) throws IOException, TransmissionDisabledException;
+ void encode(DataPacket dataPacket, OutputStream outStream) throws IOException, TransmissionDisabledException;
/**
* Decodes the contents of the InputStream, interpreting the data to
- * determine the next FlowFile's attributes and content, as well as their
- * destinations. If not null, checksum will be used to calculate the
- * checksum as the data is read.
+ * determine the next DataPacket's attributes and content.
*
- * @param stream an InputStream containing FlowFiles' contents, attributes,
- * and destinations
- * @param session
+ * @param stream an InputStream containing a DataPacket's content and attributes
*
- * @return the FlowFile that was created, or <code>null</code> if the stream
+ * @return the DataPacket that was created, or <code>null</code> if the stream
* was out of data
*
* @throws IOException
* @throws ProtocolException if the input is malformed
+ * @throws TransmissionDisabledException if a user terminates the connection
*/
- FlowFile decode(InputStream stream, ProcessSession session) throws IOException, ProtocolException, TransmissionDisabledException;
+ DataPacket decode(InputStream stream) throws IOException, ProtocolException, TransmissionDisabledException;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2aaed702/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
index d817425..51d3970 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
@@ -50,15 +50,17 @@ public interface ClientProtocol extends VersionedRemoteResource {
- void startTransaction(Peer peer, TransferDirection direction) throws IOException;
+ void startTransaction(Peer peer, TransferDirection direction) throws IOException, ProtocolException;
- void completeTransaction();
+ void completeTransaction(boolean applyBackPressure) throws IOException, ProtocolException;
void rollbackTransaction();
- void transferData(Peer peer, DataPacket dataPacket, FlowFileCodec codec) throws IOException, ProtocolException;
+ // must be done within a transaction.
+ void transferData(DataPacket dataPacket, FlowFileCodec codec) throws IOException, ProtocolException;
- DataPacket receiveData(Peer peer, FlowFileCodec codec) throws IOException, ProtocolException;
+ // must be done within a transaction.
+ DataPacket receiveData(FlowFileCodec codec) throws IOException, ProtocolException;
/**
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2aaed702/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index 6b0c94b..58d26d4 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -36,6 +36,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteDestination;
@@ -43,7 +44,6 @@ import org.apache.nifi.remote.RemoteResourceInitiator;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.VersionNegotiator;
-import org.apache.nifi.remote.client.Transaction;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.codec.StandardFlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
@@ -240,7 +240,7 @@ public class SocketClientProtocol implements ClientProtocol {
private SocketClientTransaction transaction;
@Override
- public void startTransaction(final Peer peer, final TransferDirection direction) throws IOException {
+ public void startTransaction(final Peer peer, final TransferDirection direction) throws IOException, ProtocolException {
if ( !handshakeComplete ) {
throw new IllegalStateException("Handshake has not been performed");
}
@@ -255,6 +255,21 @@ public class SocketClientProtocol implements ClientProtocol {
// Indicate that we would like to have some data
RequestType.RECEIVE_FLOWFILES.writeRequestType(dos);
dos.flush();
+
+ final Response dataAvailableCode = Response.read(transaction.getDataInputStream());
+ switch (dataAvailableCode.getCode()) {
+ case MORE_DATA:
+ logger.debug("{} {} Indicates that data is available", this, peer);
+ transaction.setDataAvailable(true);
+ break;
+ case NO_MORE_DATA:
+ logger.debug("{} No data available from {}", peer);
+ transaction.setDataAvailable(false);
+ return;
+ default:
+ throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+ }
+
} else {
// Indicate that we would like to have some data
RequestType.SEND_FLOWFILES.writeRequestType(dos);
@@ -268,34 +283,180 @@ public class SocketClientProtocol implements ClientProtocol {
throw new IllegalStateException("Cannot receive data because no transaction has been started");
}
+ if ( transaction.getTransferDirection() == TransferDirection.SEND ) {
+ throw new IllegalStateException("Attempting to receive data but started a SEND Transaction");
+ }
+
+ // if no data available, return null
+ if ( !transaction.isDataAvailable() ) {
+ return null;
+ }
+
final Peer peer = transaction.getPeer();
- logger.debug("{} Receiving FlowFiles from {}", this, peer);
- final CommunicationsSession commsSession = peer.getCommunicationsSession();
- final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
- final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
- String userDn = commsSession.getUserDn();
- if ( userDn == null ) {
- userDn = "none";
+ logger.debug("{} Receiving data from {}", this, peer);
+ final DataPacket packet = codec.decode(transaction.createCheckedInputStream());
+
+ if ( packet != null ) {
+ transaction.incrementTransferCount();
+
+ // Determine if Peer will send us data or has no data to send us
+ final DataInputStream dis = transaction.getDataInputStream();
+ final Response dataAvailableCode = Response.read(dis);
+ switch (dataAvailableCode.getCode()) {
+ case MORE_DATA:
+ logger.debug("{} {} Indicates that data is available", this, peer);
+ transaction.setDataAvailable(true);
+ break;
+ case NO_MORE_DATA:
+ logger.debug("{} No data available from {}", peer);
+ transaction.setDataAvailable(false);
+ break;
+ default:
+ throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+ }
}
- // Determine if Peer will send us data or has no data to send us
- final Response dataAvailableCode = Response.read(dis);
- switch (dataAvailableCode.getCode()) {
- case MORE_DATA:
- logger.debug("{} {} Indicates that data is available", this, peer);
- break;
- case NO_MORE_DATA:
- logger.debug("{} No data available from {}", peer);
- return null;
- default:
- throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+ return packet;
+ }
+
+
+ @Override
+ public void transferData(final DataPacket dataPacket, final FlowFileCodec codec) throws IOException, ProtocolException {
+ if ( transaction == null ) {
+ throw new IllegalStateException("Cannot send data because no transaction has been started");
+ }
+
+ if ( transaction.getTransferDirection() == TransferDirection.RECEIVE ) {
+ throw new IllegalStateException("Attempting to send data but started a RECEIVE Transaction");
+ }
+
+ final Peer peer = transaction.getPeer();
+ logger.debug("{} Sending data to {}", this, peer);
+
+ if ( transaction.getTransferCount() > 0 ) {
+ ResponseCode.CONTINUE_TRANSACTION.writeResponse(transaction.getDataOutputStream());
}
+ final CheckedOutputStream checkedOutStream = transaction.createCheckedOutputStream();
+ codec.encode(dataPacket, checkedOutStream);
+ // need to close the CompressionOutputStream in order to force it to write out any remaining bytes.
+ // Otherwise, do NOT close it because we don't want to close the underlying stream
+ // (CompressionOutputStream will not close the underlying stream when it's closed)
+ if ( useCompression ) {
+ checkedOutStream.close();
+ }
+
+ transaction.incrementTransferCount();
}
@Override
+ public void completeTransaction(final boolean applyBackPressure) throws ProtocolException, IOException {
+ final SocketClientTransaction transaction = this.transaction;
+ this.transaction = null;
+
+ if ( transaction == null ) {
+ throw new IllegalStateException("Cannot complete transaction because no transaction has been started");
+ }
+
+ final Peer peer = transaction.getPeer();
+
+ if ( transaction.getTransferDirection() == TransferDirection.RECEIVE ) {
+ final boolean moreData = transaction.isDataAvailable();
+ if ( moreData ) {
+ throw new IllegalStateException("Cannot complete transaction because the sender has already sent more data than client has consumed.");
+ }
+
+ // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
+ // to peer so that we can verify that the connection is still open. This is a two-phase commit,
+ // which helps to reduce the chance of data duplication. Without doing this, we may commit the
+ // session and then when we send the response back to the peer, the peer may have timed out and may not
+ // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
+ // Critical Section involved in this transaction so that rather than the Critical Section being the
+ // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
+ logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
+ final String calculatedCRC = transaction.calculateCRC();
+ ResponseCode.CONFIRM_TRANSACTION.writeResponse(transaction.getDataOutputStream(), calculatedCRC);
+
+ final Response confirmTransactionResponse = Response.read(transaction.getDataInputStream());
+ logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer);
+
+ switch (confirmTransactionResponse.getCode()) {
+ case CONFIRM_TRANSACTION:
+ break;
+ case BAD_CHECKSUM:
+ throw new IOException(this + " Received a BadChecksum response from peer " + peer);
+ default:
+ throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
+ }
+
+ if ( applyBackPressure ) {
+ // Confirm that we received the data and the peer can now discard it but that the peer should not
+ // send any more data for a bit
+ logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
+ ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(transaction.getDataOutputStream());
+ } else {
+ // Confirm that we received the data and the peer can now discard it
+ logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
+ ResponseCode.TRANSACTION_FINISHED.writeResponse(transaction.getDataOutputStream());
+ }
+ } else {
+ logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer);
+ ResponseCode.FINISH_TRANSACTION.writeResponse(transaction.getDataOutputStream());
+
+ final String calculatedCRC = transaction.calculateCRC();
+
+ // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
+ final Response transactionConfirmationResponse = Response.read(transaction.getDataInputStream());
+ if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) {
+ // Confirm checksum and echo back the confirmation.
+ logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer);
+ final String receivedCRC = transactionConfirmationResponse.getMessage();
+
+ if ( versionNegotiator.getVersion() > 3 ) {
+ if ( !receivedCRC.equals(calculatedCRC) ) {
+ ResponseCode.BAD_CHECKSUM.writeResponse(transaction.getDataOutputStream());
+ throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
+ }
+ }
+
+ ResponseCode.CONFIRM_TRANSACTION.writeResponse(transaction.getDataOutputStream(), "");
+ } else {
+ throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse);
+ }
+
+ final Response transactionResponse;
+ try {
+ transactionResponse = Response.read(transaction.getDataInputStream());
+ } catch (final IOException e) {
+ throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. " +
+ "It is unknown whether or not the peer successfully received/processed the data.", e);
+ }
+
+ logger.debug("{} Received {} from {}", this, transactionResponse, peer);
+ if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
+ peer.penalize(destination.getYieldPeriod(TimeUnit.MILLISECONDS));
+ } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
+ throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
+ }
+ }
+ }
+
+
+ @Override
+ public void rollbackTransaction() {
+ final SocketClientTransaction transaction = this.transaction;
+ this.transaction = null;
+
+ if ( transaction == null ) {
+ throw new IllegalStateException("Cannot rollback transaction because no transaction has been started");
+ }
+
+ // TODO: IMPLEMENT
+ }
+
+ @Override
public void receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
if ( !handshakeComplete ) {
throw new IllegalStateException("Handshake has not been performed");
@@ -344,7 +505,12 @@ public class SocketClientProtocol implements ClientProtocol {
final CheckedInputStream checkedIn = new CheckedInputStream(flowFileInputStream, crc);
final long startNanos = System.nanoTime();
- FlowFile flowFile = codec.decode(checkedIn, session);
+
+ final DataPacket dataPacket = codec.decode(checkedIn);
+ FlowFile flowFile = session.create();
+ flowFile = session.importFrom(dataPacket.getData(), flowFile);
+ flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
+
final long transmissionNanos = System.nanoTime() - startNanos;
final long transmissionMillis = TimeUnit.MILLISECONDS.convert(transmissionNanos, TimeUnit.NANOSECONDS);
@@ -462,7 +628,33 @@ public class SocketClientProtocol implements ClientProtocol {
final CheckedOutputStream checkedOutStream = new CheckedOutputStream(flowFileOutputStream, crc);
final long startNanos = System.nanoTime();
- flowFile = codec.encode(flowFile, session, checkedOutStream);
+
+ // call codec.encode within a session callback so that we have the InputStream to read the FlowFile
+ final FlowFile toWrap = flowFile;
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws IOException {
+ final DataPacket dataPacket = new DataPacket() {
+ @Override
+ public Map<String, String> getAttributes() {
+ return toWrap.getAttributes();
+ }
+
+ @Override
+ public InputStream getData() {
+ return in;
+ }
+
+ @Override
+ public long getSize() {
+ return toWrap.getSize();
+ }
+ };
+
+ codec.encode(dataPacket, checkedOutStream);
+ }
+ });
+
final long transferNanos = System.nanoTime() - startNanos;
final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2aaed702/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
index 0c4ce05..83522a5 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -19,48 +19,74 @@ package org.apache.nifi.remote.protocol.socket;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.client.Transaction;
-import org.apache.nifi.remote.io.CompressionInputStream;
-public class SocketClientTransaction implements Transaction {
+public class SocketClientTransaction {
private final long startTime = System.nanoTime();
- private long bytesReceived = 0L;
- private CRC32 crc = new CRC32();
+ private final CRC32 crc = new CRC32();
private final Peer peer;
- private final TransferDirection direction;
private final DataInputStream dis;
private final DataOutputStream dos;
- private final CheckedInputStream checkedInputStream;
+ private final TransferDirection direction;
+
+ private boolean dataAvailable = false;
+ private int transfers = 0;
SocketClientTransaction(final Peer peer, final TransferDirection direction, final boolean useCompression) throws IOException {
this.peer = peer;
this.direction = direction;
-
this.dis = new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream());
this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream());
-
- final InputStream dataInputStream = useCompression ? new CompressionInputStream(dis) : dis;
- checkedInputStream = new CheckedInputStream(dataInputStream, crc);
}
- CheckedInputStream getCheckedInputStream() {
- return checkedInputStream;
+ int getTransferCount() {
+ return transfers;
+ }
+
+ void incrementTransferCount() {
+ transfers++;
+ }
+
+ void setDataAvailable(final boolean available) {
+ this.dataAvailable = available;
+ }
+
+ boolean isDataAvailable() {
+ return dataAvailable;
+ }
+
+ TransferDirection getTransferDirection() {
+ return direction;
}
DataOutputStream getDataOutputStream() {
return dos;
}
+ DataInputStream getDataInputStream() {
+ return dis;
+ }
+
+ CheckedInputStream createCheckedInputStream() {
+ return new CheckedInputStream(dis, crc);
+ }
+
+ CheckedOutputStream createCheckedOutputStream() {
+ return new CheckedOutputStream(dos, crc);
+ }
+
Peer getPeer() {
return peer;
}
+ String calculateCRC() {
+ return String.valueOf(crc.getValue());
+ }
}