You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/23 20:56:57 UTC
[04/29] incubator-nifi git commit: NIFI-282: Bug fixes;
documentation improvements; removed code marked as 'FOR TESTING'
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
new file mode 100644
index 0000000..dc3d68f
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
+import org.apache.nifi.remote.protocol.CommunicationsOutput;
+
+public class SSLSocketChannelOutput implements CommunicationsOutput {
+ private final OutputStream out;
+ private final ByteCountingOutputStream countingOut;
+
+ public SSLSocketChannelOutput(final SSLSocketChannel channel) {
+ countingOut = new ByteCountingOutputStream(new SSLSocketChannelOutputStream(channel));
+ out = new BufferedOutputStream(countingOut);
+ }
+
+ @Override
+ public OutputStream getOutputStream() throws IOException {
+ return out;
+ }
+
+ @Override
+ public long getBytesWritten() {
+ return countingOut.getBytesWritten();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
new file mode 100644
index 0000000..befbdaa
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.VersionedRemoteResource;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.PortNotRunningException;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.exception.UnknownPortException;
+
+public interface ClientProtocol extends VersionedRemoteResource {
+
+ void handshake(Peer peer) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException;
+
+ Set<PeerStatus> getPeerStatuses(Peer currentPeer) throws IOException, ProtocolException;
+
+ FlowFileCodec negotiateCodec(Peer peer) throws IOException, ProtocolException;
+
+ void receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
+
+ void transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
+
+ void shutdown(Peer peer) throws IOException, ProtocolException;
+
+ boolean isReadyForFileTransfer();
+
+
+
+
+ Transaction startTransaction(Peer peer, FlowFileCodec codec, TransferDirection direction) throws IOException;
+
+
+ /**
+ * returns <code>true</code> if remote instance indicates that the port is
+ * invalid
+ *
+ * @return
+ * @throws IllegalStateException if a handshake has not successfully
+ * completed
+ */
+ boolean isPortInvalid() throws IllegalStateException;
+
+ /**
+ * returns <code>true</code> if remote instance indicates that the port is
+ * unknown
+ *
+ * @return
+ * @throws IllegalStateException if a handshake has not successfully
+ * completed
+ */
+ boolean isPortUnknown();
+
+ /**
+ * returns <code>true</code> if remote instance indicates that the port's
+ * destination is full
+ *
+ * @return
+ * @throws IllegalStateException if a handshake has not successfully
+ * completed
+ */
+ boolean isDestinationFull();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
new file mode 100644
index 0000000..d2e2946
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public interface CommunicationsInput {
+
+ InputStream getInputStream() throws IOException;
+
+ long getBytesRead();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
new file mode 100644
index 0000000..95cab29
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public interface CommunicationsOutput {
+
+ OutputStream getOutputStream() throws IOException;
+
+ long getBytesWritten();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
new file mode 100644
index 0000000..d009cec
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public interface CommunicationsSession extends Closeable {
+
+ public static final byte[] MAGIC_BYTES = {(byte) 'N', (byte) 'i', (byte) 'F', (byte) 'i'};
+
+ CommunicationsInput getInput();
+
+ CommunicationsOutput getOutput();
+
+ void setTimeout(int millis) throws IOException;
+
+ int getTimeout() throws IOException;
+
+ void setUri(String uri);
+
+ String getUri();
+
+ String getUserDn();
+
+ void setUserDn(String dn);
+
+ boolean isDataAvailable();
+
+ long getBytesWritten();
+
+ long getBytesRead();
+
+ /**
+ * Asynchronously interrupts this FlowFileCodec. Implementations must ensure
+ * that they stop sending and receiving data as soon as possible after this
+ * method has been called, even if doing so results in sending only partial
+ * data to the peer. This will usually result in the peer throwing a
+ * SocketTimeoutException.
+ */
+ void interrupt();
+
+ /**
+ * Returns <code>true</code> if the connection is closed, <code>false</code>
+ * otherwise.
+ *
+ * @return
+ */
+ boolean isClosed();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
new file mode 100644
index 0000000..f4fa4d0
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.InputStream;
+import java.util.Map;
+
+public interface DataPacket {
+
+ Map<String, String> getAttributes();
+
+ InputStream getData();
+
+ long getSize();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/RequestType.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
new file mode 100644
index 0000000..41334fe
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public enum RequestType {
+
+ NEGOTIATE_FLOWFILE_CODEC,
+ REQUEST_PEER_LIST,
+ SEND_FLOWFILES,
+ RECEIVE_FLOWFILES,
+ SHUTDOWN;
+
+ public void writeRequestType(final DataOutputStream dos) throws IOException {
+ dos.writeUTF(name());
+ }
+
+ public static RequestType readRequestType(final DataInputStream dis) throws IOException {
+ final String requestTypeVal = dis.readUTF();
+ try {
+ return RequestType.valueOf(requestTypeVal);
+ } catch (final Exception e) {
+ throw new IOException("Could not determine RequestType: received invalid value " + requestTypeVal);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
new file mode 100644
index 0000000..41dc276
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+
+/**
+ * Enumeration of Properties that can be used for the Site-to-Site Socket Protocol.
+ */
+public enum HandshakeProperty {
+ /**
+ * Boolean value indicating whether or not the contents of a FlowFile should be
+ * GZipped when transferred.
+ */
+ GZIP,
+
+ /**
+ * The unique identifier of the port to communicate with
+ */
+ PORT_IDENTIFIER,
+
+ /**
+ * Indicates the number of milliseconds after the request was made that the client
+ * will wait for a response. If no response has been received by the time this value
+ * expires, the server can move on without attempting to service the request because
+ * the client will have already disconnected.
+ */
+ REQUEST_EXPIRATION_MILLIS,
+
+ /**
+ * The preferred number of FlowFiles that the server should send to the client
+ * when pulling data. This property was introduced in version 5 of the protocol.
+ */
+ BATCH_COUNT,
+
+ /**
+ * The preferred number of bytes that the server should send to the client when
+ * pulling data. This property was introduced in version 5 of the protocol.
+ */
+ BATCH_SIZE,
+
+ /**
+ * The preferred amount of time that the server should send data to the client
+ * when pulling data. This property was introduced in version 5 of the protocol.
+ * Value is in milliseconds.
+ */
+ BATCH_DURATION;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
new file mode 100644
index 0000000..eae1940
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.nifi.remote.exception.ProtocolException;
+
+public class Response {
+ private final ResponseCode code;
+ private final String message;
+
+ private Response(final ResponseCode code, final String explanation) {
+ this.code = code;
+ this.message = explanation;
+ }
+
+ public ResponseCode getCode() {
+ return code;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public static Response read(final DataInputStream in) throws IOException, ProtocolException {
+ final ResponseCode code = ResponseCode.readCode(in);
+ final String message = code.containsMessage() ? in.readUTF() : null;
+ return new Response(code, message);
+ }
+
+ @Override
+ public String toString() {
+ return code + ": " + message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
new file mode 100644
index 0000000..8860e73
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.nifi.remote.exception.ProtocolException;
+
+
+public enum ResponseCode {
+ RESERVED(0, "Reserved for Future Use", false), // This will likely be used if we ever need to expand the length of
+ // ResponseCode, so that we can indicate a 0 followed by some other bytes
+
+ // handshaking properties
+ PROPERTIES_OK(1, "Properties OK", false),
+ UNKNOWN_PROPERTY_NAME(230, "Unknown Property Name", true),
+ ILLEGAL_PROPERTY_VALUE(231, "Illegal Property Value", true),
+ MISSING_PROPERTY(232, "Missing Property", true),
+
+ // transaction indicators
+ CONTINUE_TRANSACTION(10, "Continue Transaction", false),
+ FINISH_TRANSACTION(11, "Finish Transaction", false),
+ CONFIRM_TRANSACTION(12, "Confirm Transaction", true), // "Explanation" of this code is the checksum
+ TRANSACTION_FINISHED(13, "Transaction Finished", false),
+ TRANSACTION_FINISHED_BUT_DESTINATION_FULL(14, "Transaction Finished But Destination is Full", false),
+ CANCEL_TRANSACTION(15, "Cancel Transaction", true),
+ BAD_CHECKSUM(19, "Bad Checksum", false),
+
+ // data availability indicators
+ MORE_DATA(20, "More Data Exists", false),
+ NO_MORE_DATA(21, "No More Data Exists", false),
+
+ // port state indicators
+ UNKNOWN_PORT(200, "Unknown Port", false),
+ PORT_NOT_IN_VALID_STATE(201, "Port Not in a Valid State", true),
+ PORTS_DESTINATION_FULL(202, "Port's Destination is Full", false),
+
+ // authorization
+ UNAUTHORIZED(240, "User Not Authorized", true),
+
+ // error indicators
+ ABORT(250, "Abort", true),
+ UNRECOGNIZED_RESPONSE_CODE(254, "Unrecognized Response Code", false),
+ END_OF_STREAM(255, "End of Stream", false);
+
+ private static final ResponseCode[] codeArray = new ResponseCode[256];
+
+ static {
+ for ( final ResponseCode responseCode : ResponseCode.values() ) {
+ codeArray[responseCode.getCode()] = responseCode;
+ }
+ }
+
+ private static final byte CODE_SEQUENCE_VALUE_1 = (byte) 'R';
+ private static final byte CODE_SEQUENCE_VALUE_2 = (byte) 'C';
+ private final int code;
+ private final byte[] codeSequence;
+ private final String description;
+ private final boolean containsMessage;
+
+ private ResponseCode(final int code, final String description, final boolean containsMessage) {
+ this.codeSequence = new byte[] {CODE_SEQUENCE_VALUE_1, CODE_SEQUENCE_VALUE_2, (byte) code};
+ this.code = code;
+ this.description = description;
+ this.containsMessage = containsMessage;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public byte[] getCodeSequence() {
+ return codeSequence;
+ }
+
+ @Override
+ public String toString() {
+ return description;
+ }
+
+ public boolean containsMessage() {
+ return containsMessage;
+ }
+
+ public void writeResponse(final DataOutputStream out) throws IOException {
+ if ( containsMessage() ) {
+ throw new IllegalArgumentException("ResponseCode " + code + " expects an explanation");
+ }
+
+ out.write(getCodeSequence());
+ out.flush();
+ }
+
+ public void writeResponse(final DataOutputStream out, final String explanation) throws IOException {
+ if ( !containsMessage() ) {
+ throw new IllegalArgumentException("ResponseCode " + code + " does not expect an explanation");
+ }
+
+ out.write(getCodeSequence());
+ out.writeUTF(explanation);
+ out.flush();
+ }
+
+ static ResponseCode readCode(final InputStream in) throws IOException, ProtocolException {
+ final int byte1 = in.read();
+ if ( byte1 < 0 ) {
+ throw new EOFException();
+ } else if ( byte1 != CODE_SEQUENCE_VALUE_1 ) {
+ throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode");
+ }
+
+ final int byte2 = in.read();
+ if ( byte2 < 0 ) {
+ throw new EOFException();
+ } else if ( byte2 != CODE_SEQUENCE_VALUE_2 ) {
+ throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode");
+ }
+
+ final int byte3 = in.read();
+ if ( byte3 < 0 ) {
+ throw new EOFException();
+ }
+
+ final ResponseCode responseCode = codeArray[byte3];
+ if (responseCode == null) {
+ throw new ProtocolException("Received Response Code of " + byte3 + " but do not recognize this code");
+ }
+ return responseCode;
+ }
+
+ public static ResponseCode fromSequence(final byte[] value) {
+ final int code = value[3] & 0xFF;
+ final ResponseCode responseCode = codeArray[code];
+ return (responseCode == null) ? UNRECOGNIZED_RESPONSE_CODE : responseCode;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
new file mode 100644
index 0000000..5f194f8
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.RemoteDestination;
+import org.apache.nifi.remote.RemoteResourceInitiator;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.codec.StandardFlowFileCodec;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.protocol.ClientProtocol;
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.protocol.RequestType;
+import org.apache.nifi.remote.util.StandardDataPacket;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SocketClientProtocol implements ClientProtocol {
+ private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1);
+
+ private RemoteDestination destination;
+ private boolean useCompression = false;
+
+ private String commsIdentifier;
+ private boolean handshakeComplete = false;
+
+ private final Logger logger = LoggerFactory.getLogger(SocketClientProtocol.class);
+
+ private Response handshakeResponse = null;
+ private boolean readyForFileTransfer = false;
+ private String transitUriPrefix = null;
+ private int timeoutMillis = 30000;
+
+ private int batchCount;
+ private long batchSize;
+ private long batchMillis;
+
+ private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
+
+ public SocketClientProtocol() {
+ }
+
+ public void setPreferredBatchCount(final int count) {
+ this.batchCount = count;
+ }
+
+ public void setPreferredBatchSize(final long bytes) {
+ this.batchSize = bytes;
+ }
+
+ public void setPreferredBatchDuration(final long millis) {
+ this.batchMillis = millis;
+ }
+
+ public void setDestination(final RemoteDestination destination) {
+ this.destination = destination;
+ this.useCompression = destination.isUseCompression();
+ }
+
+ public void setTimeout(final int timeoutMillis) {
+ this.timeoutMillis = timeoutMillis;
+ }
+
+ @Override
+ public void handshake(final Peer peer) throws IOException, HandshakeException {
+ handshake(peer, destination.getIdentifier());
+ }
+
+ public void handshake(final Peer peer, final String destinationId) throws IOException, HandshakeException {
+ if ( handshakeComplete ) {
+ throw new IllegalStateException("Handshake has already been completed");
+ }
+ commsIdentifier = UUID.randomUUID().toString();
+ logger.debug("{} handshaking with {}", this, peer);
+
+ final Map<HandshakeProperty, String> properties = new HashMap<>();
+ properties.put(HandshakeProperty.GZIP, String.valueOf(useCompression));
+
+ if ( destinationId != null ) {
+ properties.put(HandshakeProperty.PORT_IDENTIFIER, destination.getIdentifier());
+ }
+
+ properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf(timeoutMillis) );
+
+ if ( versionNegotiator.getVersion() >= 5 ) {
+ if ( batchCount > 0 ) {
+ properties.put(HandshakeProperty.BATCH_COUNT, String.valueOf(batchCount));
+ }
+ if ( batchSize > 0L ) {
+ properties.put(HandshakeProperty.BATCH_SIZE, String.valueOf(batchSize));
+ }
+ if ( batchMillis > 0L ) {
+ properties.put(HandshakeProperty.BATCH_DURATION, String.valueOf(batchMillis));
+ }
+ }
+
+ final CommunicationsSession commsSession = peer.getCommunicationsSession();
+ commsSession.setTimeout(timeoutMillis);
+ final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+ final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+
+ dos.writeUTF(commsIdentifier);
+
+ if ( versionNegotiator.getVersion() >= 3 ) {
+ dos.writeUTF(peer.getUrl());
+ transitUriPrefix = peer.getUrl();
+
+ if ( !transitUriPrefix.endsWith("/") ) {
+ transitUriPrefix = transitUriPrefix + "/";
+ }
+ }
+
+ dos.writeInt(properties.size());
+ for ( final Map.Entry<HandshakeProperty, String> entry : properties.entrySet() ) {
+ dos.writeUTF(entry.getKey().name());
+ dos.writeUTF(entry.getValue());
+ }
+
+ dos.flush();
+
+ try {
+ handshakeResponse = Response.read(dis);
+ } catch (final ProtocolException e) {
+ throw new HandshakeException(e);
+ }
+
+ switch (handshakeResponse.getCode()) {
+ case PORT_NOT_IN_VALID_STATE:
+ case UNKNOWN_PORT:
+ case PORTS_DESTINATION_FULL:
+ break;
+ case PROPERTIES_OK:
+ readyForFileTransfer = true;
+ break;
+ default:
+ logger.error("{} received unexpected response {} from {} when negotiating Codec", new Object[] {
+ this, handshakeResponse, peer});
+ peer.close();
+ throw new HandshakeException("Received unexpected response " + handshakeResponse);
+ }
+
+ logger.debug("{} Finished handshake with {}", this, peer);
+ handshakeComplete = true;
+ }
+
+ public boolean isReadyForFileTransfer() {
+ return readyForFileTransfer;
+ }
+
+ public boolean isPortInvalid() {
+ if ( !handshakeComplete ) {
+ throw new IllegalStateException("Handshake has not completed successfully");
+ }
+ return handshakeResponse.getCode() == ResponseCode.PORT_NOT_IN_VALID_STATE;
+ }
+
+ public boolean isPortUnknown() {
+ if ( !handshakeComplete ) {
+ throw new IllegalStateException("Handshake has not completed successfully");
+ }
+ return handshakeResponse.getCode() == ResponseCode.UNKNOWN_PORT;
+ }
+
+ public boolean isDestinationFull() {
+ if ( !handshakeComplete ) {
+ throw new IllegalStateException("Handshake has not completed successfully");
+ }
+ return handshakeResponse.getCode() == ResponseCode.PORTS_DESTINATION_FULL;
+ }
+
+ @Override
+ public Set<PeerStatus> getPeerStatuses(final Peer peer) throws IOException {
+ if ( !handshakeComplete ) {
+ throw new IllegalStateException("Handshake has not been performed");
+ }
+
+ logger.debug("{} Get Peer Statuses from {}", this, peer);
+ final CommunicationsSession commsSession = peer.getCommunicationsSession();
+ final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+ final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+
+ RequestType.REQUEST_PEER_LIST.writeRequestType(dos);
+ dos.flush();
+ final int numPeers = dis.readInt();
+ final Set<PeerStatus> peers = new HashSet<>(numPeers);
+ for (int i=0; i < numPeers; i++) {
+ final String hostname = dis.readUTF();
+ final int port = dis.readInt();
+ final boolean secure = dis.readBoolean();
+ final int flowFileCount = dis.readInt();
+ peers.add(new PeerStatus(hostname, port, secure, flowFileCount));
+ }
+
+ logger.debug("{} Received {} Peer Statuses from {}", this, peers.size(), peer);
+ return peers;
+ }
+
+ @Override
+ public FlowFileCodec negotiateCodec(final Peer peer) throws IOException, ProtocolException {
+ if ( !handshakeComplete ) {
+ throw new IllegalStateException("Handshake has not been performed");
+ }
+
+ logger.debug("{} Negotiating Codec with {}", this, peer);
+ final CommunicationsSession commsSession = peer.getCommunicationsSession();
+ final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+ final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+
+ RequestType.NEGOTIATE_FLOWFILE_CODEC.writeRequestType(dos);
+
+ FlowFileCodec codec = new StandardFlowFileCodec();
+ try {
+ codec = (FlowFileCodec) RemoteResourceInitiator.initiateResourceNegotiation(codec, dis, dos);
+ } catch (HandshakeException e) {
+ throw new ProtocolException(e.toString());
+ }
+ logger.debug("{} negotiated FlowFileCodec {} with {}", new Object[] {this, codec, commsSession});
+
+ return codec;
+ }
+
+
+ @Override
+ public Transaction startTransaction(final Peer peer, final FlowFileCodec codec, final TransferDirection direction) throws IOException, ProtocolException {
+ if ( !handshakeComplete ) {
+ throw new IllegalStateException("Handshake has not been performed");
+ }
+ if ( !readyForFileTransfer ) {
+ throw new IllegalStateException("Cannot start transaction; handshake resolution was " + handshakeResponse);
+ }
+
+ return new SocketClientTransaction(versionNegotiator.getVersion(), peer, codec,
+ direction, useCompression, (int) destination.getYieldPeriod(TimeUnit.MILLISECONDS));
+ }
+
+
+ @Override
+ public void receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
+ final String userDn = peer.getCommunicationsSession().getUserDn();
+ final Transaction transaction = startTransaction(peer, codec, TransferDirection.RECEIVE);
+
+ final StopWatch stopWatch = new StopWatch(true);
+ final Set<FlowFile> flowFilesReceived = new HashSet<>();
+ long bytesReceived = 0L;
+
+ while (true) {
+ final long start = System.nanoTime();
+ final DataPacket dataPacket = transaction.receive();
+ if ( dataPacket == null ) {
+ if ( flowFilesReceived.isEmpty() ) {
+ peer.penalize(destination.getYieldPeriod(TimeUnit.MILLISECONDS));
+ }
+ break;
+ }
+
+ FlowFile flowFile = session.create();
+ flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
+ flowFile = session.importFrom(dataPacket.getData(), flowFile);
+ final long receiveNanos = System.nanoTime() - start;
+
+ String sourceFlowFileIdentifier = dataPacket.getAttributes().get(CoreAttributes.UUID.key());
+ if ( sourceFlowFileIdentifier == null ) {
+ sourceFlowFileIdentifier = "<Unknown Identifier>";
+ }
+
+ final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier;
+ session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, TimeUnit.NANOSECONDS.toMillis(receiveNanos));
+
+ session.transfer(flowFile, Relationship.ANONYMOUS);
+ bytesReceived += dataPacket.getSize();
+ }
+
+ // Confirm that what we received was the correct data.
+ transaction.confirm();
+
+ // Commit the session so that we have persisted the data
+ session.commit();
+
+ // We want to apply backpressure if the outgoing connections are full. I.e., there are no available relationships.
+ final boolean applyBackpressure = context.getAvailableRelationships().isEmpty();
+
+ transaction.complete(applyBackpressure);
+ logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
+
+ if ( flowFilesReceived.isEmpty() ) {
+ return;
+ }
+
+ stopWatch.stop();
+ final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
+ final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
+ final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+ final String dataSize = FormatUtils.formatDataSize(bytesReceived);
+ logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] {
+ this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate });
+ }
+
+
+ @Override
+ public void transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ try {
+ final String userDn = peer.getCommunicationsSession().getUserDn();
+ final long startSendingNanos = System.nanoTime();
+ final StopWatch stopWatch = new StopWatch(true);
+ long bytesSent = 0L;
+
+ final Transaction transaction = startTransaction(peer, codec, TransferDirection.SEND);
+
+ final Set<FlowFile> flowFilesSent = new HashSet<>();
+ boolean continueTransaction = true;
+ while (continueTransaction) {
+ final long startNanos = System.nanoTime();
+ // call codec.encode within a session callback so that we have the InputStream to read the FlowFile
+ final FlowFile toWrap = flowFile;
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws IOException {
+ final DataPacket dataPacket = new StandardDataPacket(toWrap.getAttributes(), in, toWrap.getSize());
+ transaction.send(dataPacket);
+ }
+ });
+
+ final long transferNanos = System.nanoTime() - startNanos;
+ final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+
+ flowFilesSent.add(flowFile);
+ bytesSent += flowFile.getSize();
+ logger.debug("{} Sent {} to {}", this, flowFile, peer);
+
+ final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key());
+ session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false);
+ session.remove(flowFile);
+
+ final long sendingNanos = System.nanoTime() - startSendingNanos;
+ if ( sendingNanos < BATCH_SEND_NANOS ) {
+ flowFile = session.get();
+ } else {
+ flowFile = null;
+ }
+
+ continueTransaction = (flowFile != null);
+ }
+
+ transaction.confirm();
+
+ // consume input stream entirely, ignoring its contents. If we
+ // don't do this, the Connection will not be returned to the pool
+ stopWatch.stop();
+ final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
+ final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+ final String dataSize = FormatUtils.formatDataSize(bytesSent);
+
+ session.commit();
+ transaction.complete(false);
+
+ final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
+ logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {
+ this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
+ } catch (final Exception e) {
+ session.rollback();
+ throw e;
+ }
+ }
+
+
+ @Override
+ public VersionNegotiator getVersionNegotiator() {
+ return versionNegotiator;
+ }
+
+ @Override
+ public void shutdown(final Peer peer) throws IOException {
+ readyForFileTransfer = false;
+ final CommunicationsSession commsSession = peer.getCommunicationsSession();
+ final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+
+ logger.debug("{} Shutting down with {}", this, peer);
+ // Indicate that we would like to have some data
+ RequestType.SHUTDOWN.writeRequestType(dos);
+ dos.flush();
+ }
+
+ @Override
+ public String getResourceName() {
+ return "SocketFlowFileProtocol";
+ }
+
+ @Override
+ public String toString() {
+ return "SocketClientProtocol[CommsID=" + commsIdentifier + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
new file mode 100644
index 0000000..edb360e
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.protocol.RequestType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SocketClientTransaction implements Transaction {
+ private static final Logger logger = LoggerFactory.getLogger(SocketClientTransaction.class);
+
+
+ private final CRC32 crc = new CRC32();
+ private final int protocolVersion;
+ private final FlowFileCodec codec;
+ private final DataInputStream dis;
+ private final DataOutputStream dos;
+ private final TransferDirection direction;
+ private final boolean compress;
+ private final Peer peer;
+ private final int penaltyMillis;
+
+ private boolean dataAvailable = false;
+ private int transfers = 0;
+ private TransactionState state;
+
+ SocketClientTransaction(final int protocolVersion, final Peer peer, final FlowFileCodec codec,
+ final TransferDirection direction, final boolean useCompression, final int penaltyMillis) throws IOException {
+ this.protocolVersion = protocolVersion;
+ this.peer = peer;
+ this.codec = codec;
+ this.direction = direction;
+ this.dis = new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream());
+ this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream());
+ this.compress = useCompression;
+ this.state = TransactionState.TRANSACTION_STARTED;
+ this.penaltyMillis = penaltyMillis;
+
+ initialize();
+ }
+
+ private void initialize() throws IOException {
+ try {
+ if ( direction == TransferDirection.RECEIVE ) {
+ // Indicate that we would like to have some data
+ RequestType.RECEIVE_FLOWFILES.writeRequestType(dos);
+ dos.flush();
+
+ final Response dataAvailableCode = Response.read(dis);
+ switch (dataAvailableCode.getCode()) {
+ case MORE_DATA:
+ logger.debug("{} {} Indicates that data is available", this, peer);
+ this.dataAvailable = true;
+ break;
+ case NO_MORE_DATA:
+ logger.debug("{} No data available from {}", peer);
+ this.dataAvailable = false;
+ return;
+ default:
+ throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+ }
+
+ } else {
+ // Indicate that we would like to have some data
+ RequestType.SEND_FLOWFILES.writeRequestType(dos);
+ dos.flush();
+ }
+ } catch (final Exception e) {
+ error();
+ throw e;
+ }
+ }
+
+
+ @Override
+ public DataPacket receive() throws IOException {
+ try {
+ if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
+ throw new IllegalStateException("Cannot receive data because Transaction State is " + state);
+ }
+
+ if ( direction == TransferDirection.SEND ) {
+ throw new IllegalStateException("Attempting to receive data but started a SEND Transaction");
+ }
+
+ // if we already know there's no data, just return null
+ if ( !dataAvailable ) {
+ return null;
+ }
+
+ // if we have already received a packet, check if another is available.
+ if ( transfers > 0 ) {
+ // Determine if Peer will send us data or has no data to send us
+ final Response dataAvailableCode = Response.read(dis);
+ switch (dataAvailableCode.getCode()) {
+ case CONTINUE_TRANSACTION:
+ logger.debug("{} {} Indicates Transaction should continue", this, peer);
+ this.dataAvailable = true;
+ break;
+ case FINISH_TRANSACTION:
+ logger.debug("{} {} Indicates Transaction should finish", peer);
+ this.dataAvailable = false;
+ break;
+ default:
+ throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+ }
+ }
+
+ // if no data available, return null
+ if ( !dataAvailable ) {
+ return null;
+ }
+
+ logger.debug("{} Receiving data from {}", this, peer);
+ final DataPacket packet = codec.decode(new CheckedInputStream(dis, crc));
+
+ if ( packet == null ) {
+ this.dataAvailable = false;
+ } else {
+ transfers++;
+ }
+
+ this.state = TransactionState.DATA_EXCHANGED;
+ return packet;
+ } catch (final Exception e) {
+ error();
+ throw e;
+ }
+ }
+
+
+ @Override
+ public void send(DataPacket dataPacket) throws IOException {
+ try {
+ if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
+ throw new IllegalStateException("Cannot send data because Transaction State is " + state);
+ }
+
+ if ( direction == TransferDirection.RECEIVE ) {
+ throw new IllegalStateException("Attempting to send data but started a RECEIVE Transaction");
+ }
+
+ if ( transfers > 0 ) {
+ ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
+ }
+
+ logger.debug("{} Sending data to {}", this, peer);
+
+ final OutputStream out = new CheckedOutputStream(dos, crc);
+ codec.encode(dataPacket, out);
+
+ // need to close the CompressionOutputStream in order to force it write out any remaining bytes.
+ // Otherwise, do NOT close it because we don't want to close the underlying stream
+ // (CompressionOutputStream will not close the underlying stream when it's closed)
+ if ( compress ) {
+ out.close();
+ }
+
+ transfers++;
+ this.state = TransactionState.DATA_EXCHANGED;
+ } catch (final Exception e) {
+ error();
+ throw e;
+ }
+ }
+
+
+ @Override
+ public void cancel(final String explanation) throws IOException {
+ if ( state == TransactionState.TRANSACTION_CANCELED || state == TransactionState.TRANSACTION_COMPLETED || state == TransactionState.ERROR ) {
+ throw new IllegalStateException("Cannot cancel transaction because state is already " + state);
+ }
+
+ try {
+ ResponseCode.CANCEL_TRANSACTION.writeResponse(dos, explanation == null ? "<No explanation given>" : explanation);
+ state = TransactionState.TRANSACTION_CANCELED;
+ } catch (final IOException ioe) {
+ error();
+ throw ioe;
+ }
+ }
+
+
+ @Override
+ public void complete(boolean requestBackoff) throws IOException {
+ try {
+ if ( state != TransactionState.TRANSACTION_CONFIRMED ) {
+ throw new IllegalStateException("Cannot complete transaction because state is " + state +
+ "; Transaction can only be completed when state is " + TransactionState.TRANSACTION_CONFIRMED);
+ }
+
+ if ( direction == TransferDirection.RECEIVE ) {
+ if ( transfers == 0 ) {
+ state = TransactionState.TRANSACTION_COMPLETED;
+ return;
+ }
+
+ if ( requestBackoff ) {
+ // Confirm that we received the data and the peer can now discard it but that the peer should not
+ // send any more data for a bit
+ logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
+ ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos);
+ } else {
+ // Confirm that we received the data and the peer can now discard it
+ logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
+ ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
+ }
+
+ state = TransactionState.TRANSACTION_COMPLETED;
+ } else {
+ final Response transactionResponse;
+ try {
+ transactionResponse = Response.read(dis);
+ } catch (final IOException e) {
+ throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. " +
+ "It is unknown whether or not the peer successfully received/processed the data.", e);
+ }
+
+ logger.debug("{} Received {} from {}", this, transactionResponse, peer);
+ if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
+ peer.penalize(penaltyMillis);
+ } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
+ throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
+ }
+
+ state = TransactionState.TRANSACTION_COMPLETED;
+ }
+ } catch (final Exception e) {
+ error();
+ throw e;
+ }
+ }
+
+
+ @Override
+ public void confirm() throws IOException {
+ try {
+ if ( state == TransactionState.TRANSACTION_STARTED && !dataAvailable && direction == TransferDirection.RECEIVE ) {
+ // client requested to receive data but no data available. no need to confirm.
+ state = TransactionState.TRANSACTION_CONFIRMED;
+ return;
+ }
+
+ if ( state != TransactionState.DATA_EXCHANGED ) {
+ throw new IllegalStateException("Cannot confirm Transaction because state is " + state +
+ "; Transaction can only be confirmed when state is " + TransactionState.DATA_EXCHANGED );
+ }
+
+ if ( direction == TransferDirection.RECEIVE ) {
+ if ( dataAvailable ) {
+ throw new IllegalStateException("Cannot complete transaction because the sender has already sent more data than client has consumed.");
+ }
+
+ // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
+ // to peer so that we can verify that the connection is still open. This is a two-phase commit,
+ // which helps to prevent the chances of data duplication. Without doing this, we may commit the
+ // session and then when we send the response back to the peer, the peer may have timed out and may not
+ // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
+ // Critical Section involved in this transaction so that rather than the Critical Section being the
+ // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
+ logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
+ final String calculatedCRC = String.valueOf(crc.getValue());
+ ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC);
+
+ final Response confirmTransactionResponse;
+ try {
+ confirmTransactionResponse = Response.read(dis);
+ } catch (final IOException ioe) {
+ logger.error("Failed to receive response code from {} when expected confirmation of transaction", peer);
+ throw ioe;
+ }
+
+ logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer);
+
+ switch (confirmTransactionResponse.getCode()) {
+ case CONFIRM_TRANSACTION:
+ break;
+ case BAD_CHECKSUM:
+ throw new IOException(this + " Received a BadChecksum response from peer " + peer);
+ default:
+ throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
+ }
+
+ state = TransactionState.TRANSACTION_CONFIRMED;
+ } else {
+ logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer);
+ ResponseCode.FINISH_TRANSACTION.writeResponse(dos);
+
+ final String calculatedCRC = String.valueOf(crc.getValue());
+
+ // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
+ final Response transactionConfirmationResponse = Response.read(dis);
+ if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) {
+ // Confirm checksum and echo back the confirmation.
+ logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer);
+ final String receivedCRC = transactionConfirmationResponse.getMessage();
+
+ // CRC was not used before version 4
+ if ( protocolVersion > 3 ) {
+ if ( !receivedCRC.equals(calculatedCRC) ) {
+ ResponseCode.BAD_CHECKSUM.writeResponse(dos);
+ throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
+ }
+ }
+
+ ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");
+ } else {
+ throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse);
+ }
+
+ state = TransactionState.TRANSACTION_CONFIRMED;
+ }
+ } catch (final Exception e) {
+ error();
+ throw e;
+ }
+ }
+
+ @Override
+ public void error() {
+ this.state = TransactionState.ERROR;
+ }
+
+ @Override
+ public TransactionState getState() {
+ return state;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
new file mode 100644
index 0000000..6dab77b
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import java.util.Set;
+
+import org.apache.nifi.remote.PeerStatus;
+
+public class PeerStatusCache {
+ private final Set<PeerStatus> statuses;
+ private final long timestamp;
+
+ public PeerStatusCache(final Set<PeerStatus> statuses) {
+ this(statuses, System.currentTimeMillis());
+ }
+
+ public PeerStatusCache(final Set<PeerStatus> statuses, final long timestamp) {
+ this.statuses = statuses;
+ this.timestamp = timestamp;
+ }
+
+ public Set<PeerStatus> getStatuses() {
+ return statuses;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java
new file mode 100644
index 0000000..b2dbdcd
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.nifi.web.api.dto.ControllerDTO;
+import org.apache.nifi.web.api.entity.ControllerEntity;
+import org.apache.nifi.web.util.WebUtils;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.ClientResponse.Status;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+
+/**
+ *
+ */
+public class RemoteNiFiUtils {
+
+ public static final String CONTROLLER_URI_PATH = "/controller";
+
+ private static final int CONNECT_TIMEOUT = 10000;
+ private static final int READ_TIMEOUT = 10000;
+
+ private final Client client;
+
+ public RemoteNiFiUtils(final SSLContext sslContext) {
+ this.client = getClient(sslContext);
+ }
+
+
+ /**
+ * Gets the content at the specified URI.
+ *
+ * @param uri
+ * @param timeoutMillis
+ * @return
+ * @throws ClientHandlerException
+ * @throws UniformInterfaceException
+ */
+ public ClientResponse get(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException {
+ return get(uri, timeoutMillis, null);
+ }
+
+ /**
+ * Gets the content at the specified URI using the given query parameters.
+ *
+ * @param uri
+ * @param timeoutMillis
+ * @param queryParams
+ * @return
+ * @throws ClientHandlerException
+ * @throws UniformInterfaceException
+ */
+ public ClientResponse get(final URI uri, final int timeoutMillis, final Map<String, String> queryParams) throws ClientHandlerException, UniformInterfaceException {
+ // perform the request
+ WebResource webResource = client.resource(uri);
+ if ( queryParams != null ) {
+ for ( final Map.Entry<String, String> queryEntry : queryParams.entrySet() ) {
+ webResource = webResource.queryParam(queryEntry.getKey(), queryEntry.getValue());
+ }
+ }
+
+ webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis);
+ webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis);
+
+ return webResource.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+ }
+
+ /**
+ * Performs a HEAD request to the specified URI.
+ *
+ * @param uri
+ * @param timeoutMillis
+ * @return
+ * @throws ClientHandlerException
+ * @throws UniformInterfaceException
+ */
+ public ClientResponse head(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException {
+ // perform the request
+ WebResource webResource = client.resource(uri);
+ webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis);
+ webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis);
+ return webResource.head();
+ }
+
+ /**
+ * Gets a client based on the specified URI.
+ *
+ * @param uri
+ * @return
+ */
+ private Client getClient(final SSLContext sslContext) {
+ final Client client;
+ if (sslContext == null) {
+ client = WebUtils.createClient(null);
+ } else {
+ client = WebUtils.createClient(null, sslContext);
+ }
+
+ client.setReadTimeout(READ_TIMEOUT);
+ client.setConnectTimeout(CONNECT_TIMEOUT);
+
+ return client;
+ }
+
+
+ /**
+ * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance
+ * is not configured to use Site-to-Site transfers.
+ *
+ * @param uri the base URI of the remote instance. This should include the path only to the nifi-api level, as well as the protocol, host, and port.
+ * @param timeoutMillis
+ * @return
+ * @throws IOException
+ */
+ public Integer getRemoteListeningPort(final String uri, final int timeoutMillis) throws IOException {
+ try {
+ final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
+ return getRemoteListeningPort(uriObject, timeoutMillis);
+ } catch (URISyntaxException e) {
+ throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
+ }
+ }
+
+ public String getRemoteRootGroupId(final String uri, final int timeoutMillis) throws IOException {
+ try {
+ final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
+ return getRemoteRootGroupId(uriObject, timeoutMillis);
+ } catch (URISyntaxException e) {
+ throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
+ }
+ }
+
+ public String getRemoteInstanceId(final String uri, final int timeoutMillis) throws IOException {
+ try {
+ final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
+ return getController(uriObject, timeoutMillis).getInstanceId();
+ } catch (URISyntaxException e) {
+ throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
+ }
+ }
+
+ /**
+ * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance
+ * is not configured to use Site-to-Site transfers.
+ *
+ * @param uri the full URI to fetch, including the path.
+ * @return
+ * @throws IOException
+ */
+ private Integer getRemoteListeningPort(final URI uri, final int timeoutMillis) throws IOException {
+ return getController(uri, timeoutMillis).getRemoteSiteListeningPort();
+ }
+
+ private String getRemoteRootGroupId(final URI uri, final int timeoutMillis) throws IOException {
+ return getController(uri, timeoutMillis).getId();
+ }
+
+ public ControllerDTO getController(final URI uri, final int timeoutMillis) throws IOException {
+ final ClientResponse response = get(uri, timeoutMillis);
+
+ if (Status.OK.getStatusCode() == response.getStatusInfo().getStatusCode()) {
+ final ControllerEntity entity = response.getEntity(ControllerEntity.class);
+ return entity.getController();
+ } else {
+ final String responseMessage = response.getEntity(String.class);
+ throw new IOException("Got HTTP response Code " + response.getStatusInfo().getStatusCode() + ": " + response.getStatusInfo().getReasonPhrase() + " with explanation: " + responseMessage);
+ }
+ }
+
+ /**
+ * Issues a registration request on behalf of the current user.
+ *
+ * @param baseApiUri
+ * @return
+ */
+ public ClientResponse issueRegistrationRequest(String baseApiUri) {
+ final URI uri = URI.create(String.format("%s/%s", baseApiUri, "/controller/users"));
+
+ // set up the query params
+ MultivaluedMapImpl entity = new MultivaluedMapImpl();
+ entity.add("justification", "A Remote instance of NiFi has attempted to create a reference to this NiFi. This action must be approved first.");
+
+ // create the web resource
+ WebResource webResource = client.resource(uri);
+
+ // get the client utils and make the request
+ return webResource.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_FORM_URLENCODED).entity(entity).post(ClientResponse.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
new file mode 100644
index 0000000..bd1b50c
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.stream.io.LimitingInputStream;
+import org.apache.nifi.stream.io.MinimumLengthInputStream;
+
+public class StandardDataPacket implements DataPacket {
+
+ private final Map<String, String> attributes;
+ private final InputStream stream;
+ private final long size;
+
+ public StandardDataPacket(final Map<String, String> attributes, final InputStream stream, final long size) {
+ this.attributes = attributes;
+ this.stream = new MinimumLengthInputStream(new LimitingInputStream(stream, size), size);
+ this.size = size;
+ }
+
+ public Map<String, String> getAttributes() {
+ return attributes;
+ }
+
+ public InputStream getData() {
+ return stream;
+ }
+
+ public long getSize() {
+ return size;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
new file mode 100644
index 0000000..d8899ea
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client.socket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.cluster.ClusterNodeInformation;
+import org.apache.nifi.remote.cluster.NodeInformation;
+import org.junit.Test;
+
+public class TestEndpointConnectionStatePool {
+
+ @Test
+ public void testFormulateDestinationListForOutput() throws IOException {
+ final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
+ final List<NodeInformation> collection = new ArrayList<>();
+ collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 4096));
+ collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 10240));
+ collection.add(new NodeInformation("ShouldGetLittle", 3, 3333, true, 1024));
+ collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 4096));
+ collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096));
+
+ clusterNodeInfo.setNodeInformation(collection);
+ final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
+ for ( final PeerStatus peerStatus : destinations ) {
+ System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
+ }
+ }
+
+ @Test
+ public void testFormulateDestinationListForOutputHugeDifference() throws IOException {
+ final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
+ final List<NodeInformation> collection = new ArrayList<>();
+ collection.add(new NodeInformation("ShouldGetLittle", 1, 1111, true, 500));
+ collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 50000));
+
+ clusterNodeInfo.setNodeInformation(collection);
+ final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
+ for ( final PeerStatus peerStatus : destinations ) {
+ System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
+ }
+ }
+
+
+
+
+ @Test
+ public void testFormulateDestinationListForInputPorts() throws IOException {
+ final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
+ final List<NodeInformation> collection = new ArrayList<>();
+ collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 4096));
+ collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 10240));
+ collection.add(new NodeInformation("ShouldGetLots", 3, 3333, true, 1024));
+ collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 4096));
+ collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096));
+
+ clusterNodeInfo.setNodeInformation(collection);
+ final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
+ for ( final PeerStatus peerStatus : destinations ) {
+ System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
+ }
+ }
+
+ @Test
+ public void testFormulateDestinationListForInputPortsHugeDifference() throws IOException {
+ final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
+ final List<NodeInformation> collection = new ArrayList<>();
+ collection.add(new NodeInformation("ShouldGetLots", 1, 1111, true, 500));
+ collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 50000));
+
+ clusterNodeInfo.setNodeInformation(collection);
+ final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
+ for ( final PeerStatus peerStatus : destinations ) {
+ System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
new file mode 100644
index 0000000..421d579
--- /dev/null
+++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class LimitingInputStream extends InputStream {
+
+ private final InputStream in;
+ private final long limit;
+ private long bytesRead = 0;
+
+ public LimitingInputStream(final InputStream in, final long limit) {
+ this.in = in;
+ this.limit = limit;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (bytesRead >= limit) {
+ return -1;
+ }
+
+ final int val = in.read();
+ if (val > -1) {
+ bytesRead++;
+ }
+ return val;
+ }
+
+ @Override
+ public int read(final byte[] b) throws IOException {
+ if (bytesRead >= limit) {
+ return -1;
+ }
+
+ final int maxToRead = (int) Math.min(b.length, limit - bytesRead);
+
+ final int val = in.read(b, 0, maxToRead);
+ if (val > 0) {
+ bytesRead += val;
+ }
+ return val;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (bytesRead >= limit) {
+ return -1;
+ }
+
+ final int maxToRead = (int) Math.min(len, limit - bytesRead);
+
+ final int val = in.read(b, off, maxToRead);
+ if (val > 0) {
+ bytesRead += val;
+ }
+ return val;
+ }
+
+ @Override
+ public long skip(final long n) throws IOException {
+ final long skipped = in.skip(Math.min(n, limit - bytesRead));
+ bytesRead += skipped;
+ return skipped;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return in.available();
+ }
+
+ @Override
+ public void close() throws IOException {
+ in.close();
+ }
+
+ @Override
+ public void mark(int readlimit) {
+ in.mark(readlimit);
+ }
+
+ @Override
+ public boolean markSupported() {
+ return in.markSupported();
+ }
+
+ @Override
+ public void reset() throws IOException {
+ in.reset();
+ }
+
+ public long getLimit() {
+ return limit;
+ }
+}