You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/06 14:20:22 UTC

[4/6] incubator-nifi git commit: NIFI-282: Bug fixes; documentation improvements; removed code marked as 'FOR TESTING'

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
new file mode 100644
index 0000000..dc3d68f
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
+import org.apache.nifi.remote.protocol.CommunicationsOutput;
+
+/**
+ * {@link CommunicationsOutput} backed by an {@link SSLSocketChannel}.
+ * <p>
+ * Callers write to a buffering stream that wraps a byte-counting stream, which in
+ * turn wraps the channel's output stream. The count reported by
+ * {@link #getBytesWritten()} is taken beneath the buffering stream, so bytes that
+ * the buffering stream is still holding are presumably not included until they
+ * are flushed (NOTE(review): confirm against the buffering stream implementation).
+ */
+public class SSLSocketChannelOutput implements CommunicationsOutput {
+    private final ByteCountingOutputStream countingOut;
+    private final OutputStream out;
+
+    /**
+     * Builds the stream chain on top of the given channel.
+     *
+     * @param channel the SSL channel that ultimately receives the bytes
+     */
+    public SSLSocketChannelOutput(final SSLSocketChannel channel) {
+        final SSLSocketChannelOutputStream channelStream = new SSLSocketChannelOutputStream(channel);
+        this.countingOut = new ByteCountingOutputStream(channelStream);
+        this.out = new BufferedOutputStream(this.countingOut);
+    }
+
+    /**
+     * @return the value reported by the byte-counting stream
+     */
+    @Override
+    public long getBytesWritten() {
+        return this.countingOut.getBytesWritten();
+    }
+
+    /**
+     * @return the buffering stream that callers should write to; the same
+     * instance is returned on every call
+     */
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+        return this.out;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
new file mode 100644
index 0000000..befbdaa
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.VersionedRemoteResource;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.PortNotRunningException;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.exception.UnknownPortException;
+
+/**
+ * Client-side view of a remote protocol: a {@link VersionedRemoteResource} that
+ * declares the operations a client performs against a {@link Peer}.
+ * <p>
+ * NOTE(review): the contracts of the undocumented methods below are not stated in
+ * this file; the short notes on them are inferred from their names and signatures
+ * and should be confirmed against an implementation.
+ */
+public interface ClientProtocol extends VersionedRemoteResource {
+
+    // presumably performs the initial handshake with the peer; the status methods
+    // further down document that they require a successfully completed handshake
+    void handshake(Peer peer) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException;
+
+    // presumably asks the given peer for the statuses of its known peers - confirm
+    Set<PeerStatus> getPeerStatuses(Peer currentPeer) throws IOException, ProtocolException;
+
+    // presumably agrees with the peer on the FlowFileCodec to use - confirm
+    FlowFileCodec negotiateCodec(Peer peer) throws IOException, ProtocolException;
+
+    void receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
+
+    void transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
+
+    void shutdown(Peer peer) throws IOException, ProtocolException;
+
+    boolean isReadyForFileTransfer();
+
+    
+    
+    
+    // Starts a Transaction with the peer for the given direction, using the given codec.
+    Transaction startTransaction(Peer peer, FlowFileCodec codec, TransferDirection direction) throws IOException;
+    
+    
+    /**
+     * returns <code>true</code> if remote instance indicates that the port is
+     * invalid
+     *
+     * @return <code>true</code> if the remote instance reported the port as
+     * invalid, <code>false</code> otherwise
+     * @throws IllegalStateException if a handshake has not successfully
+     * completed
+     */
+    boolean isPortInvalid() throws IllegalStateException;
+
+    /**
+     * returns <code>true</code> if remote instance indicates that the port is
+     * unknown
+     *
+     * @return <code>true</code> if the remote instance reported the port as
+     * unknown, <code>false</code> otherwise
+     * @throws IllegalStateException if a handshake has not successfully
+     * completed
+     */
+    boolean isPortUnknown();
+
+    /**
+     * returns <code>true</code> if remote instance indicates that the port's
+     * destination is full
+     *
+     * @return <code>true</code> if the remote instance reported the port's
+     * destination as full, <code>false</code> otherwise
+     * @throws IllegalStateException if a handshake has not successfully
+     * completed
+     */
+    boolean isDestinationFull();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
new file mode 100644
index 0000000..d2e2946
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Input side of a communications channel: exposes an {@link InputStream} and a
+ * running count of bytes read.
+ */
+public interface CommunicationsInput {
+
+    /**
+     * @return the stream to read incoming data from
+     * @throws IOException if the stream cannot be provided
+     */
+    InputStream getInputStream() throws IOException;
+
+    /**
+     * @return the number of bytes read so far (NOTE(review): exactly which bytes
+     * are counted is up to the implementation - confirm)
+     */
+    long getBytesRead();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
new file mode 100644
index 0000000..95cab29
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Output side of a communications channel: exposes an {@link OutputStream} and a
+ * running count of bytes written.
+ */
+public interface CommunicationsOutput {
+
+    /**
+     * @return the stream to write outgoing data to
+     * @throws IOException if the stream cannot be provided
+     */
+    OutputStream getOutputStream() throws IOException;
+
+    /**
+     * @return the number of bytes written so far (NOTE(review): whether bytes
+     * still held in a buffer are included is up to the implementation - confirm)
+     */
+    long getBytesWritten();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
new file mode 100644
index 0000000..d009cec
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * A closeable communications session with a remote peer, giving access to its
+ * input and output sides, its timeout, and identifying information (URI and
+ * user DN).
+ */
+public interface CommunicationsSession extends Closeable {
+
+    // The four ASCII bytes "NiFi". NOTE(review): presumably sent at the start of a
+    // connection to identify the protocol - confirm against the callers.
+    public static final byte[] MAGIC_BYTES = {(byte) 'N', (byte) 'i', (byte) 'F', (byte) 'i'};
+
+    CommunicationsInput getInput();
+
+    CommunicationsOutput getOutput();
+
+    // Timeout is expressed in milliseconds.
+    void setTimeout(int millis) throws IOException;
+
+    // NOTE(review): presumably returns the timeout in milliseconds, mirroring setTimeout - confirm
+    int getTimeout() throws IOException;
+
+    void setUri(String uri);
+
+    String getUri();
+
+    String getUserDn();
+
+    void setUserDn(String dn);
+
+    boolean isDataAvailable();
+
+    long getBytesWritten();
+
+    long getBytesRead();
+
+    /**
+     * Asynchronously interrupts this CommunicationsSession. Implementations must
+     * ensure that they stop sending and receiving data as soon as possible after
+     * this method has been called, even if doing so results in sending only
+     * partial data to the peer. This will usually result in the peer throwing a
+     * SocketTimeoutException.
+     */
+    void interrupt();
+
+    /**
+     * Returns <code>true</code> if the connection is closed, <code>false</code>
+     * otherwise.
+     *
+     * @return <code>true</code> if the connection is closed, <code>false</code>
+     * otherwise
+     */
+    boolean isClosed();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
new file mode 100644
index 0000000..f4fa4d0
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.InputStream;
+import java.util.Map;
+
+/**
+ * A unit of data made up of a set of String attributes, a content stream and a
+ * size.
+ */
+public interface DataPacket {
+
+	/**
+	 * @return the attributes of this packet, as name/value pairs
+	 */
+	Map<String, String> getAttributes();
+	
+	/**
+	 * @return a stream over the content of this packet
+	 */
+	InputStream getData();
+	
+	/**
+	 * @return the size of this packet (NOTE(review): presumably the number of
+	 * content bytes available from {@link #getData()} - confirm)
+	 */
+	long getSize();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/RequestType.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
new file mode 100644
index 0000000..41334fe
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * The kinds of request that can be written to, and read back from, a data
+ * stream. A request type is serialized as its enum constant name using the
+ * modified UTF-8 encoding of {@link DataOutputStream#writeUTF(String)}.
+ */
+public enum RequestType {
+
+    NEGOTIATE_FLOWFILE_CODEC,
+    REQUEST_PEER_LIST,
+    SEND_FLOWFILES,
+    RECEIVE_FLOWFILES,
+    SHUTDOWN;
+
+    /**
+     * Writes the name of this request type to the given stream.
+     *
+     * @param dos the stream to write to
+     * @throws IOException if the stream cannot be written
+     */
+    public void writeRequestType(final DataOutputStream dos) throws IOException {
+        dos.writeUTF(name());
+    }
+
+    /**
+     * Reads a request type previously written by
+     * {@link #writeRequestType(DataOutputStream)}.
+     *
+     * @param dis the stream to read from
+     * @return the request type whose name was read
+     * @throws IOException if the stream cannot be read, or if the value read is
+     * not the name of a request type; in the latter case the underlying
+     * {@link IllegalArgumentException} is attached as the cause
+     */
+    public static RequestType readRequestType(final DataInputStream dis) throws IOException {
+        final String requestTypeVal = dis.readUTF();
+        try {
+            return RequestType.valueOf(requestTypeVal);
+        } catch (final IllegalArgumentException e) {
+            // valueOf rejects unknown names with IllegalArgumentException; keep it as the
+            // cause so the original failure is not lost when it is reported as an I/O problem.
+            throw new IOException("Could not determine RequestType: received invalid value " + requestTypeVal, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
new file mode 100644
index 0000000..41dc276
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+
+/**
+ * Enumeration of the properties that can be used with the Site-to-Site Socket
+ * Protocol.
+ */
+public enum HandshakeProperty {
+    /**
+     * Boolean value indicating whether or not the contents of a FlowFile should
+     * be GZIP-compressed when transferred.
+     */
+    GZIP,
+    
+    /**
+     * The unique identifier of the port to communicate with.
+     */
+    PORT_IDENTIFIER,
+    
+    /**
+     * The number of milliseconds, counted from when the request was made, that
+     * the client will wait for a response. If no response has been received by
+     * the time this value expires, the server can move on without attempting to
+     * service the request because the client will have already disconnected.
+     */
+    REQUEST_EXPIRATION_MILLIS,
+    
+    /**
+     * The preferred number of FlowFiles that the server should send to the client
+     * when pulling data. This property was introduced in version 5 of the protocol.
+     */
+    BATCH_COUNT,
+    
+    /**
+     * The preferred number of bytes that the server should send to the client when
+     * pulling data. This property was introduced in version 5 of the protocol.
+     */
+    BATCH_SIZE,
+    
+    /**
+     * The preferred amount of time, in milliseconds, that the server should spend
+     * sending data to the client when pulling data. This property was introduced
+     * in version 5 of the protocol.
+     */
+    BATCH_DURATION;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
new file mode 100644
index 0000000..eae1940
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.nifi.remote.exception.ProtocolException;
+
+/**
+ * A {@link ResponseCode} read from a stream, paired with the explanation that
+ * follows it on the stream when the code carries one.
+ */
+public class Response {
+    private final ResponseCode code;
+    private final String message;
+
+    private Response(final ResponseCode code, final String explanation) {
+        this.message = explanation;
+        this.code = code;
+    }
+
+    /**
+     * Reads a response code from the stream and, only when that code carries a
+     * message, the UTF-encoded message that follows it.
+     *
+     * @param in the stream to read from
+     * @return the response that was read; its message is <code>null</code> when
+     * the code does not carry one
+     * @throws IOException if the stream cannot be read
+     * @throws ProtocolException if the response code cannot be read from the stream
+     */
+    public static Response read(final DataInputStream in) throws IOException, ProtocolException {
+        final ResponseCode responseCode = ResponseCode.readCode(in);
+        if (responseCode.containsMessage()) {
+            return new Response(responseCode, in.readUTF());
+        }
+        return new Response(responseCode, null);
+    }
+
+    /**
+     * @return the response code
+     */
+    public ResponseCode getCode() {
+        return this.code;
+    }
+
+    /**
+     * @return the message that accompanied the code, or <code>null</code> if the
+     * code does not carry one
+     */
+    public String getMessage() {
+        return this.message;
+    }
+
+    /**
+     * @return the code and the message separated by <code>": "</code>
+     */
+    @Override
+    public String toString() {
+        return new StringBuilder().append(this.code).append(": ").append(this.message).toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
new file mode 100644
index 0000000..8860e73
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.nifi.remote.exception.ProtocolException;
+
+
+/**
+ * Response codes of the Site-to-Site socket protocol.
+ * <p>
+ * A code is serialized as a three-byte sequence: the marker bytes <code>'R'</code>
+ * and <code>'C'</code> followed by the numeric code as a single unsigned byte.
+ * Codes for which {@link #containsMessage()} is <code>true</code> are followed by
+ * an explanation written with {@link DataOutputStream#writeUTF(String)}.
+ */
+public enum ResponseCode {
+    RESERVED(0, "Reserved for Future Use", false), // This will likely be used if we ever need to expand the length of
+                                            // ResponseCode, so that we can indicate a 0 followed by some other bytes
+    
+    // handshaking properties
+    PROPERTIES_OK(1, "Properties OK", false),
+    UNKNOWN_PROPERTY_NAME(230, "Unknown Property Name", true),
+    ILLEGAL_PROPERTY_VALUE(231, "Illegal Property Value", true),
+    MISSING_PROPERTY(232, "Missing Property", true),
+    
+    // transaction indicators
+    CONTINUE_TRANSACTION(10, "Continue Transaction", false),
+    FINISH_TRANSACTION(11, "Finish Transaction", false),
+    CONFIRM_TRANSACTION(12, "Confirm Transaction", true),   // "Explanation" of this code is the checksum
+    TRANSACTION_FINISHED(13, "Transaction Finished", false),
+    TRANSACTION_FINISHED_BUT_DESTINATION_FULL(14, "Transaction Finished But Destination is Full", false),
+    CANCEL_TRANSACTION(15, "Cancel Transaction", true),
+    BAD_CHECKSUM(19, "Bad Checksum", false),
+
+    // data availability indicators
+    MORE_DATA(20, "More Data Exists", false),
+    NO_MORE_DATA(21, "No More Data Exists", false),
+    
+    // port state indicators
+    UNKNOWN_PORT(200, "Unknown Port", false),
+    PORT_NOT_IN_VALID_STATE(201, "Port Not in a Valid State", true),
+    PORTS_DESTINATION_FULL(202, "Port's Destination is Full", false),
+    
+    // authorization
+    UNAUTHORIZED(240, "User Not Authorized", true),
+    
+    // error indicators
+    ABORT(250, "Abort", true),
+    UNRECOGNIZED_RESPONSE_CODE(254, "Unrecognized Response Code", false),
+    END_OF_STREAM(255, "End of Stream", false);
+    
+    // Lookup table indexed by the unsigned code byte; entries for unassigned codes stay null.
+    private static final ResponseCode[] codeArray = new ResponseCode[256];
+    
+    static {
+        for ( final ResponseCode responseCode : ResponseCode.values() ) {
+            codeArray[responseCode.getCode()] = responseCode;
+        }
+    }
+    
+    private static final byte CODE_SEQUENCE_VALUE_1 = (byte) 'R';
+    private static final byte CODE_SEQUENCE_VALUE_2 = (byte) 'C';
+    private final int code;
+    private final byte[] codeSequence;
+    private final String description;
+    private final boolean containsMessage;
+    
+    private ResponseCode(final int code, final String description, final boolean containsMessage) {
+        this.codeSequence = new byte[] {CODE_SEQUENCE_VALUE_1, CODE_SEQUENCE_VALUE_2, (byte) code};
+        this.code = code;
+        this.description = description;
+        this.containsMessage = containsMessage;
+    }
+    
+    /**
+     * @return the numeric value of this code, in the range 0-255
+     */
+    public int getCode() {
+        return code;
+    }
+    
+    /**
+     * @return the three-byte serialized form of this code: <code>'R'</code>,
+     * <code>'C'</code>, then the code byte. The internal array is returned, so
+     * callers must not modify it.
+     */
+    public byte[] getCodeSequence() {
+        return codeSequence;
+    }
+    
+    /**
+     * @return the human-readable description of this code
+     */
+    @Override
+    public String toString() {
+        return description;
+    }
+    
+    /**
+     * @return <code>true</code> if this code is followed by an explanation on
+     * the stream, <code>false</code> otherwise
+     */
+    public boolean containsMessage() {
+        return containsMessage;
+    }
+    
+    /**
+     * Writes this code to the stream and flushes it. Only valid for codes that
+     * do not carry an explanation.
+     *
+     * @param out the stream to write to
+     * @throws IOException if the stream cannot be written
+     * @throws IllegalArgumentException if this code expects an explanation
+     */
+    public void writeResponse(final DataOutputStream out) throws IOException {
+        if ( containsMessage() ) {
+            throw new IllegalArgumentException("ResponseCode " + code + " expects an explanation");
+        }
+        
+        out.write(getCodeSequence());
+        out.flush();
+    }
+    
+    /**
+     * Writes this code followed by its explanation to the stream and flushes it.
+     * Only valid for codes that carry an explanation.
+     *
+     * @param out the stream to write to
+     * @param explanation the explanation to write after the code
+     * @throws IOException if the stream cannot be written
+     * @throws IllegalArgumentException if this code does not expect an explanation
+     */
+    public void writeResponse(final DataOutputStream out, final String explanation) throws IOException {
+        if ( !containsMessage() ) {
+            throw new IllegalArgumentException("ResponseCode " + code + " does not expect an explanation");
+        }
+        
+        out.write(getCodeSequence());
+        out.writeUTF(explanation);
+        out.flush();
+    }
+    
+    /**
+     * Reads a three-byte code sequence from the stream.
+     *
+     * @param in the stream to read from
+     * @return the response code that was read
+     * @throws EOFException if the stream ends before all three bytes are read
+     * @throws IOException if the stream cannot be read
+     * @throws ProtocolException if the marker bytes are not present or the code
+     * byte does not map to a known response code
+     */
+    static ResponseCode readCode(final InputStream in) throws IOException, ProtocolException {
+        final int byte1 = in.read();
+        if ( byte1 < 0 ) {
+            throw new EOFException();
+        } else if ( byte1 != CODE_SEQUENCE_VALUE_1 ) {
+            throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode");
+        }
+        
+        final int byte2 = in.read();
+        if ( byte2 < 0 ) {
+            throw new EOFException();
+        } else if ( byte2 != CODE_SEQUENCE_VALUE_2 ) {
+            throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode");
+        }
+
+        final int byte3 = in.read();
+        if ( byte3 < 0 ) {
+            throw new EOFException();
+        }
+        
+        final ResponseCode responseCode = codeArray[byte3];
+        if (responseCode == null) {
+            throw new ProtocolException("Received Response Code of " + byte3 + " but do not recognize this code");
+        }
+        return responseCode;
+    }
+    
+    /**
+     * Maps a serialized code sequence, as produced by {@link #getCodeSequence()},
+     * back to its response code. The marker bytes are not verified.
+     *
+     * @param value the three-byte sequence <code>'R'</code>, <code>'C'</code>, code
+     * @return the matching response code, or {@link #UNRECOGNIZED_RESPONSE_CODE}
+     * if the code byte is not assigned
+     */
+    public static ResponseCode fromSequence(final byte[] value) {
+        // The sequence is {'R', 'C', code}, so the code byte sits at index 2; index 3
+        // is past the end of every array returned by getCodeSequence().
+        final int code = value[2] & 0xFF;
+        final ResponseCode responseCode = codeArray[code];
+        return (responseCode == null) ? UNRECOGNIZED_RESPONSE_CODE : responseCode;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
new file mode 100644
index 0000000..5f194f8
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.RemoteDestination;
+import org.apache.nifi.remote.RemoteResourceInitiator;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.codec.StandardFlowFileCodec;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.protocol.ClientProtocol;
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.protocol.RequestType;
+import org.apache.nifi.remote.util.StandardDataPacket;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SocketClientProtocol implements ClientProtocol {
+    // Protocol versions this client offers during version negotiation (5 down to 1).
+    private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1);
+
+    // Remote destination and its compression flag; both are assigned by setDestination().
+    private RemoteDestination destination;
+    private boolean useCompression = false;
+    
+    // Random UUID generated at the start of each handshake; also shown by toString().
+    private String commsIdentifier;
+    // Set to true at the end of handshake() unless the handshake threw.
+    private boolean handshakeComplete = false;
+    
+    private final Logger logger = LoggerFactory.getLogger(SocketClientProtocol.class);
+    
+    // Response read from the peer during handshake(); consulted by the isPort*/isDestinationFull queries.
+    private Response handshakeResponse = null;
+    // True only after the peer answered the handshake with PROPERTIES_OK; reset by shutdown().
+    private boolean readyForFileTransfer = false;
+    // Peer URL with a trailing '/', assigned during handshake() when the protocol version is >= 3.
+    private String transitUriPrefix = null;
+    // Applied to the communications session and sent as REQUEST_EXPIRATION_MILLIS during handshake().
+    private int timeoutMillis = 30000;
+    
+    // Preferred batch limits; each is sent in the handshake only when > 0 and the protocol version is >= 5.
+    private int batchCount;
+    private long batchSize;
+    private long batchMillis;
+
+    // transferFlowFiles() stops pulling further FlowFiles into a transaction once this much time has elapsed.
+    private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
+    
+    /** Creates a protocol instance with default settings; configure it through the setters before handshaking. */
+    public SocketClientProtocol() {
+    }
+
+    /**
+     * Sets the preferred number of FlowFiles per batch. The value is sent to the
+     * peer during the handshake only when it is greater than zero.
+     *
+     * @param count the preferred batch count
+     */
+    public void setPreferredBatchCount(final int count) {
+        batchCount = count;
+    }
+    
+    /**
+     * Sets the preferred batch size in bytes. The value is sent to the peer
+     * during the handshake only when it is greater than zero.
+     *
+     * @param bytes the preferred batch size
+     */
+    public void setPreferredBatchSize(final long bytes) {
+        batchSize = bytes;
+    }
+    
+    /**
+     * Sets the preferred batch duration in milliseconds. The value is sent to the
+     * peer during the handshake only when it is greater than zero.
+     *
+     * @param millis the preferred batch duration
+     */
+    public void setPreferredBatchDuration(final long millis) {
+        batchMillis = millis;
+    }
+    
+    /**
+     * Sets the remote destination and adopts its compression setting.
+     *
+     * @param destination the destination this protocol communicates with; must not be null
+     */
+    public void setDestination(final RemoteDestination destination) {
+        this.destination = destination;
+        // Cache the compression flag so later calls do not need to consult the destination
+        useCompression = destination.isUseCompression();
+    }
+    
+    /**
+     * Sets the timeout that handshake() applies to the communications session
+     * and reports to the peer as the request expiration.
+     *
+     * @param timeoutMillis the timeout in milliseconds
+     */
+    public void setTimeout(final int timeoutMillis) {
+        this.timeoutMillis = timeoutMillis;
+    }
+    
+    /**
+     * Performs the handshake using the identifier of the configured destination.
+     * A destination must have been set before this is called.
+     */
+    @Override
+    public void handshake(final Peer peer) throws IOException, HandshakeException {
+        final String destinationId = destination.getIdentifier();
+        handshake(peer, destinationId);
+    }
+    
+    public void handshake(final Peer peer, final String destinationId) throws IOException, HandshakeException {
+        if ( handshakeComplete ) {
+            throw new IllegalStateException("Handshake has already been completed");
+        }
+        commsIdentifier = UUID.randomUUID().toString();
+        logger.debug("{} handshaking with {}", this, peer);
+        
+        final Map<HandshakeProperty, String> properties = new HashMap<>();
+        properties.put(HandshakeProperty.GZIP, String.valueOf(useCompression));
+        
+        if ( destinationId != null ) {
+        	properties.put(HandshakeProperty.PORT_IDENTIFIER, destination.getIdentifier());
+        }
+        
+        properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf(timeoutMillis) );
+        
+        if ( versionNegotiator.getVersion() >= 5 ) {
+            if ( batchCount > 0 ) {
+                properties.put(HandshakeProperty.BATCH_COUNT, String.valueOf(batchCount));
+            }
+            if ( batchSize > 0L ) {
+                properties.put(HandshakeProperty.BATCH_SIZE, String.valueOf(batchSize));
+            }
+            if ( batchMillis > 0L ) {
+                properties.put(HandshakeProperty.BATCH_DURATION, String.valueOf(batchMillis));
+            }
+        }
+        
+        final CommunicationsSession commsSession = peer.getCommunicationsSession();
+        commsSession.setTimeout(timeoutMillis);
+        final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+        final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+        
+        dos.writeUTF(commsIdentifier);
+        
+        if ( versionNegotiator.getVersion() >= 3 ) {
+            dos.writeUTF(peer.getUrl());
+            transitUriPrefix = peer.getUrl();
+            
+            if ( !transitUriPrefix.endsWith("/") ) {
+                transitUriPrefix = transitUriPrefix + "/";
+            }
+        }
+        
+        dos.writeInt(properties.size());
+        for ( final Map.Entry<HandshakeProperty, String> entry : properties.entrySet() ) {
+            dos.writeUTF(entry.getKey().name());
+            dos.writeUTF(entry.getValue());
+        }
+        
+        dos.flush();
+        
+        try {
+            handshakeResponse = Response.read(dis);
+        } catch (final ProtocolException e) {
+            throw new HandshakeException(e);
+        }
+        
+        switch (handshakeResponse.getCode()) {
+            case PORT_NOT_IN_VALID_STATE:
+            case UNKNOWN_PORT:
+            case PORTS_DESTINATION_FULL:
+                break;
+            case PROPERTIES_OK:
+                readyForFileTransfer = true;
+                break;
+            default:
+                logger.error("{} received unexpected response {} from {} when negotiating Codec", new Object[] {
+                    this, handshakeResponse, peer});
+                peer.close();
+                throw new HandshakeException("Received unexpected response " + handshakeResponse);
+        }
+        
+        logger.debug("{} Finished handshake with {}", this, peer);
+        handshakeComplete = true;
+    }
+    
+    /**
+     * @return true if the peer answered the handshake with PROPERTIES_OK and
+     *         shutdown() has not been called since
+     */
+    public boolean isReadyForFileTransfer() {
+        return this.readyForFileTransfer;
+    }
+    
+    /**
+     * @return true if the peer answered the handshake with PORT_NOT_IN_VALID_STATE
+     * @throws IllegalStateException if the handshake has not been completed
+     */
+    public boolean isPortInvalid() {
+        if ( handshakeComplete ) {
+            return ResponseCode.PORT_NOT_IN_VALID_STATE == handshakeResponse.getCode();
+        }
+        throw new IllegalStateException("Handshake has not completed successfully");
+    }
+    
+    /**
+     * @return true if the peer answered the handshake with UNKNOWN_PORT
+     * @throws IllegalStateException if the handshake has not been completed
+     */
+    public boolean isPortUnknown() {
+        if ( handshakeComplete ) {
+            return ResponseCode.UNKNOWN_PORT == handshakeResponse.getCode();
+        }
+        throw new IllegalStateException("Handshake has not completed successfully");
+    }
+    
+    /**
+     * @return true if the peer answered the handshake with PORTS_DESTINATION_FULL
+     * @throws IllegalStateException if the handshake has not been completed
+     */
+    public boolean isDestinationFull() {
+        if ( handshakeComplete ) {
+            return ResponseCode.PORTS_DESTINATION_FULL == handshakeResponse.getCode();
+        }
+        throw new IllegalStateException("Handshake has not completed successfully");
+    }
+    
+    /**
+     * Requests the peer list from the given peer and reads the statuses it returns.
+     * The reply is a count followed by, for each peer, hostname, port, secure flag
+     * and FlowFile count.
+     *
+     * @param peer the peer to query
+     * @return the statuses reported by the peer
+     * @throws IllegalStateException if the handshake has not been performed
+     * @throws IOException if communicating with the peer fails
+     */
+    @Override
+    public Set<PeerStatus> getPeerStatuses(final Peer peer) throws IOException {
+        if ( !handshakeComplete ) {
+            throw new IllegalStateException("Handshake has not been performed");
+        }
+
+        logger.debug("{} Get Peer Statuses from {}", this, peer);
+        final CommunicationsSession session = peer.getCommunicationsSession();
+        final DataInputStream in = new DataInputStream(session.getInput().getInputStream());
+        final DataOutputStream out = new DataOutputStream(session.getOutput().getOutputStream());
+
+        RequestType.REQUEST_PEER_LIST.writeRequestType(out);
+        out.flush();
+
+        final int peerCount = in.readInt();
+        final Set<PeerStatus> statuses = new HashSet<>(peerCount);
+        for (int remaining = peerCount; remaining > 0; remaining--) {
+            // Arguments are evaluated left to right, matching the wire order:
+            // hostname, port, secure flag, FlowFile count
+            statuses.add(new PeerStatus(in.readUTF(), in.readInt(), in.readBoolean(), in.readInt()));
+        }
+
+        logger.debug("{} Received {} Peer Statuses from {}", this, statuses.size(), peer);
+        return statuses;
+    }
+    
+    /**
+     * Negotiates the FlowFileCodec to use with the given peer, starting from a
+     * StandardFlowFileCodec.
+     *
+     * @param peer the peer to negotiate with
+     * @return the codec produced by the resource negotiation
+     * @throws IllegalStateException if the handshake has not been performed
+     * @throws ProtocolException if the negotiation fails with a HandshakeException
+     * @throws IOException if communicating with the peer fails
+     */
+    @Override
+    public FlowFileCodec negotiateCodec(final Peer peer) throws IOException, ProtocolException {
+        if ( !handshakeComplete ) {
+            throw new IllegalStateException("Handshake has not been performed");
+        }
+
+        logger.debug("{} Negotiating Codec with {}", this, peer);
+        final CommunicationsSession session = peer.getCommunicationsSession();
+        final DataInputStream in = new DataInputStream(session.getInput().getInputStream());
+        final DataOutputStream out = new DataOutputStream(session.getOutput().getOutputStream());
+
+        RequestType.NEGOTIATE_FLOWFILE_CODEC.writeRequestType(out);
+
+        final FlowFileCodec preferred = new StandardFlowFileCodec();
+        final FlowFileCodec negotiated;
+        try {
+            negotiated = (FlowFileCodec) RemoteResourceInitiator.initiateResourceNegotiation(preferred, in, out);
+        } catch (HandshakeException e) {
+            throw new ProtocolException(e.toString());
+        }
+
+        logger.debug("{} negotiated FlowFileCodec {} with {}", new Object[] {this, negotiated, session});
+        return negotiated;
+    }
+
+
+    /**
+     * Starts a new transaction with the given peer in the given direction.
+     *
+     * @param peer the peer to exchange data with
+     * @param codec the codec used to encode/decode data packets
+     * @param direction whether this client sends or receives
+     * @return the newly created transaction
+     * @throws IllegalStateException if the handshake has not been performed or did not end ready for file transfer
+     * @throws IOException if the transaction cannot be initialized
+     */
+    @Override
+    public Transaction startTransaction(final Peer peer, final FlowFileCodec codec, final TransferDirection direction) throws IOException, ProtocolException {
+        if ( !handshakeComplete ) {
+            throw new IllegalStateException("Handshake has not been performed");
+        }
+        if ( !readyForFileTransfer ) {
+            throw new IllegalStateException("Cannot start transaction; handshake resolution was " + handshakeResponse);
+        }
+
+        final int protocolVersion = versionNegotiator.getVersion();
+        // The destination's yield period is handed to the transaction as its penalty duration
+        final int penaltyMillis = (int) destination.getYieldPeriod(TimeUnit.MILLISECONDS);
+        return new SocketClientTransaction(protocolVersion, peer, codec, direction, useCompression, penaltyMillis);
+    }
+
+
+    /**
+     * Receives FlowFiles from the peer inside a single RECEIVE transaction. Each
+     * data packet becomes a new FlowFile in the session, a provenance RECEIVE
+     * event is reported for it, and it is transferred to the anonymous relationship.
+     * When the peer has no more data the transaction is confirmed, the session is
+     * committed, and the transaction is completed, requesting backoff if the
+     * context has no available relationships.
+     *
+     * The peer is penalized for the destination's yield period only when the
+     * transaction ended without delivering any FlowFile.
+     *
+     * @param peer the peer to receive from
+     * @param context used to determine whether backpressure should be requested
+     * @param session the session in which received FlowFiles are created and committed
+     * @param codec the codec used to decode data packets
+     * @throws IOException if communicating with the peer fails
+     * @throws ProtocolException if the peer violates the protocol
+     */
+    @Override
+    public void receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
+        final String userDn = peer.getCommunicationsSession().getUserDn();
+        final Transaction transaction = startTransaction(peer, codec, TransferDirection.RECEIVE);
+
+        final StopWatch stopWatch = new StopWatch(true);
+        final Set<FlowFile> flowFilesReceived = new HashSet<>();
+        long bytesReceived = 0L;
+
+        while (true) {
+            final long start = System.nanoTime();
+            final DataPacket dataPacket = transaction.receive();
+            if ( dataPacket == null ) {
+                // Only penalize the peer when it had nothing at all to offer
+                if ( flowFilesReceived.isEmpty() ) {
+                    peer.penalize(destination.getYieldPeriod(TimeUnit.MILLISECONDS));
+                }
+                break;
+            }
+
+            FlowFile flowFile = session.create();
+            flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
+            flowFile = session.importFrom(dataPacket.getData(), flowFile);
+            final long receiveNanos = System.nanoTime() - start;
+
+            String sourceFlowFileIdentifier = dataPacket.getAttributes().get(CoreAttributes.UUID.key());
+            if ( sourceFlowFileIdentifier == null ) {
+                sourceFlowFileIdentifier = "<Unknown Identifier>";
+            }
+
+            final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier;
+            session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, TimeUnit.NANOSECONDS.toMillis(receiveNanos));
+
+            session.transfer(flowFile, Relationship.ANONYMOUS);
+            // Track what was received; without this the peer would be penalized after every
+            // transaction and the summary log below would never be reached.
+            flowFilesReceived.add(flowFile);
+            bytesReceived += dataPacket.getSize();
+        }
+
+        // Confirm that what we received was the correct data.
+        transaction.confirm();
+
+        // Commit the session so that we have persisted the data
+        session.commit();
+
+        // We want to apply backpressure if the outgoing connections are full. I.e., there are no available relationships.
+        final boolean applyBackpressure = context.getAvailableRelationships().isEmpty();
+
+        transaction.complete(applyBackpressure);
+        logger.debug("{} Completed transaction with {}; requested backoff = {}", new Object[] {
+                this, peer, applyBackpressure });
+
+        if ( flowFilesReceived.isEmpty() ) {
+            return;
+        }
+
+        stopWatch.stop();
+        final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
+        final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
+        final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+        final String dataSize = FormatUtils.formatDataSize(bytesReceived);
+        logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] { 
+                this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate });
+    }
+
+    
+    /**
+     * Sends FlowFiles from the session to the peer inside a single SEND
+     * transaction. FlowFiles are pulled from the session one at a time until none
+     * are left or BATCH_SEND_NANOS has elapsed since sending started. For each
+     * FlowFile a provenance SEND event is reported and the FlowFile is removed
+     * from the session. The transaction is then confirmed, the session committed,
+     * and the transaction completed. Any exception rolls the session back before
+     * being rethrown.
+     *
+     * @param peer the peer to send to
+     * @param context the process context (not used by this method)
+     * @param session the session supplying the FlowFiles
+     * @param codec the codec used to encode data packets
+     * @throws IOException if communicating with the peer fails
+     * @throws ProtocolException if the peer violates the protocol
+     */
+    @Override
+    public void transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            // nothing queued, so there is nothing to send
+            return;
+        }
+
+        try {
+            final String userDn = peer.getCommunicationsSession().getUserDn();
+            final long startSendingNanos = System.nanoTime();
+            final StopWatch stopWatch = new StopWatch(true);
+            long bytesSent = 0L;
+
+            final Transaction transaction = startTransaction(peer, codec, TransferDirection.SEND);
+
+            final Set<FlowFile> flowFilesSent = new HashSet<>();
+            do {
+                final long startNanos = System.nanoTime();
+                final FlowFile current = flowFile;
+
+                // Send from within a session callback so that the FlowFile content is available as an InputStream
+                session.read(current, new InputStreamCallback() {
+                    @Override
+                    public void process(final InputStream in) throws IOException {
+                        final DataPacket dataPacket = new StandardDataPacket(current.getAttributes(), in, current.getSize());
+                        transaction.send(dataPacket);
+                    }
+                });
+
+                final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+
+                flowFilesSent.add(current);
+                bytesSent += current.getSize();
+                logger.debug("{} Sent {} to {}", this, current, peer);
+
+                final String transitUri;
+                if ( transitUriPrefix == null ) {
+                    transitUri = peer.getUrl();
+                } else {
+                    transitUri = transitUriPrefix + current.getAttribute(CoreAttributes.UUID.key());
+                }
+                session.getProvenanceReporter().send(current, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false);
+                session.remove(current);
+
+                // Keep pulling FlowFiles only while still inside the batch time window
+                final long sendingNanos = System.nanoTime() - startSendingNanos;
+                flowFile = (sendingNanos < BATCH_SEND_NANOS) ? session.get() : null;
+            } while (flowFile != null);
+
+            transaction.confirm();
+
+            stopWatch.stop();
+            final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
+            final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+            final String dataSize = FormatUtils.formatDataSize(bytesSent);
+
+            session.commit();
+            transaction.complete(false);
+
+            final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
+            logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {
+                this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
+        } catch (final Exception e) {
+            session.rollback();
+            throw e;
+        }
+    }
+    
+    
+    /**
+     * @return the negotiator holding the protocol versions supported by this client
+     */
+    @Override
+    public VersionNegotiator getVersionNegotiator() {
+        return this.versionNegotiator;
+    }
+    
+    /**
+     * Marks this protocol as no longer ready for file transfer and sends a
+     * SHUTDOWN request to the peer.
+     *
+     * @param peer the peer to notify
+     * @throws IOException if the request cannot be written
+     */
+    @Override
+    public void shutdown(final Peer peer) throws IOException {
+        readyForFileTransfer = false;
+
+        final CommunicationsSession session = peer.getCommunicationsSession();
+        final DataOutputStream out = new DataOutputStream(session.getOutput().getOutputStream());
+
+        logger.debug("{} Shutting down with {}", this, peer);
+
+        // Tell the peer that we are shutting down this conversation
+        RequestType.SHUTDOWN.writeRequestType(out);
+        out.flush();
+    }
+
+    /**
+     * @return the fixed resource name of this protocol, "SocketFlowFileProtocol"
+     */
+    @Override
+    public String getResourceName() {
+        final String resourceName = "SocketFlowFileProtocol";
+        return resourceName;
+    }
+    
+    /**
+     * @return a description containing the current communications identifier
+     *         (which is null until a handshake has been started)
+     */
+    @Override
+    public String toString() {
+        final StringBuilder description = new StringBuilder("SocketClientProtocol[CommsID=");
+        description.append(commsIdentifier).append("]");
+        return description.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
new file mode 100644
index 0000000..edb360e
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.protocol.RequestType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SocketClientTransaction implements Transaction {
+	private static final Logger logger = LoggerFactory.getLogger(SocketClientTransaction.class);
+	
+	
+	// Checksum updated by the Checked streams that wrap dis/dos in receive() and send();
+	// its value is read in confirm().
+	private final CRC32 crc = new CRC32();
+	private final int protocolVersion;
+	private final FlowFileCodec codec;
+	// Streams obtained from the peer's communications session in the constructor.
+	private final DataInputStream dis;
+	private final DataOutputStream dos;
+	private final TransferDirection direction;
+	private final boolean compress;
+	private final Peer peer;
+	// Passed to peer.penalize() when the peer reports TRANSACTION_FINISHED_BUT_DESTINATION_FULL.
+	private final int penaltyMillis;
+	
+	// For RECEIVE transactions: whether the peer has indicated that (more) data is available.
+	private boolean dataAvailable = false;
+	// Number of data packets sent or received so far in this transaction.
+	private int transfers = 0;
+	// Current position in the transaction lifecycle; starts as TRANSACTION_STARTED.
+	private TransactionState state;
+	
+	/**
+	 * Creates a transaction over the peer's communications session and immediately
+	 * sends the request that opens it (see initialize()).
+	 *
+	 * @param protocolVersion the negotiated protocol version
+	 * @param peer the peer to exchange data with
+	 * @param codec the codec used to encode/decode data packets
+	 * @param direction whether this client sends or receives
+	 * @param useCompression the compression flag for this transaction
+	 * @param penaltyMillis how long to penalize the peer when it reports that its destination is full
+	 * @throws IOException if the streams cannot be obtained or the opening request fails
+	 */
+	SocketClientTransaction(final int protocolVersion, final Peer peer, final FlowFileCodec codec, 
+			final TransferDirection direction, final boolean useCompression, final int penaltyMillis) throws IOException {
+		// Plain configuration values
+		this.protocolVersion = protocolVersion;
+		this.codec = codec;
+		this.direction = direction;
+		this.compress = useCompression;
+		this.penaltyMillis = penaltyMillis;
+		this.peer = peer;
+
+		// Streams of the peer's communications session
+		this.dis = new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream());
+		this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream());
+
+		this.state = TransactionState.TRANSACTION_STARTED;
+		initialize();
+	}
+	
+	/**
+	 * Opens the transaction with the peer. For a RECEIVE transaction this sends
+	 * RECEIVE_FLOWFILES and records whether the peer reports data as available;
+	 * for a SEND transaction it sends SEND_FLOWFILES. Any failure puts the
+	 * transaction into its error state (via error()) before being rethrown.
+	 *
+	 * @throws ProtocolException if the peer answers a RECEIVE request with anything other than MORE_DATA or NO_MORE_DATA
+	 * @throws IOException if communicating with the peer fails
+	 */
+	private void initialize() throws IOException {
+		try {
+			if ( direction == TransferDirection.RECEIVE ) {
+				// Indicate that we would like to have some data
+				RequestType.RECEIVE_FLOWFILES.writeRequestType(dos);
+				dos.flush();
+
+				final Response dataAvailableCode = Response.read(dis);
+				switch (dataAvailableCode.getCode()) {
+					case MORE_DATA:
+						logger.debug("{} {} Indicates that data is available", this, peer);
+						this.dataAvailable = true;
+						break;
+					case NO_MORE_DATA:
+						// Both placeholders need an argument; previously only the peer was supplied
+						logger.debug("{} No data available from {}", this, peer);
+						this.dataAvailable = false;
+						return;
+					default:
+						throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+				}
+
+			} else {
+				// Indicate that we would like to send some data
+				RequestType.SEND_FLOWFILES.writeRequestType(dos);
+				dos.flush();
+			}
+		} catch (final Exception e) {
+			error();
+			throw e;
+		}
+	}
+	
+	
+	/**
+	 * Receives the next data packet from the peer.
+	 *
+	 * Before the first packet, availability is whatever the peer reported when the
+	 * transaction was opened. After at least one packet has been received, the peer
+	 * is asked whether the transaction continues (CONTINUE_TRANSACTION) or finishes
+	 * (FINISH_TRANSACTION). Data is decoded through a CheckedInputStream so that
+	 * the transaction's CRC covers everything received.
+	 *
+	 * Any failure puts the transaction into its error state (via error()) before
+	 * being rethrown.
+	 *
+	 * @return the next packet, or null if the peer has no (more) data
+	 * @throws IllegalStateException if the transaction is not in a state that allows receiving, or is a SEND transaction
+	 * @throws ProtocolException if the peer sends an unexpected response code
+	 * @throws IOException if communicating with the peer fails
+	 */
+	@Override
+	public DataPacket receive() throws IOException {
+		try {
+			if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
+				throw new IllegalStateException("Cannot receive data because Transaction State is " + state);
+			}
+
+			if ( direction == TransferDirection.SEND ) {
+				throw new IllegalStateException("Attempting to receive data but started a SEND Transaction");
+			}
+
+			// if we already know there's no data, just return null
+			if ( !dataAvailable ) {
+				return null;
+			}
+
+			// if we have already received a packet, check if another is available.
+			if ( transfers > 0 ) {
+				// Determine if Peer will send us data or has no data to send us
+				final Response dataAvailableCode = Response.read(dis);
+				switch (dataAvailableCode.getCode()) {
+					case CONTINUE_TRANSACTION:
+						logger.debug("{} {} Indicates Transaction should continue", this, peer);
+						this.dataAvailable = true;
+						break;
+					case FINISH_TRANSACTION:
+						// Both placeholders need an argument; previously only the peer was supplied
+						logger.debug("{} {} Indicates Transaction should finish", this, peer);
+						this.dataAvailable = false;
+						break;
+					default:
+						throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+				}
+			}
+
+			// if no data available, return null
+			if ( !dataAvailable ) {
+				return null;
+			}
+
+			logger.debug("{} Receiving data from {}", this, peer);
+			final DataPacket packet = codec.decode(new CheckedInputStream(dis, crc));
+
+			if ( packet == null ) {
+				this.dataAvailable = false;
+			} else {
+				transfers++;
+			}
+
+			this.state = TransactionState.DATA_EXCHANGED;
+			return packet;
+		} catch (final Exception e) {
+			error();
+			throw e;
+		}
+	}
+	
+	
+	/**
+	 * Sends a data packet to the peer. Every packet after the first is preceded by
+	 * a CONTINUE_TRANSACTION response code. The packet is encoded through a
+	 * CheckedOutputStream so that the transaction's CRC covers everything sent.
+	 *
+	 * Any failure puts the transaction into its error state (via error()) before
+	 * being rethrown.
+	 *
+	 * @param dataPacket the packet to send
+	 * @throws IllegalStateException if the transaction is not in a state that allows sending, or is a RECEIVE transaction
+	 * @throws IOException if encoding or writing the packet fails
+	 */
+	@Override
+	public void send(DataPacket dataPacket) throws IOException {
+		try {
+			if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
+				throw new IllegalStateException("Cannot send data because Transaction State is " + state);
+			}
+
+			if ( direction == TransferDirection.RECEIVE ) {
+				throw new IllegalStateException("Attempting to send data but started a RECEIVE Transaction");
+			}
+
+			if ( transfers > 0 ) {
+				ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
+			}
+
+			logger.debug("{} Sending data to {}", this, peer);
+
+			final OutputStream out = new CheckedOutputStream(dos, crc);
+			codec.encode(dataPacket, out);
+
+			// Push any bytes still buffered for this packet, but do NOT close 'out': it wraps 'dos'
+			// directly, and closing a FilterOutputStream also closes the stream underneath it, which
+			// would make the connection unusable for the rest of the transaction.
+			// NOTE(review): no compressing stream is layered in here even when 'compress' is true;
+			// confirm whether compression is expected to be applied at this point.
+			if ( compress ) {
+				out.flush();
+			}
+
+			transfers++;
+			this.state = TransactionState.DATA_EXCHANGED;
+		} catch (final Exception e) {
+			error();
+			throw e;
+		}
+	}
+	
+	
+	@Override
+	public void cancel(final String explanation) throws IOException {
+		if ( state == TransactionState.TRANSACTION_CANCELED || state == TransactionState.TRANSACTION_COMPLETED || state == TransactionState.ERROR ) {
+			throw new IllegalStateException("Cannot cancel transaction because state is already " + state);
+		}
+
+		try {
+		    ResponseCode.CANCEL_TRANSACTION.writeResponse(dos, explanation == null ? "<No explanation given>" : explanation);
+		    state = TransactionState.TRANSACTION_CANCELED;
+		} catch (final IOException ioe) {
+		    error();
+		    throw ioe;
+		}
+	}
+	
+	
+	/**
+	 * Completes a confirmed transaction.
+	 *
+	 * For a SEND transaction this waits for the peer's final response: on
+	 * TRANSACTION_FINISHED_BUT_DESTINATION_FULL the peer is penalized, and any
+	 * response other than that or TRANSACTION_FINISHED is a protocol error.
+	 * For a RECEIVE transaction that transferred at least one packet, this tells
+	 * the peer the transaction is finished, optionally asking it to back off;
+	 * when nothing was transferred, nothing is sent.
+	 *
+	 * Any failure puts the transaction into its error state (via error()) before
+	 * being rethrown.
+	 *
+	 * @param requestBackoff for RECEIVE transactions, whether to ask the peer to hold off sending more data
+	 * @throws IllegalStateException if the transaction has not been confirmed
+	 * @throws IOException if communicating with the peer fails
+	 */
+	@Override
+	public void complete(boolean requestBackoff) throws IOException {
+		try {
+			if ( state != TransactionState.TRANSACTION_CONFIRMED ) {
+				throw new IllegalStateException("Cannot complete transaction because state is " + state + 
+						"; Transaction can only be completed when state is " + TransactionState.TRANSACTION_CONFIRMED);
+			}
+
+			if ( direction != TransferDirection.RECEIVE ) {
+				// SEND: the peer tells us whether it accepted the data
+				final Response transactionResponse;
+				try {
+					transactionResponse = Response.read(dis);
+				} catch (final IOException e) {
+					throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. " +
+							"It is unknown whether or not the peer successfully received/processed the data.", e);
+				}
+
+				logger.debug("{} Received {} from {}", this, transactionResponse, peer);
+				final ResponseCode finishCode = transactionResponse.getCode();
+				if ( finishCode == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
+					peer.penalize(penaltyMillis);
+				} else if ( finishCode != ResponseCode.TRANSACTION_FINISHED ) {
+					throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
+				}
+
+				state = TransactionState.TRANSACTION_COMPLETED;
+				return;
+			}
+
+			// RECEIVE: only notify the peer if something was actually transferred
+			if ( transfers != 0 ) {
+				if ( requestBackoff ) {
+					// Confirm that we received the data and the peer can now discard it but that the peer should not
+					// send any more data for a bit
+					logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
+					ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos);
+				} else {
+					// Confirm that we received the data and the peer can now discard it
+					logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
+					ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
+				}
+			}
+
+			state = TransactionState.TRANSACTION_COMPLETED;
+		} catch (final Exception e) {
+			error();
+			throw e;
+		}
+	}
+	
+	
+	/**
+	 * Confirms this transaction with the peer, moving the state from
+	 * {@link TransactionState#DATA_EXCHANGED} to {@link TransactionState#TRANSACTION_CONFIRMED}.
+	 * <p>
+	 * If the transaction was started in the RECEIVE direction and no data was available, the state is
+	 * set to TRANSACTION_CONFIRMED immediately and nothing is exchanged with the peer.
+	 * </p>
+	 * <p>
+	 * When receiving, a CONFIRM_TRANSACTION response carrying the locally calculated CRC value is written
+	 * to the peer and the peer's answer is read back. When not receiving (presumably the SEND direction),
+	 * a FINISH_TRANSACTION response is written, the peer's CONFIRM_TRANSACTION answer is read, its CRC is
+	 * compared with the local value (protocol versions above 3 only), and a CONFIRM_TRANSACTION is echoed back.
+	 * </p>
+	 * <p>
+	 * Any exception raised while confirming causes {@link #error()} to be invoked before the exception
+	 * is rethrown.
+	 * </p>
+	 *
+	 * @throws IOException if reading the peer's response fails, if the peer reports a bad checksum, or if
+	 *             the checksum received from the peer does not match the one calculated locally
+	 * @throws IllegalStateException if the state is not DATA_EXCHANGED, or if receiving and more data is
+	 *             still available from the peer
+	 * @throws ProtocolException if the peer answers with a response code other than the expected one
+	 */
+	@Override
+	public void confirm() throws IOException {
+	    try {
+	        if ( state == TransactionState.TRANSACTION_STARTED && !dataAvailable && direction == TransferDirection.RECEIVE ) {
+	            // client requested to receive data but no data available. no need to confirm.
+	            state = TransactionState.TRANSACTION_CONFIRMED;
+	            return;
+	        }
+	        
+    		// apart from the shortcut above, confirmation is only legal once data has been exchanged
+    		if ( state != TransactionState.DATA_EXCHANGED ) {
+    			throw new IllegalStateException("Cannot confirm Transaction because state is " + state + 
+    					"; Transaction can only be confirmed when state is " + TransactionState.DATA_EXCHANGED );
+    		}
+    
+            if ( direction == TransferDirection.RECEIVE ) {
+                if ( dataAvailable ) {
+                    throw new IllegalStateException("Cannot complete transaction because the sender has already sent more data than client has consumed.");
+                }
+                
+                // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
+                // to peer so that we can verify that the connection is still open. This is a two-phase commit,
+                // which helps to prevent the chances of data duplication. Without doing this, we may commit the
+                // session and then when we send the response back to the peer, the peer may have timed out and may not
+                // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
+                // Critical Section involved in this transaction so that rather than the Critical Section being the
+                // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
+                logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
+                // the locally calculated CRC value travels as the message of the CONFIRM_TRANSACTION response
+                final String calculatedCRC = String.valueOf(crc.getValue());
+                ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC);
+                
+                final Response confirmTransactionResponse;
+                try {
+                    confirmTransactionResponse = Response.read(dis);
+                } catch (final IOException ioe) {
+                    logger.error("Failed to receive response code from {} when expected confirmation of transaction", peer);
+                    throw ioe;
+                }
+                
+                logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer);
+                
+                // only CONFIRM_TRANSACTION lets the transaction proceed; every other code aborts it
+                switch (confirmTransactionResponse.getCode()) {
+                    case CONFIRM_TRANSACTION:
+                        break;
+                    case BAD_CHECKSUM:
+                        throw new IOException(this + " Received a BadChecksum response from peer " + peer);
+                    default:
+                        throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
+                }
+                
+                state = TransactionState.TRANSACTION_CONFIRMED;
+            } else {
+                // NOTE: this message says "Sent" but is logged before the indicator is written on the next line
+                logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer);
+                ResponseCode.FINISH_TRANSACTION.writeResponse(dos);
+                
+                final String calculatedCRC = String.valueOf(crc.getValue());
+                
+                // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
+                final Response transactionConfirmationResponse = Response.read(dis);
+                if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) {
+                    // Confirm checksum and echo back the confirmation.
+                    logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer);
+                    // NOTE(review): assumes the response always carries a non-null message; confirm against Response.read
+                    final String receivedCRC = transactionConfirmationResponse.getMessage();
+                    
+                    // CRC was not used before version 4
+                    if ( protocolVersion > 3 ) {
+                        if ( !receivedCRC.equals(calculatedCRC) ) {
+                            // tell the peer about the mismatch before failing locally
+                            ResponseCode.BAD_CHECKSUM.writeResponse(dos);
+                            throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
+                        }
+                    }
+                    
+                    ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");
+                } else {
+                    throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse);
+                }
+                
+                state = TransactionState.TRANSACTION_CONFIRMED;
+            }
+	    } catch (final Exception e) {
+	        // flag the transaction as failed (see error()) before propagating the original exception
+	        error();
+	        throw e;
+	    }
+	}
+
+	/**
+	 * Flags this transaction as failed by switching its state to {@link TransactionState#ERROR}.
+	 */
+	@Override
+	public void error() {
+	    state = TransactionState.ERROR;
+	}
+	
+	/**
+	 * @return the state this transaction is currently in
+	 */
+	@Override
+	public TransactionState getState() {
+		return this.state;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
new file mode 100644
index 0000000..6dab77b
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import java.util.Set;
+
+import org.apache.nifi.remote.PeerStatus;
+
+/**
+ * Holder that pairs a set of {@link PeerStatus} entries with a timestamp in milliseconds.
+ * The set is stored and returned by reference; no copy is made.
+ */
+public class PeerStatusCache {
+    private final long timestamp;
+    private final Set<PeerStatus> statuses;
+
+    /**
+     * Creates a cache entry stamped with the current value of {@link System#currentTimeMillis()}.
+     *
+     * @param statuses the peer statuses to hold
+     */
+    public PeerStatusCache(final Set<PeerStatus> statuses) {
+        this(statuses, System.currentTimeMillis());
+    }
+
+    /**
+     * Creates a cache entry with an explicit timestamp.
+     *
+     * @param statuses the peer statuses to hold
+     * @param timestamp the timestamp to associate with the statuses
+     */
+    public PeerStatusCache(final Set<PeerStatus> statuses, final long timestamp) {
+        this.timestamp = timestamp;
+        this.statuses = statuses;
+    }
+
+    /**
+     * @return the timestamp given at construction time
+     */
+    public long getTimestamp() {
+        return this.timestamp;
+    }
+
+    /**
+     * @return the same set instance that was given at construction time
+     */
+    public Set<PeerStatus> getStatuses() {
+        return this.statuses;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java
new file mode 100644
index 0000000..b2dbdcd
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.nifi.web.api.dto.ControllerDTO;
+import org.apache.nifi.web.api.entity.ControllerEntity;
+import org.apache.nifi.web.util.WebUtils;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.ClientResponse.Status;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+
+/**
+ * Helper for issuing HTTP requests against the REST API of a remote NiFi instance through a Jersey
+ * {@link Client}. The client is created once, in the constructor, and reused for every request.
+ */
+public class RemoteNiFiUtils {
+
+    public static final String CONTROLLER_URI_PATH = "/controller";
+
+    private static final int CONNECT_TIMEOUT = 10000;
+    private static final int READ_TIMEOUT = 10000;
+
+    private final Client client;
+
+    /**
+     * @param sslContext the SSL context to build the client with, or <code>null</code> to build a client without one
+     */
+    public RemoteNiFiUtils(final SSLContext sslContext) {
+        this.client = getClient(sslContext);
+    }
+
+
+    /**
+     * Gets the content at the specified URI.
+     *
+     * @param uri the URI to request
+     * @param timeoutMillis the value applied as both the read timeout and the connect timeout of the request
+     * @return the response to the GET request
+     * @throws ClientHandlerException
+     * @throws UniformInterfaceException
+     */
+    public ClientResponse get(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException {
+        return get(uri, timeoutMillis, null);
+    }
+
+    /**
+     * Gets the content at the specified URI using the given query parameters.
+     *
+     * @param uri the URI to request
+     * @param timeoutMillis the value applied as both the read timeout and the connect timeout of the request
+     * @param queryParams the query parameters to append to the request, or <code>null</code> for none
+     * @return the response to the GET request, requested as JSON
+     * @throws ClientHandlerException
+     * @throws UniformInterfaceException
+     */
+    public ClientResponse get(final URI uri, final int timeoutMillis, final Map<String, String> queryParams) throws ClientHandlerException, UniformInterfaceException {
+        // perform the request
+        WebResource webResource = client.resource(uri);
+        if ( queryParams != null ) {
+            for ( final Map.Entry<String, String> queryEntry : queryParams.entrySet() ) {
+                // queryParam returns a new resource, so the result has to be re-assigned
+                webResource = webResource.queryParam(queryEntry.getKey(), queryEntry.getValue());
+            }
+        }
+
+        webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis);
+        webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis);
+
+        return webResource.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+    }
+
+    /**
+     * Performs a HEAD request to the specified URI.
+     *
+     * @param uri the URI to request
+     * @param timeoutMillis the value applied as both the read timeout and the connect timeout of the request
+     * @return the response to the HEAD request
+     * @throws ClientHandlerException
+     * @throws UniformInterfaceException
+     */
+    public ClientResponse head(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException {
+        // perform the request
+        final WebResource webResource = client.resource(uri);
+        webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis);
+        webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis);
+        return webResource.head();
+    }
+
+    /**
+     * Creates the client used for all requests, configured with the default read and connect timeouts.
+     *
+     * @param sslContext the SSL context to use, or <code>null</code> to create a client without one
+     * @return the newly created client
+     */
+    private Client getClient(final SSLContext sslContext) {
+        final Client client;
+        if (sslContext == null) {
+            client = WebUtils.createClient(null);
+        } else {
+            client = WebUtils.createClient(null, sslContext);
+        }
+
+        client.setReadTimeout(READ_TIMEOUT);
+        client.setConnectTimeout(CONNECT_TIMEOUT);
+
+        return client;
+    }
+
+    /**
+     * Appends {@link #CONTROLLER_URI_PATH} to the given base URI and parses the result.
+     *
+     * @param uri the base URI of the remote instance
+     * @return the parsed controller URI
+     * @throws IOException if the resulting string is not a valid URI; the parse failure is attached as the cause
+     */
+    private static URI toControllerUri(final String uri) throws IOException {
+        try {
+            return new URI(uri + CONTROLLER_URI_PATH);
+        } catch (final URISyntaxException e) {
+            // keep the parse failure as the cause so the offending position is not lost
+            throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri, e);
+        }
+    }
+
+
+    /**
+     * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance
+     * is not configured to use Site-to-Site transfers.
+     *
+     * @param uri the base URI of the remote instance. This should include the path only to the nifi-api level, as well as the protocol, host, and port.
+     * @param timeoutMillis the value applied as both the read timeout and the connect timeout of the request
+     * @return the remote Site-to-Site listening port as reported by the remote controller
+     * @throws IOException if the URI is invalid or the remote instance does not answer with HTTP status OK
+     */
+    public Integer getRemoteListeningPort(final String uri, final int timeoutMillis) throws IOException {
+        return getRemoteListeningPort(toControllerUri(uri), timeoutMillis);
+    }
+
+    /**
+     * Returns the id reported by the controller of the remote instance.
+     *
+     * @param uri the base URI of the remote instance, without the controller path
+     * @param timeoutMillis the value applied as both the read timeout and the connect timeout of the request
+     * @return the id of the remote controller
+     * @throws IOException if the URI is invalid or the remote instance does not answer with HTTP status OK
+     */
+    public String getRemoteRootGroupId(final String uri, final int timeoutMillis) throws IOException {
+        return getRemoteRootGroupId(toControllerUri(uri), timeoutMillis);
+    }
+
+    /**
+     * Returns the instance id reported by the controller of the remote instance.
+     *
+     * @param uri the base URI of the remote instance, without the controller path
+     * @param timeoutMillis the value applied as both the read timeout and the connect timeout of the request
+     * @return the instance id of the remote controller
+     * @throws IOException if the URI is invalid or the remote instance does not answer with HTTP status OK
+     */
+    public String getRemoteInstanceId(final String uri, final int timeoutMillis) throws IOException {
+        return getController(toControllerUri(uri), timeoutMillis).getInstanceId();
+    }
+
+    /**
+     * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance
+     * is not configured to use Site-to-Site transfers.
+     *
+     * @param uri the full URI to fetch, including the path.
+     * @param timeoutMillis the value applied as both the read timeout and the connect timeout of the request
+     * @return the remote Site-to-Site listening port as reported by the remote controller
+     * @throws IOException if the remote instance does not answer with HTTP status OK
+     */
+    private Integer getRemoteListeningPort(final URI uri, final int timeoutMillis) throws IOException {
+        return getController(uri, timeoutMillis).getRemoteSiteListeningPort();
+    }
+
+    private String getRemoteRootGroupId(final URI uri, final int timeoutMillis) throws IOException {
+        return getController(uri, timeoutMillis).getId();
+    }
+
+    /**
+     * Fetches the controller description found at the given URI.
+     *
+     * @param uri the full URI of the controller resource, including the path
+     * @param timeoutMillis the value applied as both the read timeout and the connect timeout of the request
+     * @return the controller DTO contained in the response entity
+     * @throws IOException if the response status is anything other than OK; the message contains the status code,
+     *             the reason phrase and the response body
+     */
+    public ControllerDTO getController(final URI uri, final int timeoutMillis) throws IOException {
+        final ClientResponse response = get(uri, timeoutMillis);
+
+        if (Status.OK.getStatusCode() == response.getStatusInfo().getStatusCode()) {
+            final ControllerEntity entity = response.getEntity(ControllerEntity.class);
+            return entity.getController();
+        } else {
+            final String responseMessage = response.getEntity(String.class);
+            throw new IOException("Got HTTP response Code " + response.getStatusInfo().getStatusCode() + ": " + response.getStatusInfo().getReasonPhrase() + " with explanation: " + responseMessage);
+        }
+    }
+
+    /**
+     * Issues a registration request on behalf of the current user by POSTing a justification to the
+     * users resource below the controller path.
+     *
+     * @param baseApiUri the base URI of the remote API, without a trailing slash
+     * @return the response to the registration request
+     */
+    public ClientResponse issueRegistrationRequest(String baseApiUri) {
+        // CONTROLLER_URI_PATH already starts with a slash, so no extra separator is inserted here;
+        // formatting "%s/%s" with "/controller/users" used to yield ".../" + "/controller/users"
+        final URI uri = URI.create(baseApiUri + CONTROLLER_URI_PATH + "/users");
+
+        // set up the form parameters sent as the request entity
+        final MultivaluedMapImpl entity = new MultivaluedMapImpl();
+        entity.add("justification", "A Remote instance of NiFi has attempted to create a reference to this NiFi. This action must be approved first.");
+
+        // create the web resource
+        final WebResource webResource = client.resource(uri);
+
+        // get the client utils and make the request
+        return webResource.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_FORM_URLENCODED).entity(entity).post(ClientResponse.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
new file mode 100644
index 0000000..bd1b50c
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.stream.io.LimitingInputStream;
+import org.apache.nifi.stream.io.MinimumLengthInputStream;
+
+/**
+ * {@link DataPacket} implementation backed by an attribute map and an input stream of a declared size.
+ */
+public class StandardDataPacket implements DataPacket {
+
+    private final Map<String, String> attributes;
+    private final InputStream stream;
+    private final long size;
+
+    /**
+     * @param attributes the attributes of the packet; stored by reference
+     * @param stream the stream holding the packet's content
+     * @param size the declared number of content bytes
+     */
+    public StandardDataPacket(final Map<String, String> attributes, final InputStream stream, final long size) {
+        // cap reads at 'size' bytes first, then wrap with the minimum-length check
+        // (presumably fails when fewer than 'size' bytes are available; see MinimumLengthInputStream)
+        final InputStream capped = new LimitingInputStream(stream, size);
+        this.stream = new MinimumLengthInputStream(capped, size);
+        this.attributes = attributes;
+        this.size = size;
+    }
+
+    /**
+     * @return the attribute map given at construction time
+     */
+    public Map<String, String> getAttributes() {
+        return this.attributes;
+    }
+
+    /**
+     * @return the wrapped content stream; this is not the raw stream given at construction time
+     */
+    public InputStream getData() {
+        return this.stream;
+    }
+
+    /**
+     * @return the declared content size given at construction time
+     */
+    public long getSize() {
+        return this.size;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
new file mode 100644
index 0000000..d8899ea
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client.socket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.cluster.ClusterNodeInformation;
+import org.apache.nifi.remote.cluster.NodeInformation;
+import org.junit.Test;
+
+/**
+ * Exercises {@link EndpointConnectionStatePool#formulateDestinationList} with several node layouts.
+ * The tests only print the resulting destinations; they contain no assertions.
+ *
+ * NOTE(review): every test passes {@link TransferDirection#SEND}, including the ones named
+ * "ForOutput" - confirm whether another direction was intended for those.
+ */
+public class TestEndpointConnectionStatePool {
+
+    @Test
+    public void testFormulateDestinationListForOutput() throws IOException {
+        printSendDestinations(
+            new NodeInformation("ShouldGetMedium", 1, 1111, true, 4096),
+            new NodeInformation("ShouldGetLots", 2, 2222, true, 10240),
+            new NodeInformation("ShouldGetLittle", 3, 3333, true, 1024),
+            new NodeInformation("ShouldGetMedium", 4, 4444, true, 4096),
+            new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096));
+    }
+
+    @Test
+    public void testFormulateDestinationListForOutputHugeDifference() throws IOException {
+        printSendDestinations(
+            new NodeInformation("ShouldGetLittle", 1, 1111, true, 500),
+            new NodeInformation("ShouldGetLots", 2, 2222, true, 50000));
+    }
+
+    @Test
+    public void testFormulateDestinationListForInputPorts() throws IOException {
+        printSendDestinations(
+            new NodeInformation("ShouldGetMedium", 1, 1111, true, 4096),
+            new NodeInformation("ShouldGetLittle", 2, 2222, true, 10240),
+            new NodeInformation("ShouldGetLots", 3, 3333, true, 1024),
+            new NodeInformation("ShouldGetMedium", 4, 4444, true, 4096),
+            new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096));
+    }
+
+    @Test
+    public void testFormulateDestinationListForInputPortsHugeDifference() throws IOException {
+        printSendDestinations(
+            new NodeInformation("ShouldGetLots", 1, 1111, true, 500),
+            new NodeInformation("ShouldGetLittle", 2, 2222, true, 50000));
+    }
+
+    /**
+     * Builds cluster information from the given nodes, formulates the destination list for the
+     * SEND direction and prints each destination as "hostname:port" on standard out.
+     */
+    private static void printSendDestinations(final NodeInformation... nodes) throws IOException {
+        final List<NodeInformation> nodeList = new ArrayList<>();
+        for ( final NodeInformation node : nodes ) {
+            nodeList.add(node);
+        }
+
+        final ClusterNodeInformation clusterInfo = new ClusterNodeInformation();
+        clusterInfo.setNodeInformation(nodeList);
+
+        final List<PeerStatus> peers = EndpointConnectionStatePool.formulateDestinationList(clusterInfo, TransferDirection.SEND);
+        for ( final PeerStatus peer : peers ) {
+            System.out.println(peer.getHostname() + ":" + peer.getPort());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
new file mode 100644
index 0000000..421d579
--- /dev/null
+++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An {@link InputStream} wrapper that hands out at most <code>limit</code> bytes of the wrapped stream.
+ * Once that many bytes have been read or skipped, every read reports end-of-stream (-1) even if the
+ * wrapped stream still has data. Closing this stream closes the wrapped stream.
+ */
+public class LimitingInputStream extends InputStream {
+
+    private final InputStream in;
+    private final long limit;
+    private long bytesRead = 0;
+    // value of bytesRead captured by the most recent mark(); -1 until mark() has been called
+    private long markedBytesRead = -1;
+
+    /**
+     * @param in the stream to read from
+     * @param limit the maximum number of bytes that may be consumed from <code>in</code> through this wrapper
+     */
+    public LimitingInputStream(final InputStream in, final long limit) {
+        this.in = in;
+        this.limit = limit;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (bytesRead >= limit) {
+            return -1;
+        }
+
+        final int val = in.read();
+        if (val > -1) {
+            bytesRead++;
+        }
+        return val;
+    }
+
+    @Override
+    public int read(final byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (bytesRead >= limit) {
+            return -1;
+        }
+
+        // never ask the wrapped stream for more than what is left below the limit
+        final int maxToRead = (int) Math.min(len, limit - bytesRead);
+
+        final int val = in.read(b, off, maxToRead);
+        if (val > 0) {
+            bytesRead += val;
+        }
+        return val;
+    }
+
+    /**
+     * Skips up to <code>n</code> bytes without going past the limit.
+     *
+     * @return the number of bytes actually skipped; 0 if <code>n</code> is not positive or the limit has been reached
+     */
+    @Override
+    public long skip(final long n) throws IOException {
+        final long remaining = limit - bytesRead;
+        if (n <= 0 || remaining <= 0) {
+            // do not forward non-positive counts: some streams treat a negative skip as a seek backwards
+            return 0;
+        }
+
+        final long skipped = in.skip(Math.min(n, remaining));
+        if (skipped > 0) {
+            bytesRead += skipped;
+        }
+        return skipped;
+    }
+
+    /**
+     * @return the wrapped stream's estimate, capped at the number of bytes still allowed by the limit
+     */
+    @Override
+    public int available() throws IOException {
+        final long remaining = Math.max(0L, limit - bytesRead);
+        return (int) Math.min(in.available(), remaining);
+    }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+    }
+
+    @Override
+    public void mark(int readlimit) {
+        in.mark(readlimit);
+        // remember how much had been consumed so reset() can restore the remaining allowance
+        markedBytesRead = bytesRead;
+    }
+
+    @Override
+    public boolean markSupported() {
+        return in.markSupported();
+    }
+
+    @Override
+    public void reset() throws IOException {
+        in.reset();
+        // only rewind the counter once the wrapped stream has been repositioned successfully
+        if (markedBytesRead >= 0) {
+            bytesRead = markedBytesRead;
+        }
+    }
+
+    /**
+     * @return the limit given at construction time
+     */
+    public long getLimit() {
+        return limit;
+    }
+}