You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2015/04/22 23:13:16 UTC
[26/49] incubator-nifi git commit: NIFI-271 checkpoint
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
index 3f0ec4f..e7b6d06 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
@@ -19,27 +19,30 @@ package org.apache.nifi.remote.protocol;
import java.io.InputStream;
import java.util.Map;
-
/**
- * Represents a piece of data that is to be sent to or that was received from a NiFi instance.
+ * Represents a piece of data that is to be sent to or that was received from a
+ * NiFi instance.
*/
public interface DataPacket {
/**
* The key-value attributes that are to be associated with the data
- * @return
+ *
+ * @return all attributes
+ */
+ Map<String, String> getAttributes();
+
+ /**
+ * An InputStream from which the content can be read
+ *
+ * @return input stream to the data
*/
- Map<String, String> getAttributes();
-
- /**
- * An InputStream from which the content can be read
- * @return
- */
- InputStream getData();
+ InputStream getData();
- /**
- * The length of the InputStream.
- * @return
- */
- long getSize();
+ /**
+ * The length of the InputStream.
+ *
+ * @return length of the inputstream.
+ */
+ long getSize();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
index 41dc276..016690c 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
@@ -16,46 +16,44 @@
*/
package org.apache.nifi.remote.protocol.socket;
-
/**
- * Enumeration of Properties that can be used for the Site-to-Site Socket Protocol.
+ * Enumeration of Properties that can be used for the Site-to-Site Socket
+ * Protocol.
*/
public enum HandshakeProperty {
+
/**
- * Boolean value indicating whether or not the contents of a FlowFile should be
- * GZipped when transferred.
+ * Boolean value indicating whether or not the contents of a FlowFile should
+ * be GZipped when transferred.
*/
GZIP,
-
/**
* The unique identifier of the port to communicate with
*/
PORT_IDENTIFIER,
-
/**
- * Indicates the number of milliseconds after the request was made that the client
- * will wait for a response. If no response has been received by the time this value
- * expires, the server can move on without attempting to service the request because
- * the client will have already disconnected.
+ * Indicates the number of milliseconds after the request was made that the
+ * client will wait for a response. If no response has been received by the
+ * time this value expires, the server can move on without attempting to
+ * service the request because the client will have already disconnected.
*/
REQUEST_EXPIRATION_MILLIS,
-
/**
- * The preferred number of FlowFiles that the server should send to the client
- * when pulling data. This property was introduced in version 5 of the protocol.
+ * The preferred number of FlowFiles that the server should send to the
+ * client when pulling data. This property was introduced in version 5 of
+ * the protocol.
*/
BATCH_COUNT,
-
/**
- * The preferred number of bytes that the server should send to the client when
- * pulling data. This property was introduced in version 5 of the protocol.
+ * The preferred number of bytes that the server should send to the client
+ * when pulling data. This property was introduced in version 5 of the
+ * protocol.
*/
BATCH_SIZE,
-
/**
- * The preferred amount of time that the server should send data to the client
- * when pulling data. This property was introduced in version 5 of the protocol.
- * Value is in milliseconds.
+ * The preferred amount of time that the server should send data to the
+ * client when pulling data. This property was introduced in version 5 of
+ * the protocol. Value is in milliseconds.
*/
BATCH_DURATION;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
index eae1940..6ad2ba0 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
@@ -22,28 +22,29 @@ import java.io.IOException;
import org.apache.nifi.remote.exception.ProtocolException;
public class Response {
+
private final ResponseCode code;
private final String message;
-
+
/**
 * Private constructor; {@link #read(DataInputStream)} builds instances from
 * a stream.
 *
 * @param code the response code
 * @param explanation the message that accompanies the code; stored as the
 * response message and may be null
 */
private Response(final ResponseCode code, final String explanation) {
    this.message = explanation;
    this.code = code;
}
-
+
/**
 * @return the code carried by this response
 */
public ResponseCode getCode() {
    return this.code;
}
-
+
/**
 * @return the message that accompanied the response code, or null when
 * none was supplied
 */
public String getMessage() {
    return this.message;
}
-
+
/**
 * Reads a Response from the given stream: first the response code, then,
 * only if that code carries a message, a UTF string holding the message.
 *
 * @param in the stream to read from
 * @return the Response that was read; its message is null when the code
 * does not carry one
 * @throws IOException if the stream cannot be read
 * @throws ProtocolException if the response code cannot be read from the stream
 */
public static Response read(final DataInputStream in) throws IOException, ProtocolException {
    final ResponseCode responseCode = ResponseCode.readCode(in);

    String explanation = null;
    if (responseCode.containsMessage()) {
        explanation = in.readUTF();
    }

    return new Response(responseCode, explanation);
}
-
+
@Override
public String toString() {
return code + ": " + message;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
index 8860e73..0e1359e 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
@@ -23,131 +23,126 @@ import java.io.InputStream;
import org.apache.nifi.remote.exception.ProtocolException;
-
public enum ResponseCode {
+
RESERVED(0, "Reserved for Future Use", false), // This will likely be used if we ever need to expand the length of
- // ResponseCode, so that we can indicate a 0 followed by some other bytes
-
+ // ResponseCode, so that we can indicate a 0 followed by some other bytes
+
// handshaking properties
PROPERTIES_OK(1, "Properties OK", false),
UNKNOWN_PROPERTY_NAME(230, "Unknown Property Name", true),
ILLEGAL_PROPERTY_VALUE(231, "Illegal Property Value", true),
MISSING_PROPERTY(232, "Missing Property", true),
-
// transaction indicators
CONTINUE_TRANSACTION(10, "Continue Transaction", false),
FINISH_TRANSACTION(11, "Finish Transaction", false),
- CONFIRM_TRANSACTION(12, "Confirm Transaction", true), // "Explanation" of this code is the checksum
+ CONFIRM_TRANSACTION(12, "Confirm Transaction", true), // "Explanation" of this code is the checksum
TRANSACTION_FINISHED(13, "Transaction Finished", false),
TRANSACTION_FINISHED_BUT_DESTINATION_FULL(14, "Transaction Finished But Destination is Full", false),
CANCEL_TRANSACTION(15, "Cancel Transaction", true),
BAD_CHECKSUM(19, "Bad Checksum", false),
-
// data availability indicators
MORE_DATA(20, "More Data Exists", false),
NO_MORE_DATA(21, "No More Data Exists", false),
-
// port state indicators
UNKNOWN_PORT(200, "Unknown Port", false),
PORT_NOT_IN_VALID_STATE(201, "Port Not in a Valid State", true),
PORTS_DESTINATION_FULL(202, "Port's Destination is Full", false),
-
// authorization
UNAUTHORIZED(240, "User Not Authorized", true),
-
// error indicators
ABORT(250, "Abort", true),
UNRECOGNIZED_RESPONSE_CODE(254, "Unrecognized Response Code", false),
END_OF_STREAM(255, "End of Stream", false);
-
+
private static final ResponseCode[] codeArray = new ResponseCode[256];
-
+
static {
- for ( final ResponseCode responseCode : ResponseCode.values() ) {
+ for (final ResponseCode responseCode : ResponseCode.values()) {
codeArray[responseCode.getCode()] = responseCode;
}
}
-
+
private static final byte CODE_SEQUENCE_VALUE_1 = (byte) 'R';
private static final byte CODE_SEQUENCE_VALUE_2 = (byte) 'C';
private final int code;
private final byte[] codeSequence;
private final String description;
private final boolean containsMessage;
-
+
private ResponseCode(final int code, final String description, final boolean containsMessage) {
- this.codeSequence = new byte[] {CODE_SEQUENCE_VALUE_1, CODE_SEQUENCE_VALUE_2, (byte) code};
+ this.codeSequence = new byte[]{CODE_SEQUENCE_VALUE_1, CODE_SEQUENCE_VALUE_2, (byte) code};
this.code = code;
this.description = description;
this.containsMessage = containsMessage;
}
-
+
/**
 * @return the numeric value of this response code
 */
public int getCode() {
    return this.code;
}
-
+
/**
 * Returns the three-byte sequence for this code: the two marker bytes
 * followed by the code itself.
 *
 * @return a copy of the code sequence. A copy is handed out because enum
 * constants are shared, so exposing the internal array would let any caller
 * alter the bytes used by everyone else.
 */
public byte[] getCodeSequence() {
    return codeSequence.clone();
}
-
+
/**
 * @return the human-readable description of this response code
 */
@Override
public String toString() {
    return this.description;
}
-
+
/**
 * @return true if this response code is accompanied by an explanation
 * message, false otherwise
 */
public boolean containsMessage() {
    return this.containsMessage;
}
-
+
public void writeResponse(final DataOutputStream out) throws IOException {
- if ( containsMessage() ) {
+ if (containsMessage()) {
throw new IllegalArgumentException("ResponseCode " + code + " expects an explanation");
}
-
+
out.write(getCodeSequence());
out.flush();
}
-
+
public void writeResponse(final DataOutputStream out, final String explanation) throws IOException {
- if ( !containsMessage() ) {
+ if (!containsMessage()) {
throw new IllegalArgumentException("ResponseCode " + code + " does not expect an explanation");
}
-
+
out.write(getCodeSequence());
out.writeUTF(explanation);
out.flush();
}
-
+
static ResponseCode readCode(final InputStream in) throws IOException, ProtocolException {
final int byte1 = in.read();
- if ( byte1 < 0 ) {
+ if (byte1 < 0) {
throw new EOFException();
- } else if ( byte1 != CODE_SEQUENCE_VALUE_1 ) {
+ } else if (byte1 != CODE_SEQUENCE_VALUE_1) {
throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode");
}
-
+
final int byte2 = in.read();
- if ( byte2 < 0 ) {
+ if (byte2 < 0) {
throw new EOFException();
- } else if ( byte2 != CODE_SEQUENCE_VALUE_2 ) {
+ } else if (byte2 != CODE_SEQUENCE_VALUE_2) {
throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode");
}
final int byte3 = in.read();
- if ( byte3 < 0 ) {
+ if (byte3 < 0) {
throw new EOFException();
}
-
+
final ResponseCode responseCode = codeArray[byte3];
if (responseCode == null) {
throw new ProtocolException("Received Response Code of " + byte3 + " but do not recognize this code");
}
return responseCode;
}
-
+
/**
 * Looks up the ResponseCode encoded in a code sequence of the form produced
 * by {@link #getCodeSequence()}: two marker bytes followed by the code byte.
 *
 * @param value the code sequence; must hold at least three bytes
 * @return the matching ResponseCode, or {@link #UNRECOGNIZED_RESPONSE_CODE}
 * if the code byte is not assigned to any constant
 * @throws IllegalArgumentException if value is null or shorter than three
 * bytes
 */
public static ResponseCode fromSequence(final byte[] value) {
    if (value == null || value.length < 3) {
        throw new IllegalArgumentException("Response code sequence must contain at least 3 bytes");
    }

    // The sequence is {'R', 'C', code}, so the code is the third byte (index 2).
    // Index 3 lies past the end of every sequence this enum generates.
    final int code = value[2] & 0xFF;
    final ResponseCode responseCode = codeArray[code];
    return (responseCode == null) ? UNRECOGNIZED_RESPONSE_CODE : responseCode;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index 83c5305..de845ee 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -58,120 +58,121 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SocketClientProtocol implements ClientProtocol {
+
private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1);
private RemoteDestination destination;
private boolean useCompression = false;
-
+
private String commsIdentifier;
private boolean handshakeComplete = false;
-
+
private final Logger logger = LoggerFactory.getLogger(SocketClientProtocol.class);
-
+
private Response handshakeResponse = null;
private boolean readyForFileTransfer = false;
private String transitUriPrefix = null;
private int timeoutMillis = 30000;
-
+
private int batchCount;
private long batchSize;
private long batchMillis;
private EventReporter eventReporter;
private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
-
+
/**
 * Creates a protocol instance that relies on the field defaults; the
 * destination, batch preferences, timeout and event reporter are supplied
 * afterwards through the setters.
 */
public SocketClientProtocol() {
    // nothing to initialize beyond the field defaults
}
/**
 * Sets the preferred number of FlowFiles per batch. The handshake only
 * forwards this preference when it is greater than zero and the negotiated
 * protocol version is at least 5.
 *
 * @param count preferred number of FlowFiles
 */
public void setPreferredBatchCount(final int count) {
    batchCount = count;
}
-
+
/**
 * Sets the preferred number of bytes per batch. The handshake only forwards
 * this preference when it is greater than zero and the negotiated protocol
 * version is at least 5.
 *
 * @param bytes preferred batch size in bytes
 */
public void setPreferredBatchSize(final long bytes) {
    batchSize = bytes;
}
-
+
/**
 * Sets the preferred duration of a batch. The handshake only forwards this
 * preference when it is greater than zero and the negotiated protocol
 * version is at least 5.
 *
 * @param millis preferred batch duration in milliseconds
 */
public void setPreferredBatchDuration(final long millis) {
    batchMillis = millis;
}
-
+
public void setEventReporter(final EventReporter eventReporter) {
- this.eventReporter = eventReporter;
+ this.eventReporter = eventReporter;
}
-
+
/**
 * Sets the remote destination and adopts its compression setting for this
 * protocol.
 *
 * @param destination the destination to communicate with
 */
public void setDestination(final RemoteDestination destination) {
    this.destination = destination;

    final boolean compress = destination.isUseCompression();
    this.useCompression = compress;
}
-
+
public void setTimeout(final int timeoutMillis) {
- this.timeoutMillis = timeoutMillis;
+ this.timeoutMillis = timeoutMillis;
}
-
+
@Override
public void handshake(final Peer peer) throws IOException, HandshakeException {
- handshake(peer, destination.getIdentifier());
+ handshake(peer, destination.getIdentifier());
}
-
+
public void handshake(final Peer peer, final String destinationId) throws IOException, HandshakeException {
- if ( handshakeComplete ) {
+ if (handshakeComplete) {
throw new IllegalStateException("Handshake has already been completed");
}
commsIdentifier = UUID.randomUUID().toString();
logger.debug("{} handshaking with {}", this, peer);
-
+
final Map<HandshakeProperty, String> properties = new HashMap<>();
properties.put(HandshakeProperty.GZIP, String.valueOf(useCompression));
-
- if ( destinationId != null ) {
- properties.put(HandshakeProperty.PORT_IDENTIFIER, destinationId);
+
+ if (destinationId != null) {
+ properties.put(HandshakeProperty.PORT_IDENTIFIER, destinationId);
}
-
- properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf(timeoutMillis) );
-
- if ( versionNegotiator.getVersion() >= 5 ) {
- if ( batchCount > 0 ) {
+
+ properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf(timeoutMillis));
+
+ if (versionNegotiator.getVersion() >= 5) {
+ if (batchCount > 0) {
properties.put(HandshakeProperty.BATCH_COUNT, String.valueOf(batchCount));
}
- if ( batchSize > 0L ) {
+ if (batchSize > 0L) {
properties.put(HandshakeProperty.BATCH_SIZE, String.valueOf(batchSize));
}
- if ( batchMillis > 0L ) {
+ if (batchMillis > 0L) {
properties.put(HandshakeProperty.BATCH_DURATION, String.valueOf(batchMillis));
}
}
-
+
final CommunicationsSession commsSession = peer.getCommunicationsSession();
commsSession.setTimeout(timeoutMillis);
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
-
+
dos.writeUTF(commsIdentifier);
-
- if ( versionNegotiator.getVersion() >= 3 ) {
+
+ if (versionNegotiator.getVersion() >= 3) {
dos.writeUTF(peer.getUrl());
transitUriPrefix = peer.getUrl();
-
- if ( !transitUriPrefix.endsWith("/") ) {
+
+ if (!transitUriPrefix.endsWith("/")) {
transitUriPrefix = transitUriPrefix + "/";
}
}
-
+
logger.debug("Handshaking with properties {}", properties);
dos.writeInt(properties.size());
- for ( final Map.Entry<HandshakeProperty, String> entry : properties.entrySet() ) {
+ for (final Map.Entry<HandshakeProperty, String> entry : properties.entrySet()) {
dos.writeUTF(entry.getKey().name());
dos.writeUTF(entry.getValue());
}
-
+
dos.flush();
-
+
try {
handshakeResponse = Response.read(dis);
} catch (final ProtocolException e) {
throw new HandshakeException(e);
}
-
+
switch (handshakeResponse.getCode()) {
case PORT_NOT_IN_VALID_STATE:
case UNKNOWN_PORT:
@@ -181,71 +182,75 @@ public class SocketClientProtocol implements ClientProtocol {
readyForFileTransfer = true;
break;
default:
- logger.error("{} received unexpected response {} from {} when negotiating Codec", new Object[] {
+ logger.error("{} received unexpected response {} from {} when negotiating Codec", new Object[]{
this, handshakeResponse, peer});
peer.close();
throw new HandshakeException("Received unexpected response " + handshakeResponse);
}
-
+
logger.debug("{} Finished handshake with {}", this, peer);
handshakeComplete = true;
}
-
+
+ @Override
public boolean isReadyForFileTransfer() {
return readyForFileTransfer;
}
-
+
+ @Override
public boolean isPortInvalid() {
- if ( !handshakeComplete ) {
+ if (!handshakeComplete) {
throw new IllegalStateException("Handshake has not completed successfully");
}
return handshakeResponse.getCode() == ResponseCode.PORT_NOT_IN_VALID_STATE;
}
-
+
+ @Override
public boolean isPortUnknown() {
- if ( !handshakeComplete ) {
+ if (!handshakeComplete) {
throw new IllegalStateException("Handshake has not completed successfully");
}
return handshakeResponse.getCode() == ResponseCode.UNKNOWN_PORT;
}
-
+
+ @Override
public boolean isDestinationFull() {
- if ( !handshakeComplete ) {
+ if (!handshakeComplete) {
throw new IllegalStateException("Handshake has not completed successfully");
}
return handshakeResponse.getCode() == ResponseCode.PORTS_DESTINATION_FULL;
}
-
+
@Override
public Set<PeerStatus> getPeerStatuses(final Peer peer) throws IOException {
- if ( !handshakeComplete ) {
+ if (!handshakeComplete) {
throw new IllegalStateException("Handshake has not been performed");
}
-
+
logger.debug("{} Get Peer Statuses from {}", this, peer);
final CommunicationsSession commsSession = peer.getCommunicationsSession();
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
-
+
RequestType.REQUEST_PEER_LIST.writeRequestType(dos);
dos.flush();
final int numPeers = dis.readInt();
final Set<PeerStatus> peers = new HashSet<>(numPeers);
- for (int i=0; i < numPeers; i++) {
+ for (int i = 0; i < numPeers; i++) {
final String hostname = dis.readUTF();
final int port = dis.readInt();
final boolean secure = dis.readBoolean();
final int flowFileCount = dis.readInt();
peers.add(new PeerStatus(new PeerDescription(hostname, port, secure), flowFileCount));
}
-
+
logger.debug("{} Received {} Peer Statuses from {}", this, peers.size(), peer);
return peers;
}
-
+
@Override
public FlowFileCodec negotiateCodec(final Peer peer) throws IOException, ProtocolException {
- if ( !handshakeComplete ) {
+ if (!handshakeComplete) {
throw new IllegalStateException("Handshake has not been performed");
}
@@ -255,177 +260,174 @@ public class SocketClientProtocol implements ClientProtocol {
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
RequestType.NEGOTIATE_FLOWFILE_CODEC.writeRequestType(dos);
-
+
FlowFileCodec codec = new StandardFlowFileCodec();
try {
codec = (FlowFileCodec) RemoteResourceInitiator.initiateResourceNegotiation(codec, dis, dos);
} catch (HandshakeException e) {
throw new ProtocolException(e.toString());
}
- logger.debug("{} negotiated FlowFileCodec {} with {}", new Object[] {this, codec, commsSession});
+ logger.debug("{} negotiated FlowFileCodec {} with {}", new Object[]{this, codec, commsSession});
return codec;
}
-
@Override
public Transaction startTransaction(final Peer peer, final FlowFileCodec codec, final TransferDirection direction) throws IOException, ProtocolException {
- if ( !handshakeComplete ) {
+ if (!handshakeComplete) {
throw new IllegalStateException("Handshake has not been performed");
}
- if ( !readyForFileTransfer ) {
+ if (!readyForFileTransfer) {
throw new IllegalStateException("Cannot start transaction; handshake resolution was " + handshakeResponse);
}
-
- return new SocketClientTransaction(versionNegotiator.getVersion(), destination.getIdentifier(), peer, codec,
- direction, useCompression, (int) destination.getYieldPeriod(TimeUnit.MILLISECONDS), eventReporter);
- }
+ return new SocketClientTransaction(versionNegotiator.getVersion(), destination.getIdentifier(), peer, codec,
+ direction, useCompression, (int) destination.getYieldPeriod(TimeUnit.MILLISECONDS), eventReporter);
+ }
@Override
public int receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
- final String userDn = peer.getCommunicationsSession().getUserDn();
- final Transaction transaction = startTransaction(peer, codec, TransferDirection.RECEIVE);
-
- final StopWatch stopWatch = new StopWatch(true);
- final Set<FlowFile> flowFilesReceived = new HashSet<>();
- long bytesReceived = 0L;
-
- while (true) {
- final long start = System.nanoTime();
- final DataPacket dataPacket = transaction.receive();
- if ( dataPacket == null ) {
- if ( flowFilesReceived.isEmpty() ) {
- peer.penalize(destination.getIdentifier(), destination.getYieldPeriod(TimeUnit.MILLISECONDS));
- }
- break;
- }
-
- FlowFile flowFile = session.create();
- flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
- flowFile = session.importFrom(dataPacket.getData(), flowFile);
- final long receiveNanos = System.nanoTime() - start;
-
- String sourceFlowFileIdentifier = dataPacket.getAttributes().get(CoreAttributes.UUID.key());
- if ( sourceFlowFileIdentifier == null ) {
- sourceFlowFileIdentifier = "<Unknown Identifier>";
- }
-
- final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier;
- session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, TimeUnit.NANOSECONDS.toMillis(receiveNanos));
-
- session.transfer(flowFile, Relationship.ANONYMOUS);
- bytesReceived += dataPacket.getSize();
- }
-
- // Confirm that what we received was the correct data.
- transaction.confirm();
-
- // Commit the session so that we have persisted the data
- session.commit();
-
- transaction.complete();
- logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
-
- if ( !flowFilesReceived.isEmpty() ) {
- stopWatch.stop();
- final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
- final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
- final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
- final String dataSize = FormatUtils.formatDataSize(bytesReceived);
- logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] {
- this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate });
- }
-
- return flowFilesReceived.size();
+ final String userDn = peer.getCommunicationsSession().getUserDn();
+ final Transaction transaction = startTransaction(peer, codec, TransferDirection.RECEIVE);
+
+ final StopWatch stopWatch = new StopWatch(true);
+ final Set<FlowFile> flowFilesReceived = new HashSet<>();
+ long bytesReceived = 0L;
+
+ while (true) {
+ final long start = System.nanoTime();
+ final DataPacket dataPacket = transaction.receive();
+ if (dataPacket == null) {
+ if (flowFilesReceived.isEmpty()) {
+ peer.penalize(destination.getIdentifier(), destination.getYieldPeriod(TimeUnit.MILLISECONDS));
+ }
+ break;
+ }
+
+ FlowFile flowFile = session.create();
+ flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
+ flowFile = session.importFrom(dataPacket.getData(), flowFile);
+ final long receiveNanos = System.nanoTime() - start;
+
+ String sourceFlowFileIdentifier = dataPacket.getAttributes().get(CoreAttributes.UUID.key());
+ if (sourceFlowFileIdentifier == null) {
+ sourceFlowFileIdentifier = "<Unknown Identifier>";
+ }
+
+ final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier;
+ session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host="
+ + peer.getHost() + ", Remote DN=" + userDn, TimeUnit.NANOSECONDS.toMillis(receiveNanos));
+
+ session.transfer(flowFile, Relationship.ANONYMOUS);
+ bytesReceived += dataPacket.getSize();
+ }
+
+ // Confirm that what we received was the correct data.
+ transaction.confirm();
+
+ // Commit the session so that we have persisted the data
+ session.commit();
+
+ transaction.complete();
+ logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
+
+ if (!flowFilesReceived.isEmpty()) {
+ stopWatch.stop();
+ final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
+ final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
+ final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+ final String dataSize = FormatUtils.formatDataSize(bytesReceived);
+ logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{
+ this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
+ }
+
+ return flowFilesReceived.size();
}
-
@Override
public int transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
- FlowFile flowFile = session.get();
- if (flowFile == null) {
- return 0;
- }
-
- try {
- final String userDn = peer.getCommunicationsSession().getUserDn();
- final long startSendingNanos = System.nanoTime();
- final StopWatch stopWatch = new StopWatch(true);
- long bytesSent = 0L;
-
- final Transaction transaction = startTransaction(peer, codec, TransferDirection.SEND);
-
- final Set<FlowFile> flowFilesSent = new HashSet<>();
- boolean continueTransaction = true;
- while (continueTransaction) {
- final long startNanos = System.nanoTime();
- // call codec.encode within a session callback so that we have the InputStream to read the FlowFile
- final FlowFile toWrap = flowFile;
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream in) throws IOException {
- final DataPacket dataPacket = new StandardDataPacket(toWrap.getAttributes(), in, toWrap.getSize());
- transaction.send(dataPacket);
- }
- });
-
- final long transferNanos = System.nanoTime() - startNanos;
- final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
-
- flowFilesSent.add(flowFile);
- bytesSent += flowFile.getSize();
- logger.debug("{} Sent {} to {}", this, flowFile, peer);
-
- final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key());
- session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false);
- session.remove(flowFile);
-
- final long sendingNanos = System.nanoTime() - startSendingNanos;
- if ( sendingNanos < BATCH_SEND_NANOS ) {
- flowFile = session.get();
- } else {
- flowFile = null;
- }
-
- continueTransaction = (flowFile != null);
- }
-
- transaction.confirm();
-
- // consume input stream entirely, ignoring its contents. If we
- // don't do this, the Connection will not be returned to the pool
- stopWatch.stop();
- final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
- final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
- final String dataSize = FormatUtils.formatDataSize(bytesSent);
-
- session.commit();
- transaction.complete();
-
- final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
- logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {
- this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
-
- return flowFilesSent.size();
- } catch (final Exception e) {
- session.rollback();
- throw e;
- }
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return 0;
+ }
+
+ try {
+ final String userDn = peer.getCommunicationsSession().getUserDn();
+ final long startSendingNanos = System.nanoTime();
+ final StopWatch stopWatch = new StopWatch(true);
+ long bytesSent = 0L;
+
+ final Transaction transaction = startTransaction(peer, codec, TransferDirection.SEND);
+
+ final Set<FlowFile> flowFilesSent = new HashSet<>();
+ boolean continueTransaction = true;
+ while (continueTransaction) {
+ final long startNanos = System.nanoTime();
+ // call codec.encode within a session callback so that we have the InputStream to read the FlowFile
+ final FlowFile toWrap = flowFile;
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws IOException {
+ final DataPacket dataPacket = new StandardDataPacket(toWrap.getAttributes(), in, toWrap.getSize());
+ transaction.send(dataPacket);
+ }
+ });
+
+ final long transferNanos = System.nanoTime() - startNanos;
+ final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+
+ flowFilesSent.add(flowFile);
+ bytesSent += flowFile.getSize();
+ logger.debug("{} Sent {} to {}", this, flowFile, peer);
+
+ final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key());
+ session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false);
+ session.remove(flowFile);
+
+ final long sendingNanos = System.nanoTime() - startSendingNanos;
+ if (sendingNanos < BATCH_SEND_NANOS) {
+ flowFile = session.get();
+ } else {
+ flowFile = null;
+ }
+
+ continueTransaction = (flowFile != null);
+ }
+
+ transaction.confirm();
+
+ // consume input stream entirely, ignoring its contents. If we
+ // don't do this, the Connection will not be returned to the pool
+ stopWatch.stop();
+ final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
+ final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+ final String dataSize = FormatUtils.formatDataSize(bytesSent);
+
+ session.commit();
+ transaction.complete();
+
+ final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
+ logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[]{
+ this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
+
+ return flowFilesSent.size();
+ } catch (final Exception e) {
+ session.rollback();
+ throw e;
+ }
}
-
-
+
/**
 * Returns the value of this protocol's {@code versionNegotiator} field.
 *
 * @return the version negotiator held by this instance
 */
@Override
public VersionNegotiator getVersionNegotiator() {
return versionNegotiator;
}
-
+
@Override
public void shutdown(final Peer peer) throws IOException {
readyForFileTransfer = false;
final CommunicationsSession commsSession = peer.getCommunicationsSession();
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
-
+
logger.debug("{} Shutting down with {}", this, peer);
// Indicate that we would like to have some data
RequestType.SHUTDOWN.writeRequestType(dos);
@@ -436,7 +438,7 @@ public class SocketClientProtocol implements ClientProtocol {
public String getResourceName() {
return "SocketFlowFileProtocol";
}
-
+
@Override
public String toString() {
return "SocketClientProtocol[CommsID=" + commsIdentifier + "]";
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
index e69104f..e83ea28 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -45,50 +45,51 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SocketClientTransaction implements Transaction {
- private static final Logger logger = LoggerFactory.getLogger(SocketClientTransaction.class);
-
- private final long creationNanoTime = System.nanoTime();
- private final CRC32 crc = new CRC32();
- private final int protocolVersion;
- private final FlowFileCodec codec;
- private final DataInputStream dis;
- private final DataOutputStream dos;
- private final TransferDirection direction;
- private final boolean compress;
- private final Peer peer;
- private final int penaltyMillis;
- private final String destinationId;
- private final EventReporter eventReporter;
-
- private boolean dataAvailable = false;
- private int transfers = 0;
- private long contentBytes = 0;
- private TransactionState state;
-
- SocketClientTransaction(final int protocolVersion, final String destinationId, final Peer peer, final FlowFileCodec codec,
- final TransferDirection direction, final boolean useCompression, final int penaltyMillis, final EventReporter eventReporter) throws IOException {
- this.protocolVersion = protocolVersion;
- this.destinationId = destinationId;
- this.peer = peer;
- this.codec = codec;
- this.direction = direction;
- this.dis = new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream());
- this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream());
- this.compress = useCompression;
- this.state = TransactionState.TRANSACTION_STARTED;
- this.penaltyMillis = penaltyMillis;
- this.eventReporter = eventReporter;
-
- initialize();
- }
-
- private void initialize() throws IOException {
- try {
- if ( direction == TransferDirection.RECEIVE ) {
+
+ private static final Logger logger = LoggerFactory.getLogger(SocketClientTransaction.class);
+
+ private final long creationNanoTime = System.nanoTime();
+ private final CRC32 crc = new CRC32();
+ private final int protocolVersion;
+ private final FlowFileCodec codec;
+ private final DataInputStream dis;
+ private final DataOutputStream dos;
+ private final TransferDirection direction;
+ private final boolean compress;
+ private final Peer peer;
+ private final int penaltyMillis;
+ private final String destinationId;
+ private final EventReporter eventReporter;
+
+ private boolean dataAvailable = false;
+ private int transfers = 0;
+ private long contentBytes = 0;
+ private TransactionState state;
+ /**
+ * Creates a transaction bound to the given peer and immediately calls
+ * initialize(), so construction itself can fail with an IOException.
+ * The state is set to TransactionState.TRANSACTION_STARTED before
+ * initialize() runs.
+ *
+ * @param protocolVersion protocol version in use; confirm() only compares CRC32 checksums when this is greater than 3
+ * @param destinationId identifier handed to peer.penalize(...) when the peer reports that the destination is full
+ * @param peer the remote peer; its communications session supplies the input and output streams used here
+ * @param codec used to encode outgoing and decode incoming data packets
+ * @param direction whether this transaction sends or receives data
+ * @param useCompression whether the data streams are wrapped in compression streams
+ * @param penaltyMillis duration handed to peer.penalize(...)
+ * @param eventReporter receives an error event when the confirmation response cannot be read; may be null
+ * @throws IOException propagated from initialize() (or from obtaining the peer's streams, if that can fail)
+ */
+ SocketClientTransaction(final int protocolVersion, final String destinationId, final Peer peer, final FlowFileCodec codec,
+ final TransferDirection direction, final boolean useCompression, final int penaltyMillis, final EventReporter eventReporter) throws IOException {
+ this.protocolVersion = protocolVersion;
+ this.destinationId = destinationId;
+ this.peer = peer;
+ this.codec = codec;
+ this.direction = direction;
+ this.dis = new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream());
+ this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream());
+ this.compress = useCompression;
+ this.state = TransactionState.TRANSACTION_STARTED;
+ this.penaltyMillis = penaltyMillis;
+ this.eventReporter = eventReporter;
+
+ initialize(); // writes the initial RECEIVE_FLOWFILES / SEND_FLOWFILES request to the peer
+ }
+
+ private void initialize() throws IOException {
+ try {
+ if (direction == TransferDirection.RECEIVE) {
// Indicate that we would like to have some data
RequestType.RECEIVE_FLOWFILES.writeRequestType(dos);
dos.flush();
-
+
final Response dataAvailableCode = Response.read(dis);
switch (dataAvailableCode.getCode()) {
case MORE_DATA:
@@ -102,39 +103,38 @@ public class SocketClientTransaction implements Transaction {
default:
throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
}
-
+
} else {
// Indicate that we would like to have some data
RequestType.SEND_FLOWFILES.writeRequestType(dos);
dos.flush();
}
- } catch (final Exception e) {
- error();
- throw e;
- }
- }
-
-
- @Override
- public DataPacket receive() throws IOException {
- try {
- try {
- if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
- throw new IllegalStateException("Cannot receive data from " + peer + " because Transaction State is " + state);
- }
-
- if ( direction == TransferDirection.SEND ) {
- throw new IllegalStateException("Attempting to receive data from " + peer + " but started a SEND Transaction");
- }
-
- // if we already know there's no data, just return null
- if ( !dataAvailable ) {
- return null;
- }
-
- // if we have already received a packet, check if another is available.
- if ( transfers > 0 ) {
- // Determine if Peer will send us data or has no data to send us
+ } catch (final Exception e) {
+ error();
+ throw e;
+ }
+ }
+
+ @Override
+ public DataPacket receive() throws IOException {
+ try {
+ try {
+ if (state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
+ throw new IllegalStateException("Cannot receive data from " + peer + " because Transaction State is " + state);
+ }
+
+ if (direction == TransferDirection.SEND) {
+ throw new IllegalStateException("Attempting to receive data from " + peer + " but started a SEND Transaction");
+ }
+
+ // if we already know there's no data, just return null
+ if (!dataAvailable) {
+ return null;
+ }
+
+ // if we have already received a packet, check if another is available.
+ if (transfers > 0) {
+ // Determine if Peer will send us data or has no data to send us
final Response dataAvailableCode = Response.read(dis);
switch (dataAvailableCode.getCode()) {
case CONTINUE_TRANSACTION:
@@ -149,170 +149,166 @@ public class SocketClientTransaction implements Transaction {
throw new ProtocolException("Got unexpected response from " + peer + " when asking for data: " + dataAvailableCode);
}
}
-
- // if no data available, return null
- if ( !dataAvailable ) {
- return null;
- }
-
+
+ // if no data available, return null
+ if (!dataAvailable) {
+ return null;
+ }
+
logger.debug("{} Receiving data from {}", this, peer);
final InputStream dataIn = compress ? new CompressionInputStream(dis) : dis;
final DataPacket packet = codec.decode(new CheckedInputStream(dataIn, crc));
-
- if ( packet == null ) {
+
+ if (packet == null) {
this.dataAvailable = false;
} else {
- transfers++;
- contentBytes += packet.getSize();
+ transfers++;
+ contentBytes += packet.getSize();
}
-
+
this.state = TransactionState.DATA_EXCHANGED;
return packet;
- } catch (final IOException ioe) {
- throw new IOException("Failed to receive data from " + peer + " due to " + ioe, ioe);
- }
- } catch (final Exception e) {
- error();
- throw e;
- }
- }
-
-
- @Override
- public void send(final byte[] content, final Map<String, String> attributes) throws IOException {
- send(new StandardDataPacket(attributes, new ByteArrayInputStream(content), content.length));
- }
-
- @Override
- public void send(final DataPacket dataPacket) throws IOException {
- try {
- try {
- if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
- throw new IllegalStateException("Cannot send data to " + peer + " because Transaction State is " + state);
- }
-
- if ( direction == TransferDirection.RECEIVE ) {
+ } catch (final IOException ioe) {
+ throw new IOException("Failed to receive data from " + peer + " due to " + ioe, ioe);
+ }
+ } catch (final Exception e) {
+ error();
+ throw e;
+ }
+ }
+ /**
+ * Convenience overload: wraps the byte array in a ByteArrayInputStream,
+ * builds a StandardDataPacket whose size is content.length, and delegates
+ * to send(DataPacket).
+ *
+ * @param content the bytes to send; must not be null because content.length is read
+ * @param attributes the attributes to associate with the data
+ * @throws IOException propagated from send(DataPacket)
+ */
+ @Override
+ public void send(final byte[] content, final Map<String, String> attributes) throws IOException {
+ send(new StandardDataPacket(attributes, new ByteArrayInputStream(content), content.length));
+ }
+
+ @Override
+ public void send(final DataPacket dataPacket) throws IOException {
+ try {
+ try {
+ if (state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
+ throw new IllegalStateException("Cannot send data to " + peer + " because Transaction State is " + state);
+ }
+
+ if (direction == TransferDirection.RECEIVE) {
throw new IllegalStateException("Attempting to send data to " + peer + " but started a RECEIVE Transaction");
}
-
- if ( transfers > 0 ) {
+
+ if (transfers > 0) {
ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
}
-
+
logger.debug("{} Sending data to {}", this, peer);
-
+
final OutputStream dataOut = compress ? new CompressionOutputStream(dos) : dos;
- final OutputStream out = new CheckedOutputStream(dataOut, crc);
+ final OutputStream out = new CheckedOutputStream(dataOut, crc);
codec.encode(dataPacket, out);
-
+
// need to close the CompressionOutputStream in order to force it write out any remaining bytes.
// Otherwise, do NOT close it because we don't want to close the underlying stream
// (CompressionOutputStream will not close the underlying stream when it's closed)
- if ( compress ) {
- out.close();
+ if (compress) {
+ out.close();
}
-
+
transfers++;
contentBytes += dataPacket.getSize();
this.state = TransactionState.DATA_EXCHANGED;
- } catch (final IOException ioe) {
- throw new IOException("Failed to send data to " + peer + " due to " + ioe, ioe);
- }
- } catch (final Exception e) {
- error();
- throw e;
- }
- }
-
-
- @Override
- public void cancel(final String explanation) throws IOException {
- if ( state == TransactionState.TRANSACTION_CANCELED || state == TransactionState.TRANSACTION_COMPLETED || state == TransactionState.ERROR ) {
- throw new IllegalStateException("Cannot cancel transaction because state is already " + state);
- }
-
- try {
- ResponseCode.CANCEL_TRANSACTION.writeResponse(dos, explanation == null ? "<No explanation given>" : explanation);
- state = TransactionState.TRANSACTION_CANCELED;
- } catch (final IOException ioe) {
- error();
- throw new IOException("Failed to send 'cancel transaction' message to " + peer + " due to " + ioe, ioe);
- }
- }
-
-
- @Override
- public TransactionCompletion complete() throws IOException {
- try {
- try {
- if ( state != TransactionState.TRANSACTION_CONFIRMED ) {
- throw new IllegalStateException("Cannot complete transaction with " + peer + " because state is " + state +
- "; Transaction can only be completed when state is " + TransactionState.TRANSACTION_CONFIRMED);
- }
-
- boolean backoff = false;
- if ( direction == TransferDirection.RECEIVE ) {
- if ( transfers == 0 ) {
- state = TransactionState.TRANSACTION_COMPLETED;
- return new SocketClientTransactionCompletion(false, 0, 0L, System.nanoTime() - creationNanoTime);
- }
-
+ } catch (final IOException ioe) {
+ throw new IOException("Failed to send data to " + peer + " due to " + ioe, ioe);
+ }
+ } catch (final Exception e) {
+ error();
+ throw e;
+ }
+ }
+ /**
+ * Tells the peer that this transaction is being canceled by writing a
+ * CANCEL_TRANSACTION response, then moves the state to
+ * TRANSACTION_CANCELED. A placeholder text is sent when no explanation
+ * is supplied.
+ *
+ * @param explanation reason sent to the peer; may be null
+ * @throws IllegalStateException if the state is already TRANSACTION_CANCELED, TRANSACTION_COMPLETED or ERROR
+ * @throws IOException if the response cannot be written; error() is called first, so the state becomes ERROR
+ */
+ @Override
+ public void cancel(final String explanation) throws IOException {
+ if (state == TransactionState.TRANSACTION_CANCELED || state == TransactionState.TRANSACTION_COMPLETED || state == TransactionState.ERROR) {
+ throw new IllegalStateException("Cannot cancel transaction because state is already " + state);
+ }
+
+ try {
+ ResponseCode.CANCEL_TRANSACTION.writeResponse(dos, explanation == null ? "<No explanation given>" : explanation);
+ state = TransactionState.TRANSACTION_CANCELED;
+ } catch (final IOException ioe) {
+ error();
+ throw new IOException("Failed to send 'cancel transaction' message to " + peer + " due to " + ioe, ioe);
+ }
+ }
+
+ @Override
+ public TransactionCompletion complete() throws IOException {
+ try {
+ try {
+ if (state != TransactionState.TRANSACTION_CONFIRMED) {
+ throw new IllegalStateException("Cannot complete transaction with " + peer + " because state is " + state
+ + "; Transaction can only be completed when state is " + TransactionState.TRANSACTION_CONFIRMED);
+ }
+
+ boolean backoff = false;
+ if (direction == TransferDirection.RECEIVE) {
+ if (transfers == 0) {
+ state = TransactionState.TRANSACTION_COMPLETED;
+ return new SocketClientTransactionCompletion(false, 0, 0L, System.nanoTime() - creationNanoTime);
+ }
+
// Confirm that we received the data and the peer can now discard it
logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
-
+
state = TransactionState.TRANSACTION_COMPLETED;
} else {
final Response transactionResponse;
try {
transactionResponse = Response.read(dis);
} catch (final IOException e) {
- throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. " +
- "It is unknown whether or not the peer successfully received/processed the data.", e);
+ throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. "
+ + "It is unknown whether or not the peer successfully received/processed the data.", e);
}
-
+
logger.debug("{} Received {} from {}", this, transactionResponse, peer);
- if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
+ if (transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL) {
peer.penalize(destinationId, penaltyMillis);
backoff = true;
- } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
+ } else if (transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED) {
throw new ProtocolException("After sending data to " + peer + ", expected TRANSACTION_FINISHED response but got " + transactionResponse);
}
-
+
state = TransactionState.TRANSACTION_COMPLETED;
}
-
- return new SocketClientTransactionCompletion(backoff, transfers, contentBytes, System.nanoTime() - creationNanoTime);
- } catch (final IOException ioe) {
- throw new IOException("Failed to complete transaction with " + peer + " due to " + ioe, ioe);
- }
- } catch (final Exception e) {
- error();
- throw e;
- }
- }
-
-
- @Override
- public void confirm() throws IOException {
- try {
- try {
- if ( state == TransactionState.TRANSACTION_STARTED && !dataAvailable && direction == TransferDirection.RECEIVE ) {
- // client requested to receive data but no data available. no need to confirm.
- state = TransactionState.TRANSACTION_CONFIRMED;
- return;
- }
-
- if ( state != TransactionState.DATA_EXCHANGED ) {
- throw new IllegalStateException("Cannot confirm Transaction because state is " + state +
- "; Transaction can only be confirmed when state is " + TransactionState.DATA_EXCHANGED );
- }
-
- if ( direction == TransferDirection.RECEIVE ) {
- if ( dataAvailable ) {
+
+ return new SocketClientTransactionCompletion(backoff, transfers, contentBytes, System.nanoTime() - creationNanoTime);
+ } catch (final IOException ioe) {
+ throw new IOException("Failed to complete transaction with " + peer + " due to " + ioe, ioe);
+ }
+ } catch (final Exception e) {
+ error();
+ throw e;
+ }
+ }
+
+ @Override
+ public void confirm() throws IOException {
+ try {
+ try {
+ if (state == TransactionState.TRANSACTION_STARTED && !dataAvailable && direction == TransferDirection.RECEIVE) {
+ // client requested to receive data but no data available. no need to confirm.
+ state = TransactionState.TRANSACTION_CONFIRMED;
+ return;
+ }
+
+ if (state != TransactionState.DATA_EXCHANGED) {
+ throw new IllegalStateException("Cannot confirm Transaction because state is " + state
+ + "; Transaction can only be confirmed when state is " + TransactionState.DATA_EXCHANGED);
+ }
+
+ if (direction == TransferDirection.RECEIVE) {
+ if (dataAvailable) {
throw new IllegalStateException("Cannot complete transaction because the sender has already sent more data than client has consumed.");
}
-
+
// we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
// to peer so that we can verify that the connection is still open. This is a two-phase commit,
// which helps to prevent the chances of data duplication. Without doing this, we may commit the
@@ -323,84 +319,88 @@ public class SocketClientTransaction implements Transaction {
logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
final String calculatedCRC = String.valueOf(crc.getValue());
ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC);
-
+
final Response confirmTransactionResponse;
try {
confirmTransactionResponse = Response.read(dis);
} catch (final IOException ioe) {
logger.error("Failed to receive response code from {} when expecting confirmation of transaction", peer);
- if ( eventReporter != null ) {
- eventReporter.reportEvent(Severity.ERROR, "Site-to-Site", "Failed to receive response code from " + peer + " when expecting confirmation of transaction");
+ if (eventReporter != null) {
+ eventReporter.reportEvent(Severity.ERROR, "Site-to-Site", "Failed to receive response code from " + peer + " when expecting confirmation of transaction");
}
throw ioe;
}
-
+
logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer);
-
+
switch (confirmTransactionResponse.getCode()) {
case CONFIRM_TRANSACTION:
break;
case BAD_CHECKSUM:
throw new IOException(this + " Received a BadChecksum response from peer " + peer);
default:
- throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
+ throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : "
+ + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
}
-
+
state = TransactionState.TRANSACTION_CONFIRMED;
} else {
logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer);
ResponseCode.FINISH_TRANSACTION.writeResponse(dos);
-
+
final String calculatedCRC = String.valueOf(crc.getValue());
-
+
// we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
final Response transactionConfirmationResponse = Response.read(dis);
- if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) {
+ if (transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION) {
// Confirm checksum and echo back the confirmation.
logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer);
final String receivedCRC = transactionConfirmationResponse.getMessage();
-
+
// CRC was not used before version 4
- if ( protocolVersion > 3 ) {
- if ( !receivedCRC.equals(calculatedCRC) ) {
+ if (protocolVersion > 3) {
+ if (!receivedCRC.equals(calculatedCRC)) {
ResponseCode.BAD_CHECKSUM.writeResponse(dos);
- throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
+ throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as "
+ + calculatedCRC + " while peer calculated CRC32 Checksum as "
+ + receivedCRC + "; canceling transaction and rolling back session");
}
}
-
+
ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");
} else {
- throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse);
+ throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer "
+ + peer + " but received " + transactionConfirmationResponse);
}
-
+
state = TransactionState.TRANSACTION_CONFIRMED;
}
- } catch (final IOException ioe) {
- throw new IOException("Failed to confirm transaction with " + peer + " due to " + ioe, ioe);
- }
- } catch (final Exception e) {
- error();
- throw e;
- }
- }
-
- @Override
- public void error() {
- this.state = TransactionState.ERROR;
- }
-
- @Override
- public TransactionState getState() {
- return state;
- }
-
- @Override
- public Communicant getCommunicant() {
- return peer;
- }
-
+ } catch (final IOException ioe) {
+ throw new IOException("Failed to confirm transaction with " + peer + " due to " + ioe, ioe);
+ }
+ } catch (final Exception e) {
+ error();
+ throw e;
+ }
+ }
+ /**
+ * Marks this transaction as failed by setting its state to
+ * TransactionState.ERROR. Nothing is written to the peer.
+ */
+ @Override
+ public void error() {
+ this.state = TransactionState.ERROR;
+ }
+ /**
+ * Returns the current state of this transaction.
+ *
+ * @return the value of the state field
+ */
+ @Override
+ public TransactionState getState() {
+ return state;
+ }
+ /**
+ * Returns the peer this transaction was created with.
+ *
+ * @return the peer, exposed as a Communicant
+ */
+ @Override
+ public Communicant getCommunicant() {
+ return peer;
+ }
+
@Override
public String toString() {
- return "SocketClientTransaction[Url=" + peer.getUrl() + ", TransferDirection=" + direction + ", State=" + state + "]";
+ return "SocketClientTransaction[Url=" + peer.getUrl() + ", TransferDirection=" + direction + ", State=" + state + "]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java
index 5eb6c91..bd95013 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java
@@ -26,7 +26,7 @@ public class SocketClientTransactionCompletion implements TransactionCompletion
private final int dataPacketsTransferred;
private final long bytesTransferred;
private final long durationNanos;
-
+
public SocketClientTransactionCompletion(final boolean backoff, final int dataPacketsTransferred, final long bytesTransferred, final long durationNanos) {
this.backoff = backoff;
this.dataPacketsTransferred = dataPacketsTransferred;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java
index 10352ec..d746abf 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java
@@ -32,43 +32,44 @@ import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
public class NiFiRestApiUtil {
+
public static final int RESPONSE_CODE_OK = 200;
-
+
private final SSLContext sslContext;
-
+
/**
 * Creates a REST helper that uses the given SSL context for HTTPS connections.
 *
 * @param sslContext context whose socket factory is installed on HTTPS
 * connections; may be null, in which case getConnection leaves
 * the connection's SSL settings untouched
 */
public NiFiRestApiUtil(final SSLContext sslContext) {
this.sslContext = sslContext;
}
-
+
/**
 * Opens an HttpURLConnection to the given URL with both the connect and
 * the read timeout set to timeoutMillis. When an SSLContext was supplied
 * and the connection is an HttpsURLConnection, the context's socket
 * factory is installed and the hostname verifier is replaced by an
 * OverrideHostnameVerifier built from the URL's host and the connection's
 * existing verifier.
 *
 * @param connUrl the URL to connect to
 * @param timeoutMillis connect and read timeout, in milliseconds
 * @return the configured connection
 * @throws IOException if the URL is malformed or the connection cannot be opened
 */
private HttpURLConnection getConnection(final String connUrl, final int timeoutMillis) throws IOException {
final URL url = new URL(connUrl);
final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setConnectTimeout(timeoutMillis);
connection.setReadTimeout(timeoutMillis);
-
+
// special handling for https
if (sslContext != null && connection instanceof HttpsURLConnection) {
HttpsURLConnection secureConnection = (HttpsURLConnection) connection;
secureConnection.setSSLSocketFactory(sslContext.getSocketFactory());
// check the trusted hostname property and override the HostnameVerifier
- secureConnection.setHostnameVerifier(new OverrideHostnameVerifier(url.getHost(),
+ secureConnection.setHostnameVerifier(new OverrideHostnameVerifier(url.getHost(),
secureConnection.getHostnameVerifier()));
}
-
+
return connection;
}
-
+
public ControllerDTO getController(final String url, final int timeoutMillis) throws IOException {
final HttpURLConnection connection = getConnection(url, timeoutMillis);
connection.setRequestMethod("GET");
final int responseCode = connection.getResponseCode();
-
+
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
StreamUtils.copy(connection.getInputStream(), baos);
final String responseMessage = baos.toString();
-
- if ( responseCode == RESPONSE_CODE_OK ) {
+
+ if (responseCode == RESPONSE_CODE_OK) {
final ObjectMapper mapper = new ObjectMapper();
final JsonNode jsonNode = mapper.readTree(responseMessage);
final JsonNode controllerNode = jsonNode.get("controller");
@@ -77,8 +78,9 @@ public class NiFiRestApiUtil {
throw new IOException("Got HTTP response Code " + responseCode + ": " + connection.getResponseMessage() + " with explanation: " + responseMessage);
}
}
-
+
private static class OverrideHostnameVerifier implements HostnameVerifier {
+
private final String trustedHostname;
private final HostnameVerifier delegate;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
index 6dab77b..c52b4b7 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
@@ -21,7 +21,8 @@ import java.util.Set;
import org.apache.nifi.remote.PeerStatus;
public class PeerStatusCache {
- private final Set<PeerStatus> statuses;
+
+ private final Set<PeerStatus> statuses;
private final long timestamp;
public PeerStatusCache(final Set<PeerStatus> statuses) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
index bd1b50c..70bb324 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
@@ -25,26 +25,26 @@ import org.apache.nifi.stream.io.MinimumLengthInputStream;
public class StandardDataPacket implements DataPacket {
- private final Map<String, String> attributes;
- private final InputStream stream;
- private final long size;
-
- public StandardDataPacket(final Map<String, String> attributes, final InputStream stream, final long size) {
- this.attributes = attributes;
- this.stream = new MinimumLengthInputStream(new LimitingInputStream(stream, size), size);
- this.size = size;
- }
-
- public Map<String, String> getAttributes() {
- return attributes;
- }
-
- public InputStream getData() {
- return stream;
- }
-
- public long getSize() {
- return size;
- }
-
+ private final Map<String, String> attributes;
+ private final InputStream stream;
+ private final long size;
+ /**
+ * Creates a packet over the given attributes and content stream. The
+ * stream is wrapped in a LimitingInputStream and then in a
+ * MinimumLengthInputStream, both constructed with size.
+ * NOTE(review): presumably this makes exactly size bytes readable and
+ * fails if the stream ends early; confirm against those two classes.
+ *
+ * @param attributes stored as-is (the map is not copied)
+ * @param stream source of the content
+ * @param size value passed to both wrappers and later returned by getSize()
+ */
+ public StandardDataPacket(final Map<String, String> attributes, final InputStream stream, final long size) {
+ this.attributes = attributes;
+ this.stream = new MinimumLengthInputStream(new LimitingInputStream(stream, size), size);
+ this.size = size;
+ }
+ /**
+ * Returns the attribute map given to the constructor; it is the same
+ * instance, not a copy.
+ *
+ * @return the attributes of this packet
+ */
+ public Map<String, String> getAttributes() {
+ return attributes;
+ }
+ /**
+ * Returns the wrapped stream created in the constructor, not the stream
+ * that was originally passed in.
+ *
+ * @return the stream from which the content can be read
+ */
+ public InputStream getData() {
+ return stream;
+ }
+ /**
+ * Returns the size given to the constructor.
+ *
+ * @return the declared length of the content
+ */
+ public long getSize() {
+ return size;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
index c5cca78..8336745 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
@@ -40,11 +40,11 @@ public class TestEndpointConnectionStatePool {
clusterNodeInfo.setNodeInformation(collection);
final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.RECEIVE);
- for ( final PeerStatus peerStatus : destinations ) {
+ for (final PeerStatus peerStatus : destinations) {
System.out.println(peerStatus.getPeerDescription());
}
}
-
+
@Test
public void testFormulateDestinationListForOutputHugeDifference() throws IOException {
final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
@@ -54,14 +54,11 @@ public class TestEndpointConnectionStatePool {
clusterNodeInfo.setNodeInformation(collection);
final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.RECEIVE);
- for ( final PeerStatus peerStatus : destinations ) {
+ for (final PeerStatus peerStatus : destinations) {
System.out.println(peerStatus.getPeerDescription());
}
}
-
-
-
-
+
@Test
public void testFormulateDestinationListForInputPorts() throws IOException {
final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
@@ -74,11 +71,11 @@ public class TestEndpointConnectionStatePool {
clusterNodeInfo.setNodeInformation(collection);
final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
- for ( final PeerStatus peerStatus : destinations ) {
+ for (final PeerStatus peerStatus : destinations) {
System.out.println(peerStatus.getPeerDescription());
}
}
-
+
@Test
public void testFormulateDestinationListForInputPortsHugeDifference() throws IOException {
final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
@@ -88,7 +85,7 @@ public class TestEndpointConnectionStatePool {
clusterNodeInfo.setNodeInformation(collection);
final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
- for ( final PeerStatus peerStatus : destinations ) {
+ for (final PeerStatus peerStatus : destinations) {
System.out.println(peerStatus.getPeerDescription());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
index b73e44d..155fc95 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
@@ -39,32 +38,32 @@ public class TestSiteToSiteClient {
@Ignore("For local testing only; not really a unit test but a manual test")
public void testReceive() throws IOException {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
-
+
final SiteToSiteClient client = new SiteToSiteClient.Builder()
- .url("http://localhost:8080/nifi")
- .portName("cba")
- .requestBatchCount(10)
- .build();
-
+ .url("http://localhost:8080/nifi")
+ .portName("cba")
+ .requestBatchCount(10)
+ .build();
+
try {
- for (int i=0; i < 1000; i++) {
+ for (int i = 0; i < 1000; i++) {
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
Assert.assertNotNull(transaction);
-
+
DataPacket packet;
while (true) {
packet = transaction.receive();
- if ( packet == null ) {
+ if (packet == null) {
break;
}
final InputStream in = packet.getData();
final long size = packet.getSize();
final byte[] buff = new byte[(int) size];
-
+
StreamUtils.fillBuffer(in, buff);
}
-
+
transaction.confirm();
transaction.complete();
}
@@ -72,34 +71,33 @@ public class TestSiteToSiteClient {
client.close();
}
}
-
-
+
@Test
@Ignore("For local testing only; not really a unit test but a manual test")
public void testSend() throws IOException {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
-
+
final SiteToSiteClient client = new SiteToSiteClient.Builder()
- .url("http://localhost:8080/nifi")
- .portName("input")
- .build();
-
+ .url("http://localhost:8080/nifi")
+ .portName("input")
+ .build();
+
try {
final Transaction transaction = client.createTransaction(TransferDirection.SEND);
Assert.assertNotNull(transaction);
-
+
final Map<String, String> attrs = new HashMap<>();
attrs.put("site-to-site", "yes, please!");
final byte[] bytes = "Hello".getBytes();
final ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
final DataPacket packet = new StandardDataPacket(attrs, bais, bytes.length);
transaction.send(packet);
-
+
transaction.confirm();
transaction.complete();
} finally {
client.close();
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java
index 172c593..cc24575 100644
--- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java
+++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java
@@ -33,10 +33,6 @@ import org.apache.commons.lang3.builder.ToStringStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- *
- * @author none
- */
public abstract class AbstractChannelReader implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractChannelReader.class);
@@ -91,12 +87,12 @@ public abstract class AbstractChannelReader implements Runnable {
* Allows a subclass to specifically handle how it reads from the given
* key's channel into the given buffer.
*
- * @param key
- * @param buffer
+ * @param key of channel to read from
+ * @param buffer to fill
* @return the number of bytes read in the final read cycle. A value of zero
* or more indicates the channel is still open but a value of -1 indicates
* end of stream.
- * @throws IOException
+ * @throws IOException if reading from channel causes failure
*/
protected abstract int fillBuffer(SelectionKey key, ByteBuffer buffer) throws IOException;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java
index a413ad2..007034b 100644
--- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java
+++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java
@@ -25,10 +25,6 @@ import java.util.concurrent.LinkedBlockingDeque;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- *
- * @author none
- */
public class BufferPool implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(BufferPool.class);
@@ -50,9 +46,9 @@ public class BufferPool implements Runnable {
/**
* Returns the given buffer to the pool - and clears it.
*
- * @param buffer
- * @param bytesProcessed
- * @return
+ * @param buffer buffer to return
+ * @param bytesProcessed bytes processed for this buffer being returned
+ * @return true if buffer returned to pool
*/
public synchronized boolean returnBuffer(ByteBuffer buffer, final int bytesProcessed) {
totalBytesExtracted += bytesProcessed;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
index 2ae2c07..824f2df 100644
--- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
+++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
@@ -35,10 +35,6 @@ import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- *
- * @author none
- */
public final class ChannelDispatcher implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(ChannelDispatcher.class);
@@ -81,8 +77,8 @@ public final class ChannelDispatcher implements Runnable {
/*
* When serverSocketsChannels are registered with the selector, want each invoke of this method to loop through all
* channels' keys.
- *
- * @throws IOException
+ *
+ * @throws IOException if unable to select keys
*/
private void selectServerSocketKeys() throws IOException {
int numSelected = serverSocketSelector.select(timeout);
@@ -121,8 +117,8 @@ public final class ChannelDispatcher implements Runnable {
* When invoking this method, only want to iterate through the selected keys once. When a key is entered into the selectors
* selected key set, select will return a positive value. The next select will return 0 if nothing has changed. Note that
* the selected key set is not manually changed via a remove operation.
- *
- * @throws IOException
+ *
+ * @throws IOException if unable to select keys
*/
private void selectSocketChannelKeys() throws IOException {
// once a channel associated with a key in this selector is 'ready', it causes this select to immediately return.
@@ -138,7 +134,7 @@ public final class ChannelDispatcher implements Runnable {
// there are 2 kinds of channels in this selector, both which have their own readers and are executed in their own
// threads. We will get here whenever a new SocketChannel is created due to an incoming connection. However,
// for a DatagramChannel we don't want to create a new reader unless it is a new DatagramChannel. The only
- // way to tell if it's new is the lack of an attachment.
+ // way to tell if it's new is the lack of an attachment.
if (channel instanceof DatagramChannel && socketChannelKey.attachment() == null) {
reader = new DatagramChannelReader(UUID.randomUUID().toString(), socketChannelKey, emptyBuffers, factory);
socketChannelKey.attach(reader);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
index b0a1cfb..7cbf589 100644
--- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
+++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
@@ -62,7 +62,6 @@ import org.slf4j.LoggerFactory;
* All ChannelReaders will get throttled by the unavailability of buffers in the
* provided BufferPool. This is designed to create back pressure.
*
- * @author none
*/
public final class ChannelListener {
@@ -99,7 +98,7 @@ public final class ChannelListener {
* @param port - port to bind to
* @param receiveBufferSize - size of OS receive buffer to request. If less
* than 0 then will not be set and OS default will win.
- * @throws IOException
+ * @throws IOException if unable to add socket
*/
public void addServerSocket(final InetAddress nicIPAddress, final int port, final int receiveBufferSize)
throws IOException {
@@ -129,7 +128,7 @@ public final class ChannelListener {
* @param port - the port to listen on
* @param receiveBufferSize - the number of bytes to request for a receive
* buffer from OS
- * @throws IOException
+ * @throws IOException if unable to add channel
*/
public void addDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize)
throws IOException {
@@ -156,7 +155,7 @@ public final class ChannelListener {
* any network interface on the local host.
* @param sendingPort - the port used by the sender of datagrams. Only
* datagrams from this port will be received.
- * @throws IOException
+ * @throws IOException if unable to add channel
*/
public void addDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize, final String sendingHost,
final Integer sendingPort) throws IOException {