You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/23 20:56:59 UTC

[06/29] incubator-nifi git commit: NIFI-282: Bug fixes; documentation improvements; removed code marked as 'FOR TESTING'

NIFI-282: Bug fixes; documentation improvements; removed code marked as 'FOR TESTING'


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/05b64593
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/05b64593
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/05b64593

Branch: refs/heads/develop
Commit: 05b64593b6958aa807066e0df1571becf589dc17
Parents: ed53b46
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Feb 6 08:19:54 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Feb 6 08:19:54 2015 -0500

----------------------------------------------------------------------
 .../apache/nifi/remote/RemoteDestination.java   |  47 ++
 .../nifi-site-to-site-client/pom.xml            |  43 +
 .../remote/AbstractCommunicationsSession.java   |  54 ++
 .../main/java/org/apache/nifi/remote/Peer.java  | 119 +++
 .../java/org/apache/nifi/remote/PeerStatus.java |  72 ++
 .../nifi/remote/RemoteResourceInitiator.java    |  64 ++
 .../org/apache/nifi/remote/Transaction.java     | 200 +++++
 .../apache/nifi/remote/TransferDirection.java   |  34 +
 .../nifi/remote/VersionedRemoteResource.java    |  24 +
 .../nifi/remote/client/SiteToSiteClient.java    | 443 ++++++++++
 .../remote/client/SiteToSiteClientConfig.java   | 114 +++
 .../client/socket/EndpointConnectionState.java  |  54 ++
 .../socket/EndpointConnectionStatePool.java     | 835 +++++++++++++++++++
 .../nifi/remote/client/socket/SocketClient.java | 184 ++++
 .../remote/cluster/AdaptedNodeInformation.java  |  66 ++
 .../remote/cluster/ClusterNodeInformation.java  |  67 ++
 .../nifi/remote/cluster/NodeInformation.java    |  98 +++
 .../remote/cluster/NodeInformationAdapter.java  |  41 +
 .../apache/nifi/remote/codec/FlowFileCodec.java |  71 ++
 .../remote/codec/StandardFlowFileCodec.java     | 129 +++
 .../remote/exception/HandshakeException.java    |  30 +
 .../exception/PortNotRunningException.java      |  26 +
 .../remote/exception/ProtocolException.java     |  36 +
 .../remote/exception/UnknownPortException.java  |  26 +
 .../SocketChannelCommunicationsSession.java     |  90 ++
 .../remote/io/socket/SocketChannelInput.java    |  66 ++
 .../remote/io/socket/SocketChannelOutput.java   |  58 ++
 .../SSLSocketChannelCommunicationsSession.java  |  93 +++
 .../io/socket/ssl/SSLSocketChannelInput.java    |  50 ++
 .../io/socket/ssl/SSLSocketChannelOutput.java   |  44 +
 .../nifi/remote/protocol/ClientProtocol.java    |  86 ++
 .../remote/protocol/CommunicationsInput.java    |  27 +
 .../remote/protocol/CommunicationsOutput.java   |  27 +
 .../remote/protocol/CommunicationsSession.java  |  64 ++
 .../apache/nifi/remote/protocol/DataPacket.java |  29 +
 .../nifi/remote/protocol/RequestType.java       |  43 +
 .../protocol/socket/HandshakeProperty.java      |  61 ++
 .../nifi/remote/protocol/socket/Response.java   |  51 ++
 .../remote/protocol/socket/ResponseCode.java    | 153 ++++
 .../protocol/socket/SocketClientProtocol.java   | 437 ++++++++++
 .../socket/SocketClientTransaction.java         | 357 ++++++++
 .../nifi/remote/util/PeerStatusCache.java       |  43 +
 .../nifi/remote/util/RemoteNiFiUtils.java       | 216 +++++
 .../nifi/remote/util/StandardDataPacket.java    |  50 ++
 .../socket/TestEndpointConnectionStatePool.java |  95 +++
 .../nifi/stream/io/LimitingInputStream.java     | 111 +++
 .../stream/io/MinimumLengthInputStream.java     |  93 +++
 nifi/nifi-commons/pom.xml                       |   1 +
 .../nifi-framework/nifi-cluster/pom.xml         |   4 +
 .../nifi/cluster/manager/ClusterManager.java    |  18 +-
 .../cluster/manager/impl/WebClusterManager.java |   9 +-
 .../nifi-framework-core-api/pom.xml             |   4 +
 .../nifi/cluster/AdaptedNodeInformation.java    |  66 --
 .../nifi/cluster/ClusterNodeInformation.java    |  67 --
 .../org/apache/nifi/cluster/NodeInformant.java  |  22 -
 .../apache/nifi/cluster/NodeInformation.java    |  98 ---
 .../nifi/cluster/NodeInformationAdapter.java    |  39 -
 .../apache/nifi/groups/RemoteProcessGroup.java  |  39 +-
 .../main/java/org/apache/nifi/remote/Peer.java  | 107 ---
 .../java/org/apache/nifi/remote/PeerStatus.java |  72 --
 .../org/apache/nifi/remote/RemoteGroupPort.java |  22 +-
 .../apache/nifi/remote/TransferDirection.java   |  23 -
 .../nifi/remote/VersionedRemoteResource.java    |  24 -
 .../nifi/remote/cluster/NodeInformant.java      |  22 +
 .../apache/nifi/remote/codec/FlowFileCodec.java |  79 --
 .../remote/exception/HandshakeException.java    |  30 -
 .../exception/PortNotRunningException.java      |  26 -
 .../remote/exception/ProtocolException.java     |  34 -
 .../remote/exception/UnknownPortException.java  |  26 -
 .../nifi/remote/protocol/ClientProtocol.java    |  78 --
 .../remote/protocol/CommunicationsInput.java    |  27 -
 .../remote/protocol/CommunicationsOutput.java   |  27 -
 .../remote/protocol/CommunicationsSession.java  |  64 --
 .../nifi/remote/protocol/RequestType.java       |  43 -
 .../nifi/remote/protocol/ServerProtocol.java    |   2 +-
 .../nifi-framework/nifi-framework-core/pom.xml  |   4 +
 .../nifi/remote/StandardRemoteProcessGroup.java | 316 +------
 .../nifi-framework/nifi-site-to-site/.gitignore |   1 +
 .../util/RemoteProcessGroupUtils.class          | Bin 0 -> 9526 bytes
 .../remote/AbstractCommunicationsSession.class  | Bin 0 -> 2308 bytes
 .../nifi/remote/RemoteResourceFactory.class     | Bin 0 -> 8707 bytes
 .../nifi/remote/RemoteResourceManager.class     | Bin 0 -> 6898 bytes
 .../apache/nifi/remote/RemoteSiteListener.class | Bin 0 -> 841 bytes
 .../nifi/remote/SocketRemoteSiteListener.class  | Bin 0 -> 8448 bytes
 ...emoteGroupPort$EndpointConnectionState.class | Bin 0 -> 5427 bytes
 .../nifi/remote/StandardRemoteGroupPort.class   | Bin 0 -> 10677 bytes
 .../StandardRootGroupPort$FlowFileRequest.class | Bin 0 -> 5836 bytes
 ...StandardRootGroupPort$ProcessingResult.class | Bin 0 -> 5032 bytes
 ...upPort$StandardPortAuthorizationResult.class | Bin 0 -> 5159 bytes
 .../nifi/remote/StandardRootGroupPort.class     | Bin 0 -> 9700 bytes
 .../remote/codec/StandardFlowFileCodec.class    | Bin 0 -> 8538 bytes
 .../exception/UnsupportedCodecException.class   | Bin 0 -> 1057 bytes
 .../SocketChannelCommunicationsSession.class    | Bin 0 -> 3735 bytes
 .../remote/io/socket/SocketChannelInput.class   | Bin 0 -> 4008 bytes
 .../remote/io/socket/SocketChannelOutput.class  | Bin 0 -> 3741 bytes
 .../SSLSocketChannelCommunicationsSession.class | Bin 0 -> 4611 bytes
 .../io/socket/ssl/SSLSocketChannelInput.class   | Bin 0 -> 3127 bytes
 .../io/socket/ssl/SSLSocketChannelOutput.class  | Bin 0 -> 2587 bytes
 .../socket/ClusterManagerServerProtocol.class   | Bin 0 -> 10540 bytes
 .../protocol/socket/HandshakeProperty.class     | Bin 0 -> 917 bytes
 .../nifi/remote/protocol/socket/Response.class  | Bin 0 -> 2674 bytes
 .../remote/protocol/socket/ResponseCode.class   | Bin 0 -> 6889 bytes
 .../protocol/socket/SocketClientProtocol.class  | Bin 0 -> 8965 bytes
 .../socket/SocketFlowFileServerProtocol.class   | Bin 0 -> 8806 bytes
 .../remote/TestStandardRemoteGroupPort.class    | Bin 0 -> 5974 bytes
 .../nifi-framework/nifi-site-to-site/pom.xml    |   4 +
 .../remote/AbstractCommunicationsSession.java   |  54 --
 .../nifi/remote/RemoteResourceFactory.java      |  50 +-
 .../nifi/remote/SocketRemoteSiteListener.java   |   7 +-
 .../nifi/remote/StandardRemoteGroupPort.java    | 485 +----------
 .../remote/codec/StandardFlowFileCodec.java     | 169 ----
 .../SocketChannelCommunicationsSession.java     |  90 --
 .../remote/io/socket/SocketChannelInput.java    |  66 --
 .../remote/io/socket/SocketChannelOutput.java   |  58 --
 .../socket/ClusterManagerServerProtocol.java    |   7 +-
 .../protocol/socket/HandshakeProperty.java      |  23 -
 .../nifi/remote/protocol/socket/Response.java   |  51 --
 .../remote/protocol/socket/ResponseCode.java    | 152 ----
 .../protocol/socket/SocketClientProtocol.java   | 510 -----------
 .../socket/SocketFlowFileServerProtocol.java    | 193 +++--
 .../remote/TestStandardRemoteGroupPort.java     |  97 ---
 nifi/pom.xml                                    |   5 +
 122 files changed, 5501 insertions(+), 3135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
new file mode 100644
index 0000000..f718581
--- /dev/null
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * A model object for referring to a remote destination (i.e., a Port) for site-to-site communications
+ */
+public interface RemoteDestination {
+    /**
+     * Returns the identifier of the remote destination
+     * 
+     * @return
+     */
+	String getIdentifier();
+	
+	/**
+	 * Returns the amount of time that the system should pause sending to a particular node if unable to 
+	 * send data to or receive data from this endpoint
+	 * @param timeUnit
+	 * @return
+	 */
+	long getYieldPeriod(TimeUnit timeUnit);
+	
+	/**
+	 * Returns whether or not compression should be used when transferring data to or receiving
+	 * data from the remote endpoint
+	 * @return
+	 */
+	boolean isUseCompression();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/pom.xml b/nifi/nifi-commons/nifi-site-to-site-client/pom.xml
new file mode 100644
index 0000000..3fc00a2
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  
+  <parent>
+    <groupId>org.apache.nifi</groupId>
+    <artifactId>nifi-commons</artifactId>
+    <version>0.0.2-incubating-SNAPSHOT</version>
+  </parent>
+  
+  <artifactId>nifi-site-to-site-client</artifactId>
+  
+  <dependencies>
+  	<dependency>
+  		<groupId>org.apache.nifi</groupId>
+  		<artifactId>nifi-api</artifactId>
+  	</dependency>
+  	<dependency>
+  		<groupId>org.apache.nifi</groupId>
+  		<artifactId>nifi-utils</artifactId>
+  	</dependency>
+	<dependency>
+		<groupId>com.sun.jersey</groupId>
+		<artifactId>jersey-client</artifactId>
+	</dependency>
+	<dependency>
+		<groupId>org.apache.nifi</groupId>
+		<artifactId>nifi-client-dto</artifactId>
+		<version>0.0.2-incubating-SNAPSHOT</version>
+	</dependency>
+  	<dependency>
+  		<groupId>org.apache.nifi</groupId>
+  		<artifactId>nifi-web-utils</artifactId>
+  	</dependency>
+  	
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
new file mode 100644
index 0000000..4babb92
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+
+public abstract class AbstractCommunicationsSession implements CommunicationsSession {
+    private String userDn;
+    
+    private volatile String uri;
+    
+    public AbstractCommunicationsSession(final String uri) {
+        this.uri = uri;
+    }
+    
+    @Override
+    public String toString() {
+        return uri;
+    }
+
+    @Override
+    public void setUri(final String uri) {
+        this.uri = uri;
+    }
+
+    @Override
+    public String getUri() {
+        return uri;
+    }
+
+    @Override
+    public String getUserDn() {
+        return userDn;
+    }
+    
+    @Override
+    public void setUserDn(final String dn) {
+        this.userDn = dn;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
new file mode 100644
index 0000000..29af777
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.stream.io.NullOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+
+public class Peer {
+
+    private final CommunicationsSession commsSession;
+    private final String url;
+    private final String clusterUrl;
+    private final String host;
+    private long penalizationExpiration = 0L;
+    private boolean closed = false;
+
+    public Peer(final CommunicationsSession commsSession, final String peerUrl, final String clusterUrl) {
+        this.commsSession = commsSession;
+        this.url = peerUrl;
+        this.clusterUrl = clusterUrl;
+
+        try {
+            this.host = new URI(peerUrl).getHost();
+        } catch (final Exception e) {
+            throw new IllegalArgumentException("Invalid URL: " + peerUrl);
+        }
+    }
+
+    public String getUrl() {
+        return url;
+    }
+    
+    public String getClusterUrl() {
+    	return clusterUrl;
+    }
+
+    public CommunicationsSession getCommunicationsSession() {
+        return commsSession;
+    }
+
+    public void close() throws IOException {
+        this.closed = true;
+
+        // Consume the InputStream so that it doesn't linger on the Peer's outgoing socket buffer
+        try {
+            StreamUtils.copy(commsSession.getInput().getInputStream(), new NullOutputStream());
+        } finally {
+            commsSession.close();
+        }
+    }
+
+    public void penalize(final long millis) {
+        penalizationExpiration = Math.max(penalizationExpiration, System.currentTimeMillis() + millis);
+    }
+
+    public boolean isPenalized() {
+        return penalizationExpiration > System.currentTimeMillis();
+    }
+
+    public boolean isClosed() {
+        return closed;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    @Override
+    public int hashCode() {
+        return 8320 + url.hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (obj == this) {
+            return true;
+        }
+        if (!(obj instanceof Peer)) {
+            return false;
+        }
+
+        final Peer other = (Peer) obj;
+        return this.url.equals(other.url);
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder();
+        sb.append("Peer[url=").append(url);
+        if (closed) {
+            sb.append(",CLOSED");
+        } else if (isPenalized()) {
+            sb.append(",PENALIZED");
+        }
+        sb.append("]");
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
new file mode 100644
index 0000000..d1cb076
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+public class PeerStatus {
+
+    private final String hostname;
+    private final int port;
+    private final boolean secure;
+    private final int numFlowFiles;
+
+    public PeerStatus(final String hostname, final int port, final boolean secure, final int numFlowFiles) {
+        this.hostname = hostname;
+        this.port = port;
+        this.secure = secure;
+        this.numFlowFiles = numFlowFiles;
+    }
+
+    public String getHostname() {
+        return hostname;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public boolean isSecure() {
+        return secure;
+    }
+
+    public int getFlowFileCount() {
+        return numFlowFiles;
+    }
+
+    @Override
+    public String toString() {
+        return "PeerStatus[hostname=" + hostname + ",port=" + port + ",secure=" + secure + ",flowFileCount=" + numFlowFiles + "]";
+    }
+
+    @Override
+    public int hashCode() {
+        return 9824372 + hostname.hashCode() + port;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+
+        if (!(obj instanceof PeerStatus)) {
+            return false;
+        }
+
+        final PeerStatus other = (PeerStatus) obj;
+        return port == other.port && hostname.equals(other.hostname);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java
new file mode 100644
index 0000000..8eb5d8d
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.nifi.remote.exception.HandshakeException;
+
+public class RemoteResourceInitiator {
+	public static final int RESOURCE_OK = 20;
+	public static final int DIFFERENT_RESOURCE_VERSION = 21;
+	public static final int ABORT = 255;
+
+	
+	public static VersionedRemoteResource initiateResourceNegotiation(final VersionedRemoteResource resource, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
+        // Write the name of the resource being negotiated, followed by its version
+    	dos.writeUTF(resource.getResourceName());
+    	final VersionNegotiator negotiator = resource.getVersionNegotiator();
+    	dos.writeInt(negotiator.getVersion());
+    	dos.flush();
+        
+        // wait for response from server.
+        final int statusCode = dis.read();
+        switch (statusCode) {
+            case RESOURCE_OK:	// server accepted our proposal of codec name/version
+                return resource;
+            case DIFFERENT_RESOURCE_VERSION:	// server accepted our proposal of codec name but not the version
+                // Get server's preferred version
+            	final int newVersion = dis.readInt();
+                
+                // Determine our new preferred version that is no greater than the server's preferred version.
+                final Integer newPreference = negotiator.getPreferredVersion(newVersion);
+                // If we could not agree with server on a version, fail now.
+                if ( newPreference == null ) {
+                    throw new HandshakeException("Could not agree on version for " + resource);
+                }
+                
+                negotiator.setVersion(newPreference);
+                
+                // Attempt negotiation of resource based on our new preferred version.
+                return initiateResourceNegotiation(resource, dis, dos);
+            case ABORT:
+            	throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF());
+            default:
+                return null;	// Unable to negotiate codec
+        }
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
new file mode 100644
index 0000000..cc16625
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.io.IOException;
+
+import org.apache.nifi.remote.protocol.DataPacket;
+
+
+/**
+ * <p>
+ * Provides a transaction for performing site-to-site data transfers.
+ * </p>
+ * 
+ * <p>
+ * A Transaction is created by calling the 
+ * {@link org.apache.nifi.remote.client.SiteToSiteClient#createTransaction(TransferDirection) createTransaction(TransferDirection)} 
+ * method of a {@link org.apache.nifi.remote.client.SiteToSiteClient SiteToSiteClient}. The resulting Transaction
+ * can be used to either send or receive data but not both. A new Transaction must be created in order to perform the
+ * other operation.
+ * </p>
+ * 
+ * <p>
+ * The general flow of execution of a Transaction is as follows:
+ * <ol>
+ *      <li>Create the transaction as described above.</li>
+ *      <li>Send data via the {@link #send(DataPacket)} method or receive data via the {@link #receive()} method. This method
+ *          will be called 1 or more times. In the case of receive, this method should be called until the method returns {@code null},
+ *          signifying that the remote instance is finished sending data. <b>Note:</b> <code>receive()</code> should not be
+ *          called a second time without first fully consuming the stream from the previous Packet that was received.</li>
+ *      <li>Confirm the transaction via the {@link #confirm()} method.</li>
+ *      <li>Either complete the transaction via the {@link #complete(boolean)} method or cancel the transaction
+ *          via the {@link #cancel(String)} method.</li>
+ * </ol>
+ * </p>
+ * 
+ * <p>
+ * It is important that the Transaction be terminated in order to free the resources held
+ * by the Transaction. If a Transaction is not terminated, its resources will not be freed and
+ * if the Transaction holds connections from a connection pool, the connections in that pool
+ * will eventually become exhausted. A Transaction is terminated by calling one of the following
+ * methods:
+ *  <ul>
+ *      <li>{@link #complete(boolean)}</li>
+ *      <li>{@link #cancel(String)}</li>
+ *      <li>{@link #error()}</li>
+ *  </ul>
+ * </p>
+ * 
+ * <p>
+ * If at any point an IOException is thrown from one of the methods of the Transaction, that Transaction
+ * is automatically closed via a call to {@link #error()}.
+ * </p>
+ * 
+ * <p>
+ * The Transaction class should not be assumed to be thread-safe.
+ * </p>
+ */
+public interface Transaction {
+
+    /**
+     * Sends information to the remote NiFi instance.
+     * 
+     * @param dataPacket the data packet to send
+     * @throws IOException
+     */
+    void send(DataPacket dataPacket) throws IOException;
+    
+    /**
+     * Retrieves information from the remote NiFi instance, if any is available. If no data is available, will return
+     * {@code null}. It is important to consume all data from the remote NiFi instance before attempting to 
+     * call {@link #confirm()}. This is because the sender is always responsible for determining when the Transaction
+     * has finished. This is done in order to prevent the need for a round-trip network request to receive data for
+     * each data packet.
+     * 
+     * @return the DataPacket received, or {@code null} if there is no more data to receive. 
+     * @throws IOException
+     */
+    DataPacket receive() throws IOException;
+
+    /**
+     * <p>
+     * Confirms the data that was sent or received by comparing CRC32's of the data sent and the data received.
+     * </p>
+     * 
+     * <p>
+     * Even if the protocol being used to send the data is reliable and guarantees ordering of packets (such as TCP),
+     * it is still required that we confirm the transaction before completing the transaction. This is done as a
+     * "safety net" or a defensive programming technique. Mistakes happen, and this mechanism helps to ensure that if
+     * a bug exists somewhere along the line that we do not end up sending or receiving corrupt data. If the
+     * CRC32 of the sender and the CRC32 of the receiver do not match, an IOException will be thrown and both the
+     * sender and receiver will cancel the transaction automatically.
+     * </p>
+     * 
+     * <p>
+     * If the {@link TransferDirection} of this Transaction is RECEIVE, this method will throw an Exception unless
+     * all data from the remote instance has been consumed (i.e., a call to {@link #receive()} returns {@code null}).
+     * </p>
+     * 
+     * <p>
+     * If the {@link TransferDirection} of this Transaction is SEND, calling this method dictates that no more data will be
+     * sent in this transaction. I.e., there will be no more calls to {@link #send(DataPacket)}.
+     * </p>
+     * 
+     * @throws IOException
+     */
+	void confirm() throws IOException;
+	
+	/**
+	 * <p>
+	 * Completes the transaction and indicates to both the sender and receiver that the data transfer was
+	 * successful. If receiving data, this method can also optionally request that the sender back off sending
+	 * data for a short period of time. This is used, for instance, to apply backpressure or to notify the sender
+	 * that the receiver is not ready to receive data and may not service another request in the short term.
+	 * </p>
+	 * 
+	 * @param requestBackoff if <code>true</code> and the TransferDirection is RECEIVE, indicates to sender that it
+	 * should back off sending data for a short period of time. If <code>false</code> or if the TransferDirection of
+	 * this Transaction is SEND, then this argument is ignored.
+	 * 
+	 * @throws IOException
+	 */
+	void complete(boolean requestBackoff) throws IOException;
+	
+	/**
+	 * <p>
+	 * Cancels this transaction, indicating to the sender that the data has not been successfully received so that
+	 * the sender can retry or handle however is appropriate.
+	 * </p>
+	 * 
+	 * @param explanation an explanation to tell the other party why the transaction was canceled.
+	 * @throws IOException
+	 */
+	void cancel(final String explanation) throws IOException;
+	
+	
+	/**
+	 * <p>
+	 * Sets the TransactionState of the Transaction to {@link TransactionState#ERROR}, and closes
+	 * the Transaction. The underlying connection should not be returned to a connection pool in this case.
+	 * </p>
+	 */
+	void error();
+	
+	
+	/**
+	 * Returns the current state of the Transaction.
+	 * @return
+	 * @throws IOException
+	 */
+	TransactionState getState() throws IOException;
+	
+	
+	public enum TransactionState {
+	    /**
+	     * Transaction has been started but no data has been sent or received.
+	     */
+		TRANSACTION_STARTED,
+		
+		/**
+		 * Transaction has been started and data has been sent or received.
+		 */
+		DATA_EXCHANGED,
+		
+		/**
+		 * Data that has been transferred has been confirmed via its CRC. Transaction is
+		 * ready to be completed.
+		 */
+		TRANSACTION_CONFIRMED,
+		
+		/**
+		 * Transaction has been successfully completed.
+		 */
+		TRANSACTION_COMPLETED,
+		
+		/**
+		 * The Transaction has been canceled.
+		 */
+		TRANSACTION_CANCELED,
+		
+		/**
+		 * The Transaction ended in an error.
+		 */
+		ERROR;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java
new file mode 100644
index 0000000..45029a4
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+
+/**
+ * An enumeration for specifying the direction in which data should be transferred between a client
+ * and a remote NiFi instance. Both directions are expressed from the client's point of view.
+ */
+public enum TransferDirection {
+    /**
+     * The client is to send data to the remote instance.
+     */
+    SEND,
+    
+    /**
+     * The client is to receive data from the remote instance.
+     */
+    RECEIVE;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
new file mode 100644
index 0000000..bfccd98
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+/**
+ * A resource that is identified by a name and that carries a {@link VersionNegotiator}.
+ *
+ * NOTE(review): presumably implemented by the site-to-site protocol/codec types whose version
+ * has to be agreed upon with the remote side - confirm against the implementations.
+ */
+public interface VersionedRemoteResource {
+
+    /**
+     * @return the VersionNegotiator associated with this resource
+     */
+    VersionNegotiator getVersionNegotiator();
+
+    /**
+     * @return the name of this resource
+     */
+    String getResourceName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
new file mode 100644
index 0000000..fa94b81
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.socket.SocketClient;
+import org.apache.nifi.remote.protocol.DataPacket;
+
+/**
+ * <p>
+ * The SiteToSiteClient provides a mechanism for sending data to a remote instance of NiFi
+ * (or NiFi cluster) and retrieving data from a remote instance of NiFi (or NiFi cluster).
+ * </p>
+ * 
+ * <p>
+ * When configuring the client via the {@link SiteToSiteClient.Builder}, the Builder must
+ * be provided the URL of the remote NiFi instance. If the URL points to a standalone instance
+ * of NiFi, all interaction will take place with that instance of NiFi. However, if the URL 
+ * points to the NiFi Cluster Manager of a cluster, the client will automatically handle load
+ * balancing the transactions across the different nodes in the cluster.
+ * </p>
+ * 
+ * <p>
+ * The SiteToSiteClient provides a {@link Transaction} through which all interaction with the
+ * remote instance takes place. After data has been exchanged or it is determined that no data
+ * is available, the Transaction can then be canceled (via the {@link Transaction#cancel(String)}
+ * method) or can be completed (via the {@link Transaction#complete(boolean)} method).
+ * </p>
+ * 
+ * <p>
+ * An instance of SiteToSiteClient can be obtained by constructing a new instance of the 
+ * {@link SiteToSiteClient.Builder} class, calling the appropriate methods to configure the
+ * client as desired, and then calling the {@link SiteToSiteClient.Builder#build() build()} method.
+ * </p>
+ *
+ * <p>
+ * The SiteToSiteClient itself is immutable once constructed and is thread-safe. Many threads can
+ * share access to the same client. However, the {@link Transaction} that is created by the client
+ * is not thread safe and should not be shared among threads.
+ * </p>
+ */
+public interface SiteToSiteClient extends Closeable {
+
+	/**
+	 * Creates a new Transaction that can be used to either send data to a remote NiFi instance
+	 * or receive data from a remote NiFi instance, depending on the value passed for the {@code direction} argument.
+	 *
+	 * @param direction specifies which direction the data should be transferred. A value of {@link TransferDirection#SEND}
+	 * indicates that this Transaction will send data to the remote instance; a value of {@link TransferDirection#RECEIVE} indicates
+	 * that this Transaction will be used to receive data from the remote instance.
+	 *
+	 * @return the Transaction through which data is exchanged with the remote instance
+	 * @throws IOException if an I/O failure prevents the Transaction from being created
+	 */
+	Transaction createTransaction(TransferDirection direction) throws IOException;
+
+	/**
+	 * <p>
+	 * Returns {@code true} if site-to-site communications with the remote instance are secure,
+	 * {@code false} if site-to-site communications with the remote instance are not secure. Whether or not
+	 * communications are secure depends on the server, not the client.
+	 * </p>
+	 *
+	 * <p>
+	 * In order to determine whether the server is configured for secure communications, the client may have
+	 * to query the server's RESTful interface. Doing so could result in an IOException.
+	 * </p>
+	 *
+	 * @return {@code true} if communications with the remote instance are secure, {@code false} otherwise
+	 * @throws IOException if unable to query the remote instance's RESTful interface or if the remote
+	 * instance is not configured to allow site-to-site communications
+	 */
+	boolean isSecure() throws IOException;
+
+	/**
+	 * <p>
+	 * Returns the configuration object that was built by the Builder
+	 * </p>
+	 *
+	 * @return the configuration that this client was built with
+	 */
+	SiteToSiteClientConfig getConfig();
+
+	/**
+	 * <p>
+	 * The Builder is the mechanism by which all configuration is passed to the SiteToSiteClient.
+	 * Once constructed, the SiteToSiteClient cannot be reconfigured (i.e., it is immutable). If
+	 * a change in configuration should be desired, the client should be {@link Closeable#close() closed}
+	 * and a new client created.
+	 * </p>
+	 *
+	 * <p>
+	 * {@link #build()} copies the Builder's settings at the moment it is called, so changing
+	 * the Builder afterwards does not affect clients that have already been built.
+	 * </p>
+	 */
+	public static class Builder {
+		private String url;
+		private long timeoutNanos = TimeUnit.SECONDS.toNanos(30);
+		private long penalizationNanos = TimeUnit.SECONDS.toNanos(3);
+		private SSLContext sslContext;
+		private EventReporter eventReporter;
+		private File peerPersistenceFile;
+		private boolean useCompression;
+		private String portName;
+		private String portIdentifier;
+		// The batch preferences have no explicit default; they stay at 0 unless requested.
+		private int batchCount;
+		private long batchSize;
+		private long batchNanos;
+
+		/**
+		 * Specifies the URL of the remote NiFi instance. If this URL points to the Cluster Manager of
+		 * a NiFi cluster, data transfer to and from nodes will be automatically load balanced across
+		 * the different nodes.
+		 *
+		 * @param url the URL of the remote NiFi instance
+		 * @return this Builder, to allow call chaining
+		 */
+		public Builder url(final String url) {
+			this.url = url;
+			return this;
+		}
+
+		/**
+		 * Specifies the communications timeouts to use when interacting with the remote instances. The
+		 * default value is 30 seconds.
+		 *
+		 * @param timeout the timeout, expressed in {@code unit}
+		 * @param unit the unit of {@code timeout}
+		 * @return this Builder, to allow call chaining
+		 */
+		public Builder timeout(final long timeout, final TimeUnit unit) {
+			this.timeoutNanos = unit.toNanos(timeout);
+			return this;
+		}
+
+		/**
+		 * If there is a problem communicating with a node (i.e., any node in the remote NiFi cluster
+		 * or the remote instance of NiFi if it is standalone), specifies how long the client should
+		 * wait before attempting to communicate with that node again. While a particular node is penalized,
+		 * all other nodes in the remote cluster (if any) will still be available for communication.
+		 * The default value is 3 seconds.
+		 *
+		 * @param period the penalization period, expressed in {@code unit}
+		 * @param unit the unit of {@code period}
+		 * @return this Builder, to allow call chaining
+		 */
+		public Builder nodePenalizationPeriod(final long period, final TimeUnit unit) {
+			this.penalizationNanos = unit.toNanos(period);
+			return this;
+		}
+
+		/**
+		 * Specifies the SSL Context to use when communicating with the remote NiFi instance(s). If not
+		 * specified, communications will not be secure. The remote instance of NiFi always determines
+		 * whether or not Site-to-Site communications are secure (i.e., the client will always use
+		 * secure or non-secure communications, depending on what the server dictates).
+		 *
+		 * @param sslContext the SSL Context to use
+		 * @return this Builder, to allow call chaining
+		 */
+		public Builder sslContext(final SSLContext sslContext) {
+			this.sslContext = sslContext;
+			return this;
+		}
+
+		/**
+		 * Provides an EventReporter that can be used by the client in order to report any events that
+		 * could be of interest when communicating with the remote instance. The EventReporter provided
+		 * must be threadsafe.
+		 *
+		 * @param eventReporter the EventReporter to report events to
+		 * @return this Builder, to allow call chaining
+		 */
+		public Builder eventReporter(final EventReporter eventReporter) {
+			this.eventReporter = eventReporter;
+			return this;
+		}
+
+		/**
+		 * Specifies a file that the client can write to in order to persist the list of nodes in the
+		 * remote cluster and recover the list of nodes upon restart. This allows the client to function
+		 * if the remote Cluster Manager is unavailable, even after a restart of the client software.
+		 * If not specified, the list of nodes will not be persisted and a failure of the Cluster Manager
+		 * will result in not being able to communicate with the remote instance if a new client
+		 * is created.
+		 *
+		 * @param peerPersistenceFile the file in which to persist the list of nodes
+		 * @return this Builder, to allow call chaining
+		 */
+		public Builder peerPersistenceFile(final File peerPersistenceFile) {
+			this.peerPersistenceFile = peerPersistenceFile;
+			return this;
+		}
+
+		/**
+		 * Specifies whether or not data should be compressed before being transferred to or from the
+		 * remote instance.
+		 *
+		 * @param compress {@code true} to compress the data, {@code false} otherwise
+		 * @return this Builder, to allow call chaining
+		 */
+		public Builder useCompression(final boolean compress) {
+			this.useCompression = compress;
+			return this;
+		}
+
+		/**
+		 * Specifies the name of the port to communicate with. Either the port name or the port identifier
+		 * must be specified.
+		 *
+		 * @param portName the name of the port
+		 * @return this Builder, to allow call chaining
+		 */
+		public Builder portName(final String portName) {
+			this.portName = portName;
+			return this;
+		}
+
+		/**
+		 * Specifies the unique identifier of the port to communicate with. If it is known, this is preferred over providing
+		 * the port name, as the port name may change.
+		 *
+		 * @param portIdentifier the unique identifier of the port
+		 * @return this Builder, to allow call chaining
+		 */
+		public Builder portIdentifier(final String portIdentifier) {
+			this.portIdentifier = portIdentifier;
+			return this;
+		}
+
+		/**
+		 * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
+		 * the client has the ability to request a particular batch size/duration. This method specifies
+		 * the preferred number of {@link DataPacket}s to include in a Transaction.
+		 *
+		 * @param count the preferred number of DataPackets per Transaction
+		 * @return this Builder, to allow call chaining
+		 */
+		public Builder requestBatchCount(final int count) {
+			this.batchCount = count;
+			return this;
+		}
+
+		/**
+		 * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
+		 * the client has the ability to request a particular batch size/duration. This method specifies
+		 * the preferred number of bytes to include in a Transaction.
+		 *
+		 * @param bytes the preferred number of bytes per Transaction
+		 * @return this Builder, to allow call chaining
+		 */
+		public Builder requestBatchSize(final long bytes) {
+			this.batchSize = bytes;
+			return this;
+		}
+
+		/**
+		 * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
+		 * the client has the ability to request a particular batch size/duration. This method specifies
+		 * the preferred amount of time that a Transaction should span.
+		 *
+		 * @param value the preferred duration, expressed in {@code unit}
+		 * @param unit the unit of {@code value}
+		 * @return this Builder, to allow call chaining
+		 */
+		public Builder requestBatchDuration(final long value, final TimeUnit unit) {
+			this.batchNanos = unit.toNanos(value);
+			return this;
+		}
+
+		/**
+		 * Builds a new SiteToSiteClient that can be used to send and receive data with remote instances of NiFi.
+		 * The settings of this Builder are copied when this method is called, so the returned client's
+		 * configuration is not affected by later changes to (or reuse of) this Builder.
+		 *
+		 * @return a new SiteToSiteClient configured with the current settings of this Builder
+		 * @throws IllegalStateException if no URL has been specified, or if neither a port name nor a
+		 * port identifier has been specified
+		 */
+		public SiteToSiteClient build() {
+			if ( url == null ) {
+				throw new IllegalStateException("Must specify URL to build Site-to-Site client");
+			}
+
+			if ( portName == null && portIdentifier == null ) {
+				throw new IllegalStateException("Must specify either Port Name or Port Identifier to build Site-to-Site client");
+			}
+
+			// Snapshot every setting now. The config must not read from the live Builder, because
+			// the Builder remains mutable and the client is documented as immutable once constructed.
+			final String configuredUrl = getUrl();
+			final long configuredTimeoutNanos = getTimeout(TimeUnit.NANOSECONDS);
+			final long configuredPenalizationNanos = getPenalizationPeriod(TimeUnit.NANOSECONDS);
+			final SSLContext configuredSslContext = getSslContext();
+			final EventReporter configuredEventReporter = getEventReporter();
+			final File configuredPeerPersistenceFile = getPeerPersistenceFile();
+			final boolean configuredUseCompression = isUseCompression();
+			final String configuredPortName = getPortName();
+			final String configuredPortIdentifier = getPortIdentifier();
+			final long configuredBatchNanos = batchNanos;
+			final long configuredBatchSize = batchSize;
+			final int configuredBatchCount = batchCount;
+
+			final SiteToSiteClientConfig config = new SiteToSiteClientConfig() {
+
+				@Override
+				public boolean isUseCompression() {
+					return configuredUseCompression;
+				}
+
+				@Override
+				public String getUrl() {
+					return configuredUrl;
+				}
+
+				@Override
+				public long getTimeout(final TimeUnit timeUnit) {
+					return timeUnit.convert(configuredTimeoutNanos, TimeUnit.NANOSECONDS);
+				}
+
+				@Override
+				public SSLContext getSslContext() {
+					return configuredSslContext;
+				}
+
+				@Override
+				public String getPortName() {
+					return configuredPortName;
+				}
+
+				@Override
+				public String getPortIdentifier() {
+					return configuredPortIdentifier;
+				}
+
+				@Override
+				public long getPenalizationPeriod(final TimeUnit timeUnit) {
+					return timeUnit.convert(configuredPenalizationNanos, TimeUnit.NANOSECONDS);
+				}
+
+				@Override
+				public File getPeerPersistenceFile() {
+					return configuredPeerPersistenceFile;
+				}
+
+				@Override
+				public EventReporter getEventReporter() {
+					return configuredEventReporter;
+				}
+
+				@Override
+				public long getPreferredBatchDuration(final TimeUnit timeUnit) {
+					return timeUnit.convert(configuredBatchNanos, TimeUnit.NANOSECONDS);
+				}
+
+				@Override
+				public long getPreferredBatchSize() {
+					return configuredBatchSize;
+				}
+
+				@Override
+				public int getPreferredBatchCount() {
+					return configuredBatchCount;
+				}
+			};
+
+			return new SocketClient(config);
+		}
+
+		/**
+		 * Returns the configured URL for the remote NiFi instance
+		 *
+		 * @return the URL, or {@code null} if none has been specified
+		 */
+		public String getUrl() {
+			return url;
+		}
+
+		/**
+		 * Returns the communications timeout, converted to the given time unit
+		 *
+		 * @param timeUnit the unit in which the timeout should be expressed
+		 * @return the communications timeout in the given unit
+		 */
+		public long getTimeout(final TimeUnit timeUnit) {
+			return timeUnit.convert(timeoutNanos, TimeUnit.NANOSECONDS);
+		}
+
+		/**
+		 * Returns the amount of time that a particular node will be ignored after a
+		 * communications error with that node occurs
+		 *
+		 * @param timeUnit the unit in which the period should be expressed
+		 * @return the penalization period in the given unit
+		 */
+		public long getPenalizationPeriod(TimeUnit timeUnit) {
+			return timeUnit.convert(penalizationNanos, TimeUnit.NANOSECONDS);
+		}
+
+		/**
+		 * Returns the SSL Context that is configured for this builder
+		 *
+		 * @return the SSL Context, or {@code null} if none has been specified
+		 */
+		public SSLContext getSslContext() {
+			return sslContext;
+		}
+
+		/**
+		 * Returns the EventReporter that is to be used by clients to report events
+		 *
+		 * @return the EventReporter, or {@code null} if none has been specified
+		 */
+		public EventReporter getEventReporter() {
+			return eventReporter;
+		}
+
+		/**
+		 * Returns the file that is to be used for persisting the nodes of a remote cluster, if any.
+		 *
+		 * @return the persistence file, or {@code null} if none has been specified
+		 */
+		public File getPeerPersistenceFile() {
+			return peerPersistenceFile;
+		}
+
+		/**
+		 * Returns a boolean indicating whether or not compression will be used to transfer data
+		 * to and from the remote instance
+		 *
+		 * @return {@code true} if compression is to be used, {@code false} otherwise
+		 */
+		public boolean isUseCompression() {
+			return useCompression;
+		}
+
+		/**
+		 * Returns the name of the port that the client is to communicate with.
+		 *
+		 * @return the port name, or {@code null} if none has been specified
+		 */
+		public String getPortName() {
+			return portName;
+		}
+
+		/**
+		 * Returns the identifier of the port that the client is to communicate with.
+		 *
+		 * @return the port identifier, or {@code null} if none has been specified
+		 */
+		public String getPortIdentifier() {
+			return portIdentifier;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
new file mode 100644
index 0000000..37c48f8
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.protocol.DataPacket;
+
+/**
+ * The read-only set of configuration values that a site-to-site client is built with.
+ */
+public interface SiteToSiteClientConfig {
+
+	/**
+	 * Returns the configured URL for the remote NiFi instance
+	 *
+	 * @return the URL of the remote NiFi instance
+	 */
+	String getUrl();
+
+	/**
+	 * Returns the communications timeout, expressed in the given time unit
+	 *
+	 * @param timeUnit the unit in which the timeout should be expressed
+	 * @return the communications timeout in the given unit
+	 */
+	long getTimeout(final TimeUnit timeUnit);
+
+	/**
+	 * Returns the amount of time that a particular node will be ignored after a
+	 * communications error with that node occurs
+	 *
+	 * @param timeUnit the unit in which the period should be expressed
+	 * @return the penalization period in the given unit
+	 */
+	long getPenalizationPeriod(TimeUnit timeUnit);
+
+	/**
+	 * Returns the SSL Context that is configured for this client configuration
+	 *
+	 * @return the configured SSL Context
+	 */
+	SSLContext getSslContext();
+	
+	/**
+	 * Returns the EventReporter that is to be used by clients to report events
+	 *
+	 * @return the configured EventReporter
+	 */
+	EventReporter getEventReporter();
+
+	/**
+	 * Returns the file that is to be used for persisting the nodes of a remote cluster, if any.
+	 *
+	 * @return the file used to persist the nodes of a remote cluster, if any
+	 */
+	File getPeerPersistenceFile();
+
+	/**
+	 * Returns a boolean indicating whether or not compression will be used to transfer data
+	 * to and from the remote instance
+	 *
+	 * @return {@code true} if compression is to be used, {@code false} otherwise
+	 */
+	boolean isUseCompression();
+
+	/**
+	 * Returns the name of the port that the client is to communicate with.
+	 *
+	 * @return the name of the port
+	 */
+	String getPortName();
+
+	/**
+	 * Returns the identifier of the port that the client is to communicate with.
+	 *
+	 * @return the identifier of the port
+	 */
+	String getPortIdentifier();
+	
+	/**
+	 * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
+	 * the client has the ability to request a particular batch size/duration. This returns the maximum
+	 * amount of time that we will request a NiFi instance to send data to us in a Transaction.
+	 *
+	 * @param timeUnit the unit in which the duration should be expressed
+	 * @return the preferred batch duration in the given unit
+	 */
+	long getPreferredBatchDuration(TimeUnit timeUnit);
+	
+	/**
+	 * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
+	 * the client has the ability to request a particular batch size/duration. This returns the maximum
+	 * number of bytes that we will request a NiFi instance to send data to us in a Transaction.
+	 *
+	 * @return the preferred number of bytes per Transaction
+	 */
+	long getPreferredBatchSize();
+	
+	
+	/**
+	 * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
+	 * the client has the ability to request a particular batch size/duration. This returns the maximum
+	 * number of {@link DataPacket}s that we will request a NiFi instance to send data to us in a Transaction.
+	 *
+	 * @return the preferred number of DataPackets per Transaction
+	 */
+	int getPreferredBatchCount();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionState.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionState.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionState.java
new file mode 100644
index 0000000..f4ac727
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionState.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client.socket;
+
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
+
+/**
+ * Groups the objects that belong to a single connection to a remote endpoint - the {@link Peer},
+ * the {@link SocketClientProtocol} and the {@link FlowFileCodec} - together with the time at
+ * which the connection was last marked as used.
+ */
+public class EndpointConnectionState {
+    private final Peer peer;
+    private final SocketClientProtocol socketClientProtocol;
+    private final FlowFileCodec codec;
+
+    // Milliseconds as reported by System.currentTimeMillis(); stays 0 until setLastTimeUsed()
+    // is called for the first time.
+    private volatile long lastUsed;
+
+    /**
+     * @param peer the Peer of this connection
+     * @param socketClientProtocol the protocol of this connection
+     * @param codec the codec of this connection
+     */
+    public EndpointConnectionState(final Peer peer, final SocketClientProtocol socketClientProtocol, final FlowFileCodec codec) {
+        this.codec = codec;
+        this.socketClientProtocol = socketClientProtocol;
+        this.peer = peer;
+    }
+
+    /**
+     * @return the Peer that was supplied at construction
+     */
+    public Peer getPeer() {
+        return this.peer;
+    }
+
+    /**
+     * @return the SocketClientProtocol that was supplied at construction
+     */
+    public SocketClientProtocol getSocketClientProtocol() {
+        return this.socketClientProtocol;
+    }
+
+    /**
+     * @return the FlowFileCodec that was supplied at construction
+     */
+    public FlowFileCodec getCodec() {
+        return this.codec;
+    }
+
+    /**
+     * Records the current system time as the moment at which this connection was last used.
+     */
+    public void setLastTimeUsed() {
+        final long now = System.currentTimeMillis();
+        this.lastUsed = now;
+    }
+
+    /**
+     * @return the time, in milliseconds, recorded by the most recent call to
+     * {@link #setLastTimeUsed()}, or 0 if it has never been called
+     */
+    public long getLastTimeUsed() {
+        return this.lastUsed;
+    }
+}