You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/23 20:56:59 UTC
[06/29] incubator-nifi git commit: NIFI-282: Bug fixes;
documentation improvements; removed code marked as 'FOR TESTING'
NIFI-282: Bug fixes; documentation improvements; removed code marked as 'FOR TESTING'
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/05b64593
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/05b64593
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/05b64593
Branch: refs/heads/develop
Commit: 05b64593b6958aa807066e0df1571becf589dc17
Parents: ed53b46
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Feb 6 08:19:54 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Feb 6 08:19:54 2015 -0500
----------------------------------------------------------------------
.../apache/nifi/remote/RemoteDestination.java | 47 ++
.../nifi-site-to-site-client/pom.xml | 43 +
.../remote/AbstractCommunicationsSession.java | 54 ++
.../main/java/org/apache/nifi/remote/Peer.java | 119 +++
.../java/org/apache/nifi/remote/PeerStatus.java | 72 ++
.../nifi/remote/RemoteResourceInitiator.java | 64 ++
.../org/apache/nifi/remote/Transaction.java | 200 +++++
.../apache/nifi/remote/TransferDirection.java | 34 +
.../nifi/remote/VersionedRemoteResource.java | 24 +
.../nifi/remote/client/SiteToSiteClient.java | 443 ++++++++++
.../remote/client/SiteToSiteClientConfig.java | 114 +++
.../client/socket/EndpointConnectionState.java | 54 ++
.../socket/EndpointConnectionStatePool.java | 835 +++++++++++++++++++
.../nifi/remote/client/socket/SocketClient.java | 184 ++++
.../remote/cluster/AdaptedNodeInformation.java | 66 ++
.../remote/cluster/ClusterNodeInformation.java | 67 ++
.../nifi/remote/cluster/NodeInformation.java | 98 +++
.../remote/cluster/NodeInformationAdapter.java | 41 +
.../apache/nifi/remote/codec/FlowFileCodec.java | 71 ++
.../remote/codec/StandardFlowFileCodec.java | 129 +++
.../remote/exception/HandshakeException.java | 30 +
.../exception/PortNotRunningException.java | 26 +
.../remote/exception/ProtocolException.java | 36 +
.../remote/exception/UnknownPortException.java | 26 +
.../SocketChannelCommunicationsSession.java | 90 ++
.../remote/io/socket/SocketChannelInput.java | 66 ++
.../remote/io/socket/SocketChannelOutput.java | 58 ++
.../SSLSocketChannelCommunicationsSession.java | 93 +++
.../io/socket/ssl/SSLSocketChannelInput.java | 50 ++
.../io/socket/ssl/SSLSocketChannelOutput.java | 44 +
.../nifi/remote/protocol/ClientProtocol.java | 86 ++
.../remote/protocol/CommunicationsInput.java | 27 +
.../remote/protocol/CommunicationsOutput.java | 27 +
.../remote/protocol/CommunicationsSession.java | 64 ++
.../apache/nifi/remote/protocol/DataPacket.java | 29 +
.../nifi/remote/protocol/RequestType.java | 43 +
.../protocol/socket/HandshakeProperty.java | 61 ++
.../nifi/remote/protocol/socket/Response.java | 51 ++
.../remote/protocol/socket/ResponseCode.java | 153 ++++
.../protocol/socket/SocketClientProtocol.java | 437 ++++++++++
.../socket/SocketClientTransaction.java | 357 ++++++++
.../nifi/remote/util/PeerStatusCache.java | 43 +
.../nifi/remote/util/RemoteNiFiUtils.java | 216 +++++
.../nifi/remote/util/StandardDataPacket.java | 50 ++
.../socket/TestEndpointConnectionStatePool.java | 95 +++
.../nifi/stream/io/LimitingInputStream.java | 111 +++
.../stream/io/MinimumLengthInputStream.java | 93 +++
nifi/nifi-commons/pom.xml | 1 +
.../nifi-framework/nifi-cluster/pom.xml | 4 +
.../nifi/cluster/manager/ClusterManager.java | 18 +-
.../cluster/manager/impl/WebClusterManager.java | 9 +-
.../nifi-framework-core-api/pom.xml | 4 +
.../nifi/cluster/AdaptedNodeInformation.java | 66 --
.../nifi/cluster/ClusterNodeInformation.java | 67 --
.../org/apache/nifi/cluster/NodeInformant.java | 22 -
.../apache/nifi/cluster/NodeInformation.java | 98 ---
.../nifi/cluster/NodeInformationAdapter.java | 39 -
.../apache/nifi/groups/RemoteProcessGroup.java | 39 +-
.../main/java/org/apache/nifi/remote/Peer.java | 107 ---
.../java/org/apache/nifi/remote/PeerStatus.java | 72 --
.../org/apache/nifi/remote/RemoteGroupPort.java | 22 +-
.../apache/nifi/remote/TransferDirection.java | 23 -
.../nifi/remote/VersionedRemoteResource.java | 24 -
.../nifi/remote/cluster/NodeInformant.java | 22 +
.../apache/nifi/remote/codec/FlowFileCodec.java | 79 --
.../remote/exception/HandshakeException.java | 30 -
.../exception/PortNotRunningException.java | 26 -
.../remote/exception/ProtocolException.java | 34 -
.../remote/exception/UnknownPortException.java | 26 -
.../nifi/remote/protocol/ClientProtocol.java | 78 --
.../remote/protocol/CommunicationsInput.java | 27 -
.../remote/protocol/CommunicationsOutput.java | 27 -
.../remote/protocol/CommunicationsSession.java | 64 --
.../nifi/remote/protocol/RequestType.java | 43 -
.../nifi/remote/protocol/ServerProtocol.java | 2 +-
.../nifi-framework/nifi-framework-core/pom.xml | 4 +
.../nifi/remote/StandardRemoteProcessGroup.java | 316 +------
.../nifi-framework/nifi-site-to-site/.gitignore | 1 +
.../util/RemoteProcessGroupUtils.class | Bin 0 -> 9526 bytes
.../remote/AbstractCommunicationsSession.class | Bin 0 -> 2308 bytes
.../nifi/remote/RemoteResourceFactory.class | Bin 0 -> 8707 bytes
.../nifi/remote/RemoteResourceManager.class | Bin 0 -> 6898 bytes
.../apache/nifi/remote/RemoteSiteListener.class | Bin 0 -> 841 bytes
.../nifi/remote/SocketRemoteSiteListener.class | Bin 0 -> 8448 bytes
...emoteGroupPort$EndpointConnectionState.class | Bin 0 -> 5427 bytes
.../nifi/remote/StandardRemoteGroupPort.class | Bin 0 -> 10677 bytes
.../StandardRootGroupPort$FlowFileRequest.class | Bin 0 -> 5836 bytes
...StandardRootGroupPort$ProcessingResult.class | Bin 0 -> 5032 bytes
...upPort$StandardPortAuthorizationResult.class | Bin 0 -> 5159 bytes
.../nifi/remote/StandardRootGroupPort.class | Bin 0 -> 9700 bytes
.../remote/codec/StandardFlowFileCodec.class | Bin 0 -> 8538 bytes
.../exception/UnsupportedCodecException.class | Bin 0 -> 1057 bytes
.../SocketChannelCommunicationsSession.class | Bin 0 -> 3735 bytes
.../remote/io/socket/SocketChannelInput.class | Bin 0 -> 4008 bytes
.../remote/io/socket/SocketChannelOutput.class | Bin 0 -> 3741 bytes
.../SSLSocketChannelCommunicationsSession.class | Bin 0 -> 4611 bytes
.../io/socket/ssl/SSLSocketChannelInput.class | Bin 0 -> 3127 bytes
.../io/socket/ssl/SSLSocketChannelOutput.class | Bin 0 -> 2587 bytes
.../socket/ClusterManagerServerProtocol.class | Bin 0 -> 10540 bytes
.../protocol/socket/HandshakeProperty.class | Bin 0 -> 917 bytes
.../nifi/remote/protocol/socket/Response.class | Bin 0 -> 2674 bytes
.../remote/protocol/socket/ResponseCode.class | Bin 0 -> 6889 bytes
.../protocol/socket/SocketClientProtocol.class | Bin 0 -> 8965 bytes
.../socket/SocketFlowFileServerProtocol.class | Bin 0 -> 8806 bytes
.../remote/TestStandardRemoteGroupPort.class | Bin 0 -> 5974 bytes
.../nifi-framework/nifi-site-to-site/pom.xml | 4 +
.../remote/AbstractCommunicationsSession.java | 54 --
.../nifi/remote/RemoteResourceFactory.java | 50 +-
.../nifi/remote/SocketRemoteSiteListener.java | 7 +-
.../nifi/remote/StandardRemoteGroupPort.java | 485 +----------
.../remote/codec/StandardFlowFileCodec.java | 169 ----
.../SocketChannelCommunicationsSession.java | 90 --
.../remote/io/socket/SocketChannelInput.java | 66 --
.../remote/io/socket/SocketChannelOutput.java | 58 --
.../socket/ClusterManagerServerProtocol.java | 7 +-
.../protocol/socket/HandshakeProperty.java | 23 -
.../nifi/remote/protocol/socket/Response.java | 51 --
.../remote/protocol/socket/ResponseCode.java | 152 ----
.../protocol/socket/SocketClientProtocol.java | 510 -----------
.../socket/SocketFlowFileServerProtocol.java | 193 +++--
.../remote/TestStandardRemoteGroupPort.java | 97 ---
nifi/pom.xml | 5 +
122 files changed, 5501 insertions(+), 3135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
new file mode 100644
index 0000000..f718581
--- /dev/null
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * A model object for referring to a remote destination (i.e., a Port) for site-to-site communications
+ */
+public interface RemoteDestination {
+ /**
+ * Returns the identifier of the remote destination
+ *
+ * @return the identifier of the remote destination
+ */
+ String getIdentifier();
+
+ /**
+ * Returns the amount of time that the system should pause sending to a particular node if unable to
+ * send data to or receive data from this endpoint
+ * @param timeUnit the unit of time for the returned value (presumably; confirm against implementations)
+ * @return the yield period
+ */
+ long getYieldPeriod(TimeUnit timeUnit);
+
+ /**
+ * Returns whether or not compression should be used when transferring data to or receiving
+ * data from the remote endpoint
+ * @return whether or not compression should be used
+ */
+ boolean isUseCompression();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/pom.xml b/nifi/nifi-commons/nifi-site-to-site-client/pom.xml
new file mode 100644
index 0000000..3fc00a2
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-commons</artifactId>
+ <version>0.0.2-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-site-to-site-client</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-client-dto</artifactId>
+ <version>0.0.2-incubating-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-web-utils</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
new file mode 100644
index 0000000..4babb92
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+
+public abstract class AbstractCommunicationsSession implements CommunicationsSession {
+ private String userDn;
+
+ private volatile String uri;
+
+ public AbstractCommunicationsSession(final String uri) {
+ this.uri = uri;
+ }
+
+ @Override
+ public String toString() {
+ return uri;
+ }
+
+ @Override
+ public void setUri(final String uri) {
+ this.uri = uri;
+ }
+
+ @Override
+ public String getUri() {
+ return uri;
+ }
+
+ @Override
+ public String getUserDn() {
+ return userDn;
+ }
+
+ @Override
+ public void setUserDn(final String dn) {
+ this.userDn = dn;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
new file mode 100644
index 0000000..29af777
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.stream.io.NullOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+
+public class Peer {
+
+ private final CommunicationsSession commsSession;
+ private final String url;
+ private final String clusterUrl;
+ private final String host;
+ private long penalizationExpiration = 0L;
+ private boolean closed = false;
+
+ public Peer(final CommunicationsSession commsSession, final String peerUrl, final String clusterUrl) {
+ this.commsSession = commsSession;
+ this.url = peerUrl;
+ this.clusterUrl = clusterUrl;
+ // The host is parsed eagerly so that a malformed peer URL is rejected at construction time
+ try {
+ this.host = new URI(peerUrl).getHost();
+ } catch (final Exception e) {
+ throw new IllegalArgumentException("Invalid URL: " + peerUrl, e); // keep the parse failure as the cause
+ }
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public String getClusterUrl() {
+ return clusterUrl;
+ }
+
+ public CommunicationsSession getCommunicationsSession() {
+ return commsSession;
+ }
+
+ public void close() throws IOException {
+ this.closed = true;
+
+ // Consume the InputStream so that it doesn't linger on the Peer's outgoing socket buffer
+ try {
+ StreamUtils.copy(commsSession.getInput().getInputStream(), new NullOutputStream());
+ } finally {
+ commsSession.close();
+ }
+ }
+
+ public void penalize(final long millis) {
+ penalizationExpiration = Math.max(penalizationExpiration, System.currentTimeMillis() + millis);
+ }
+
+ public boolean isPenalized() {
+ return penalizationExpiration > System.currentTimeMillis();
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ @Override
+ public int hashCode() {
+ return 8320 + url.hashCode();
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof Peer)) {
+ return false;
+ }
+
+ final Peer other = (Peer) obj;
+ return this.url.equals(other.url);
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("Peer[url=").append(url);
+ if (closed) {
+ sb.append(",CLOSED");
+ } else if (isPenalized()) {
+ sb.append(",PENALIZED");
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
new file mode 100644
index 0000000..d1cb076
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+public class PeerStatus {
+
+ private final String hostname;
+ private final int port;
+ private final boolean secure;
+ private final int numFlowFiles;
+
+ public PeerStatus(final String hostname, final int port, final boolean secure, final int numFlowFiles) {
+ this.hostname = hostname;
+ this.port = port;
+ this.secure = secure;
+ this.numFlowFiles = numFlowFiles;
+ }
+
+ public String getHostname() {
+ return hostname;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public boolean isSecure() {
+ return secure;
+ }
+
+ public int getFlowFileCount() {
+ return numFlowFiles;
+ }
+
+ @Override
+ public String toString() {
+ return "PeerStatus[hostname=" + hostname + ",port=" + port + ",secure=" + secure + ",flowFileCount=" + numFlowFiles + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ return 9824372 + hostname.hashCode() + port;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj == null) {
+ return false;
+ }
+
+ if (!(obj instanceof PeerStatus)) {
+ return false;
+ }
+ // Equality uses hostname and port only (as hashCode() does); 'secure' and 'numFlowFiles' are not compared
+ final PeerStatus other = (PeerStatus) obj;
+ return port == other.port && hostname.equals(other.hostname);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java
new file mode 100644
index 0000000..8eb5d8d
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.nifi.remote.exception.HandshakeException;
+
+public class RemoteResourceInitiator {
+ public static final int RESOURCE_OK = 20;
+ public static final int DIFFERENT_RESOURCE_VERSION = 21;
+ public static final int ABORT = 255;
+
+ // Proposes the resource's name and version to the server, retrying with a new version if the server asks for a different one
+ public static VersionedRemoteResource initiateResourceNegotiation(final VersionedRemoteResource resource, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
+ // Write the name of the resource, followed by its version
+ dos.writeUTF(resource.getResourceName());
+ final VersionNegotiator negotiator = resource.getVersionNegotiator();
+ dos.writeInt(negotiator.getVersion());
+ dos.flush();
+
+ // wait for response from server.
+ final int statusCode = dis.read();
+ switch (statusCode) {
+ case RESOURCE_OK: // server accepted our proposal of resource name/version
+ return resource;
+ case DIFFERENT_RESOURCE_VERSION: // server accepted our proposal of resource name but not the version
+ // Get server's preferred version
+ final int newVersion = dis.readInt();
+
+ // Determine our new preferred version that is no greater than the server's preferred version.
+ final Integer newPreference = negotiator.getPreferredVersion(newVersion);
+ // If we could not agree with server on a version, fail now.
+ if ( newPreference == null ) {
+ throw new HandshakeException("Could not agree on version for " + resource);
+ }
+
+ negotiator.setVersion(newPreference);
+
+ // Attempt negotiation of resource based on our new preferred version.
+ return initiateResourceNegotiation(resource, dis, dos);
+ case ABORT:
+ throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF());
+ default:
+ return null; // Unable to negotiate resource; also reached on end-of-stream, where read() returns -1
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
new file mode 100644
index 0000000..cc16625
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.io.IOException;
+
+import org.apache.nifi.remote.protocol.DataPacket;
+
+
+/**
+ * <p>
+ * Provides a transaction for performing site-to-site data transfers.
+ * </p>
+ *
+ * <p>
+ * A Transaction is created by calling the
+ * {@link org.apache.nifi.remote.client.SiteToSiteClient#createTransaction(TransferDirection) createTransaction(TransferDirection)}
+ * method of a {@link org.apache.nifi.remote.client.SiteToSiteClient SiteToSiteClient}. The resulting Transaction
+ * can be used to either send or receive data but not both. A new Transaction must be created in order to perform the
+ * other operation.
+ * </p>
+ *
+ * <p>
+ * The general flow of execution of a Transaction is as follows:
+ * <ol>
+ * <li>Create the transaction as described above.</li>
+ * <li>Send data via the {@link #send(DataPacket)} method or receive data via the {@link #receive()} method. This method
+ * will be called 1 or more times. In the case of receive, this method should be called until the method returns {@code null},
+ * signifying that the remote instance is finished sending data. <b>Note:</b> <code>receive()</code> should not be
+ * called a second time without first fully consuming the stream from the previous Packet that was received.</li>
+ * <li>Confirm the transaction via the {@link #confirm()} method.</li>
+ * <li>Either complete the transaction via the {@link #complete(boolean)} method or cancel the transaction
+ * via the {@link #cancel(String)} method.</li>
+ * </ol>
+ * </p>
+ *
+ * <p>
+ * It is important that the Transaction be terminated in order to free the resources held
+ * by the Transaction. If a Transaction is not terminated, its resources will not be freed and
+ * if the Transaction holds connections from a connection pool, the connections in that pool
+ * will eventually become exhausted. A Transaction is terminated by calling one of the following
+ * methods:
+ * <ul>
+ * <li>{@link #complete(boolean)}</li>
+ * <li>{@link #cancel(String)}</li>
+ * <li>{@link #error()}</li>
+ * </ul>
+ * </p>
+ *
+ * <p>
+ * If at any point an IOException is thrown from one of the methods of the Transaction, that Transaction
+ * is automatically closed via a call to {@link #error()}.
+ * </p>
+ *
+ * <p>
+ * The Transaction class should not be assumed to be thread-safe.
+ * </p>
+ */
+public interface Transaction {
+
+ /**
+ * Sends information to the remote NiFi instance.
+ *
+ * @param dataPacket the data packet to send
+ * @throws IOException
+ */
+ void send(DataPacket dataPacket) throws IOException;
+
+ /**
+ * Retrieves information from the remote NiFi instance, if any is available. If no data is available, will return
+ * {@code null}. It is important to consume all data from the remote NiFi instance before attempting to
+ * call {@link #confirm()}. This is because the sender is always responsible for determining when the Transaction
+ * has finished. This is done in order to prevent the need for a round-trip network request to receive data for
+ * each data packet.
+ *
+ * @return the DataPacket received, or {@code null} if there is no more data to receive.
+ * @throws IOException
+ */
+ DataPacket receive() throws IOException;
+
+ /**
+ * <p>
+ * Confirms the data that was sent or received by comparing CRC32's of the data sent and the data received.
+ * </p>
+ *
+ * <p>
+ * Even if the protocol being used to send the data is reliable and guarantees ordering of packets (such as TCP),
+ * it is still required that we confirm the transaction before completing the transaction. This is done as
+ * a "safety net" or a defensive programming technique. Mistakes happen, and this mechanism helps to ensure that if
+ * a bug exists somewhere along the line that we do not end up sending or receiving corrupt data. If the
+ * CRC32 of the sender and the CRC32 of the receiver do not match, an IOException will be thrown and both the
+ * sender and receiver will cancel the transaction automatically.
+ * </p>
+ *
+ * <p>
+ * If the {@link TransferDirection} of this Transaction is RECEIVE, this method will throw an Exception unless
+ * all data from the remote instance has been consumed (i.e., a call to {@link #receive()} returns {@code null}).
+ * </p>
+ *
+ * <p>
+ * If the {@link TransferDirection} of this Transaction is SEND, calling this method dictates that no more data will be
+ * sent in this transaction. I.e., there will be no more calls to {@link #send(DataPacket)}.
+ * </p>
+ *
+ * @throws IOException
+ */
+ void confirm() throws IOException;
+
+ /**
+ * <p>
+ * Completes the transaction and indicates to both the sender and receiver that the data transfer was
+ * successful. If receiving data, this method can also optionally request that the sender back off sending
+ * data for a short period of time. This is used, for instance, to apply backpressure or to notify the sender
+ * that the receiver is not ready to receive data and may not service another request in the short term.
+ * </p>
+ *
+ * @param requestBackoff if <code>true</code> and the TransferDirection is RECEIVE, indicates to sender that it
+ * should back off sending data for a short period of time. If <code>false</code> or if the TransferDirection of
+ * this Transaction is SEND, then this argument is ignored.
+ *
+ * @throws IOException
+ */
+ void complete(boolean requestBackoff) throws IOException;
+
+ /**
+ * <p>
+ * Cancels this transaction, indicating to the sender that the data has not been successfully received so that
+ * the sender can retry or handle however is appropriate.
+ * </p>
+ *
+ * @param explanation an explanation to tell the other party why the transaction was canceled.
+ * @throws IOException
+ */
+ void cancel(final String explanation) throws IOException;
+
+
+ /**
+ * <p>
+ * Sets the TransactionState of the Transaction to {@link TransactionState#ERROR}, and closes
+ * the Transaction. The underlying connection should not be returned to a connection pool in this case.
+ * </p>
+ */
+ void error();
+
+
+ /**
+ * Returns the current state of the Transaction.
+ * @return the current state of the Transaction
+ * @throws IOException
+ */
+ TransactionState getState() throws IOException;
+
+
+ public enum TransactionState {
+ /**
+ * Transaction has been started but no data has been sent or received.
+ */
+ TRANSACTION_STARTED,
+
+ /**
+ * Transaction has been started and data has been sent or received.
+ */
+ DATA_EXCHANGED,
+
+ /**
+ * Data that has been transferred has been confirmed via its CRC. Transaction is
+ * ready to be completed.
+ */
+ TRANSACTION_CONFIRMED,
+
+ /**
+ * Transaction has been successfully completed.
+ */
+ TRANSACTION_COMPLETED,
+
+ /**
+ * The Transaction has been canceled.
+ */
+ TRANSACTION_CANCELED,
+
+ /**
+ * The Transaction ended in an error.
+ */
+ ERROR;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java
new file mode 100644
index 0000000..45029a4
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+
+/**
+ * Identifies which way data flows between a client and a remote NiFi instance,
+ * always expressed from the client's point of view.
+ */
+public enum TransferDirection {
+    /** Data flows from the client to the remote instance. */
+    SEND,
+
+    /** Data flows from the remote instance to the client. */
+    RECEIVE
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
new file mode 100644
index 0000000..bfccd98
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+/**
+ * A named resource that exposes a {@link VersionNegotiator}.
+ */
+public interface VersionedRemoteResource {
+
+ /**
+ * @return the {@link VersionNegotiator} associated with this resource. NOTE(review): presumably
+ * used to agree on a protocol version with the remote side -- confirm against implementations.
+ */
+ VersionNegotiator getVersionNegotiator();
+
+ /**
+ * @return the name of this resource
+ */
+ String getResourceName();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
new file mode 100644
index 0000000..fa94b81
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.socket.SocketClient;
+import org.apache.nifi.remote.protocol.DataPacket;
+
+/**
+ * <p>
+ * The SiteToSiteClient provides a mechanism for sending data to a remote instance of NiFi
+ * (or NiFi cluster) and retrieving data from a remote instance of NiFi (or NiFi cluster).
+ * </p>
+ *
+ * <p>
+ * When configuring the client via the {@link SiteToSiteClient.Builder}, the Builder must
+ * be provided the URL of the remote NiFi instance. If the URL points to a standalone instance
+ * of NiFi, all interaction will take place with that instance of NiFi. However, if the URL
+ * points to the NiFi Cluster Manager of a cluster, the client will automatically handle load
+ * balancing the transactions across the different nodes in the cluster.
+ * </p>
+ *
+ * <p>
+ * The SiteToSiteClient provides a {@link Transaction} through which all interaction with the
+ * remote instance takes place. After data has been exchanged or it is determined that no data
+ * is available, the Transaction can then be canceled (via the {@link Transaction#cancel(String)}
+ * method) or can be completed (via the {@link Transaction#complete(boolean)} method).
+ * </p>
+ *
+ * <p>
+ * An instance of SiteToSiteClient can be obtained by constructing a new instance of the
+ * {@link SiteToSiteClient.Builder} class, calling the appropriate methods to configure the
+ * client as desired, and then calling the {@link SiteToSiteClient.Builder#build() build()} method.
+ * </p>
+ *
+ * <p>
+ * The SiteToSiteClient itself is immutable once constructed and is thread-safe. Many threads can
+ * share access to the same client. However, the {@link Transaction} that is created by the client
+ * is not thread safe and should not be shared among threads.
+ * </p>
+ */
+public interface SiteToSiteClient extends Closeable {
+
+ /**
+ * Creates a new Transaction that can be used to either send data to a remote NiFi instance
+ * or receive data from a remote NiFi instance, depending on the value passed for the {@code direction} argument.
+ *
+ *
+ * @param direction specifies which direction the data should be transferred. A value of {@link TransferDirection#SEND}
+ * indicates that this Transaction will send data to the remote instance; a value of {@link TransferDirection#RECEIVE} indicates
+ * that this Transaction will be used to receive data from the remote instance.
+ *
+ * @return a new Transaction for transferring data in the requested direction
+ * @throws IOException if the Transaction cannot be created. NOTE(review): presumably raised when the
+ * remote instance cannot be reached -- confirm against the implementing classes.
+ */
+ Transaction createTransaction(TransferDirection direction) throws IOException;
+
+ /**
+ * <p>
+ * Returns {@code true} if site-to-site communications with the remote instance are secure,
+ * {@code false} if site-to-site communications with the remote instance are not secure. Whether or not
+ * communications are secure depends on the server, not the client.
+ * </p>
+ *
+ * <p>
+ * In order to determine whether the server is configured for secure communications, the client may have
+ * to query the server's RESTful interface. Doing so could result in an IOException.
+ * </p>
+ *
+ * @return {@code true} if site-to-site communications with the remote instance are secure, {@code false} otherwise
+ * @throws IOException if unable to query the remote instance's RESTful interface or if the remote
+ * instance is not configured to allow site-to-site communications
+ */
+ boolean isSecure() throws IOException;
+
+ /**
+ * <p>
+ * Returns the configuration object that was built by the Builder
+ * </p>
+ * @return the {@link SiteToSiteClientConfig} that was built by the Builder
+ */
+ SiteToSiteClientConfig getConfig();
+
+    /**
+     * <p>
+     * The Builder is the mechanism by which all configuration is passed to the SiteToSiteClient.
+     * Once constructed, the SiteToSiteClient cannot be reconfigured (i.e., it is immutable). If
+     * a change in configuration should be desired, the client should be {@link Closeable#close() closed}
+     * and a new client created.
+     * </p>
+     *
+     * <p>
+     * {@link #build()} takes a snapshot of the Builder's state, so modifying or reusing the Builder
+     * afterwards has no effect on clients that have already been built.
+     * </p>
+     */
+    public static class Builder {
+        private String url;
+        private long timeoutNanos = TimeUnit.SECONDS.toNanos(30);
+        private long penalizationNanos = TimeUnit.SECONDS.toNanos(3);
+        private SSLContext sslContext;
+        private EventReporter eventReporter;
+        private File peerPersistenceFile;
+        private boolean useCompression;
+        private String portName;
+        private String portIdentifier;
+        private int batchCount;
+        private long batchSize;
+        private long batchNanos;
+
+        /**
+         * Specifies the URL of the remote NiFi instance. If this URL points to the Cluster Manager of
+         * a NiFi cluster, data transfer to and from nodes will be automatically load balanced across
+         * the different nodes.
+         *
+         * @param url the URL of the remote NiFi instance; must be set before {@link #build()} is called
+         * @return this Builder, to allow method chaining
+         */
+        public Builder url(final String url) {
+            this.url = url;
+            return this;
+        }
+
+        /**
+         * Specifies the communications timeouts to use when interacting with the remote instances. The
+         * default value is 30 seconds.
+         *
+         * @param timeout the length of the timeout, expressed in {@code unit}
+         * @param unit the unit in which {@code timeout} is expressed
+         * @return this Builder, to allow method chaining
+         */
+        public Builder timeout(final long timeout, final TimeUnit unit) {
+            this.timeoutNanos = unit.toNanos(timeout);
+            return this;
+        }
+
+        /**
+         * If there is a problem communicating with a node (i.e., any node in the remote NiFi cluster
+         * or the remote instance of NiFi if it is standalone), specifies how long the client should
+         * wait before attempting to communicate with that node again. While a particular node is penalized,
+         * all other nodes in the remote cluster (if any) will still be available for communication.
+         * The default value is 3 seconds.
+         *
+         * @param period the length of the penalization period, expressed in {@code unit}
+         * @param unit the unit in which {@code period} is expressed
+         * @return this Builder, to allow method chaining
+         */
+        public Builder nodePenalizationPeriod(final long period, final TimeUnit unit) {
+            this.penalizationNanos = unit.toNanos(period);
+            return this;
+        }
+
+        /**
+         * Specifies the SSL Context to use when communicating with the remote NiFi instance(s). If not
+         * specified, communications will not be secure. The remote instance of NiFi always determines
+         * whether or not Site-to-Site communications are secure (i.e., the client will always use
+         * secure or non-secure communications, depending on what the server dictates).
+         *
+         * @param sslContext the SSL Context to use, or {@code null} for none
+         * @return this Builder, to allow method chaining
+         */
+        public Builder sslContext(final SSLContext sslContext) {
+            this.sslContext = sslContext;
+            return this;
+        }
+
+        /**
+         * Provides an EventReporter that can be used by the client in order to report any events that
+         * could be of interest when communicating with the remote instance. The EventReporter provided
+         * must be threadsafe.
+         *
+         * @param eventReporter the EventReporter that the client is to use
+         * @return this Builder, to allow method chaining
+         */
+        public Builder eventReporter(final EventReporter eventReporter) {
+            this.eventReporter = eventReporter;
+            return this;
+        }
+
+        /**
+         * Specifies a file that the client can write to in order to persist the list of nodes in the
+         * remote cluster and recover the list of nodes upon restart. This allows the client to function
+         * if the remote Cluster Manager is unavailable, even after a restart of the client software.
+         * If not specified, the list of nodes will not be persisted and a failure of the Cluster Manager
+         * will result in not being able to communicate with the remote instance if a new client
+         * is created.
+         *
+         * @param peerPersistenceFile the file in which the list of nodes is to be persisted
+         * @return this Builder, to allow method chaining
+         */
+        public Builder peerPersistenceFile(final File peerPersistenceFile) {
+            this.peerPersistenceFile = peerPersistenceFile;
+            return this;
+        }
+
+        /**
+         * Specifies whether or not data should be compressed before being transferred to or from the
+         * remote instance.
+         *
+         * @param compress {@code true} if data should be compressed, {@code false} otherwise
+         * @return this Builder, to allow method chaining
+         */
+        public Builder useCompression(final boolean compress) {
+            this.useCompression = compress;
+            return this;
+        }
+
+        /**
+         * Specifies the name of the port to communicate with. Either the port name or the port identifier
+         * must be specified.
+         *
+         * @param portName the name of the port to communicate with
+         * @return this Builder, to allow method chaining
+         */
+        public Builder portName(final String portName) {
+            this.portName = portName;
+            return this;
+        }
+
+        /**
+         * Specifies the unique identifier of the port to communicate with. If it is known, this is preferred over providing
+         * the port name, as the port name may change.
+         *
+         * @param portIdentifier the unique identifier of the port to communicate with
+         * @return this Builder, to allow method chaining
+         */
+        public Builder portIdentifier(final String portIdentifier) {
+            this.portIdentifier = portIdentifier;
+            return this;
+        }
+
+        /**
+         * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
+         * the client has the ability to request a particular batch size/duration. This method specifies
+         * the preferred number of {@link DataPacket}s to include in a Transaction.
+         *
+         * @param count the preferred number of DataPackets per Transaction
+         * @return this Builder, to allow method chaining
+         */
+        public Builder requestBatchCount(final int count) {
+            this.batchCount = count;
+            return this;
+        }
+
+        /**
+         * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
+         * the client has the ability to request a particular batch size/duration. This method specifies
+         * the preferred number of bytes to include in a Transaction.
+         *
+         * @param bytes the preferred number of bytes per Transaction
+         * @return this Builder, to allow method chaining
+         */
+        public Builder requestBatchSize(final long bytes) {
+            this.batchSize = bytes;
+            return this;
+        }
+
+        /**
+         * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
+         * the client has the ability to request a particular batch size/duration. This method specifies
+         * the preferred amount of time that a Transaction should span.
+         *
+         * @param value the preferred duration of a Transaction, expressed in {@code unit}
+         * @param unit the unit in which {@code value} is expressed
+         * @return this Builder, to allow method chaining
+         */
+        public Builder requestBatchDuration(final long value, final TimeUnit unit) {
+            this.batchNanos = unit.toNanos(value);
+            return this;
+        }
+
+        /**
+         * Builds a new SiteToSiteClient that can be used to send and receive data with remote instances of NiFi.
+         * The configuration handed to the client is a snapshot of this Builder's state at the time of the call.
+         *
+         * @return a new SiteToSiteClient that is configured according to this Builder
+         * @throws IllegalStateException if no URL has been specified, or if neither a port name nor a
+         *             port identifier has been specified
+         */
+        public SiteToSiteClient build() {
+            if (url == null) {
+                throw new IllegalStateException("Must specify URL to build Site-to-Site client");
+            }
+
+            if (portName == null && portIdentifier == null) {
+                throw new IllegalStateException("Must specify either Port Name or Port Identifier to build Site-to-Site client");
+            }
+
+            // Capture the configuration now rather than delegating to this (mutable) Builder on every call;
+            // otherwise, changing the Builder after build() would silently reconfigure a client that is
+            // documented to be immutable. Values are read through the getters wherever the configuration
+            // previously delegated to them, so that getters overridden by a subclass are still honored.
+            final String configuredUrl = getUrl();
+            final long configuredTimeoutNanos = getTimeout(TimeUnit.NANOSECONDS);
+            final long configuredPenalizationNanos = getPenalizationPeriod(TimeUnit.NANOSECONDS);
+            final SSLContext configuredSslContext = getSslContext();
+            final EventReporter configuredEventReporter = getEventReporter();
+            final File configuredPeerPersistenceFile = getPeerPersistenceFile();
+            final boolean configuredUseCompression = isUseCompression();
+            final String configuredPortName = getPortName();
+            final String configuredPortIdentifier = getPortIdentifier();
+            final int configuredBatchCount = batchCount;
+            final long configuredBatchSize = batchSize;
+            final long configuredBatchNanos = batchNanos;
+
+            final SiteToSiteClientConfig config = new SiteToSiteClientConfig() {
+
+                @Override
+                public boolean isUseCompression() {
+                    return configuredUseCompression;
+                }
+
+                @Override
+                public String getUrl() {
+                    return configuredUrl;
+                }
+
+                @Override
+                public long getTimeout(final TimeUnit timeUnit) {
+                    return timeUnit.convert(configuredTimeoutNanos, TimeUnit.NANOSECONDS);
+                }
+
+                @Override
+                public SSLContext getSslContext() {
+                    return configuredSslContext;
+                }
+
+                @Override
+                public String getPortName() {
+                    return configuredPortName;
+                }
+
+                @Override
+                public String getPortIdentifier() {
+                    return configuredPortIdentifier;
+                }
+
+                @Override
+                public long getPenalizationPeriod(final TimeUnit timeUnit) {
+                    return timeUnit.convert(configuredPenalizationNanos, TimeUnit.NANOSECONDS);
+                }
+
+                @Override
+                public File getPeerPersistenceFile() {
+                    return configuredPeerPersistenceFile;
+                }
+
+                @Override
+                public EventReporter getEventReporter() {
+                    return configuredEventReporter;
+                }
+
+                @Override
+                public long getPreferredBatchDuration(final TimeUnit timeUnit) {
+                    return timeUnit.convert(configuredBatchNanos, TimeUnit.NANOSECONDS);
+                }
+
+                @Override
+                public long getPreferredBatchSize() {
+                    return configuredBatchSize;
+                }
+
+                @Override
+                public int getPreferredBatchCount() {
+                    return configuredBatchCount;
+                }
+            };
+
+            return new SocketClient(config);
+        }
+
+        /**
+         * Returns the configured URL for the remote NiFi instance
+         *
+         * @return the configured URL, or {@code null} if none has been specified
+         */
+        public String getUrl() {
+            return url;
+        }
+
+        /**
+         * Returns the communications timeout, converted to the given time unit
+         *
+         * @param timeUnit the unit in which the timeout is to be expressed
+         * @return the communications timeout, expressed in {@code timeUnit}
+         */
+        public long getTimeout(final TimeUnit timeUnit) {
+            return timeUnit.convert(timeoutNanos, TimeUnit.NANOSECONDS);
+        }
+
+        /**
+         * Returns the amount of time that a particular node will be ignored after a
+         * communications error with that node occurs
+         *
+         * @param timeUnit the unit in which the period is to be expressed
+         * @return the penalization period, expressed in {@code timeUnit}
+         */
+        public long getPenalizationPeriod(TimeUnit timeUnit) {
+            return timeUnit.convert(penalizationNanos, TimeUnit.NANOSECONDS);
+        }
+
+        /**
+         * Returns the SSL Context that is configured for this builder
+         *
+         * @return the configured SSL Context, or {@code null} if none has been specified
+         */
+        public SSLContext getSslContext() {
+            return sslContext;
+        }
+
+        /**
+         * Returns the EventReporter that is to be used by clients to report events
+         *
+         * @return the configured EventReporter, or {@code null} if none has been specified
+         */
+        public EventReporter getEventReporter() {
+            return eventReporter;
+        }
+
+        /**
+         * Returns the file that is to be used for persisting the nodes of a remote cluster, if any.
+         *
+         * @return the configured file, or {@code null} if none has been specified
+         */
+        public File getPeerPersistenceFile() {
+            return peerPersistenceFile;
+        }
+
+        /**
+         * Returns a boolean indicating whether or not compression will be used to transfer data
+         * to and from the remote instance
+         *
+         * @return {@code true} if compression is to be used, {@code false} otherwise
+         */
+        public boolean isUseCompression() {
+            return useCompression;
+        }
+
+        /**
+         * Returns the name of the port that the client is to communicate with.
+         *
+         * @return the configured port name, or {@code null} if none has been specified
+         */
+        public String getPortName() {
+            return portName;
+        }
+
+        /**
+         * Returns the identifier of the port that the client is to communicate with.
+         *
+         * @return the configured port identifier, or {@code null} if none has been specified
+         */
+        public String getPortIdentifier() {
+            return portIdentifier;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
new file mode 100644
index 0000000..37c48f8
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.protocol.DataPacket;
+
+/**
+ * The configuration that a {@link SiteToSiteClient} operates with.
+ */
+public interface SiteToSiteClientConfig {
+
+ /**
+ * Returns the configured URL for the remote NiFi instance
+ * @return the configured URL for the remote NiFi instance
+ */
+ String getUrl();
+
+ /**
+ * Returns the communications timeout, converted to the given time unit
+ * @param timeUnit the unit in which the timeout is to be expressed
+ * @return the communications timeout, expressed in {@code timeUnit}
+ */
+ long getTimeout(final TimeUnit timeUnit);
+
+ /**
+ * Returns the amount of time that a particular node will be ignored after a
+ * communications error with that node occurs
+ * @param timeUnit the unit in which the period is to be expressed
+ * @return the penalization period, expressed in {@code timeUnit}
+ */
+ long getPenalizationPeriod(TimeUnit timeUnit);
+
+ /**
+ * Returns the SSL Context that is configured for communicating with the remote instance
+ * @return the configured SSL Context
+ */
+ SSLContext getSslContext();
+
+ /**
+ * Returns the EventReporter that is to be used by clients to report events
+ * @return the EventReporter that is to be used by clients to report events
+ */
+ EventReporter getEventReporter();
+
+ /**
+ * Returns the file that is to be used for persisting the nodes of a remote cluster, if any.
+ * @return the file that is to be used for persisting the nodes of a remote cluster, if any
+ */
+ File getPeerPersistenceFile();
+
+ /**
+ * Returns a boolean indicating whether or not compression will be used to transfer data
+ * to and from the remote instance
+ * @return {@code true} if compression will be used, {@code false} otherwise
+ */
+ boolean isUseCompression();
+
+ /**
+ * Returns the name of the port that the client is to communicate with.
+ * @return the name of the port that the client is to communicate with
+ */
+ String getPortName();
+
+ /**
+ * Returns the identifier of the port that the client is to communicate with.
+ * @return the identifier of the port that the client is to communicate with
+ */
+ String getPortIdentifier();
+
+ /**
+ * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
+ * the client has the ability to request a particular batch size/duration. This returns the maximum
+ * amount of time that we will request a NiFi instance to send data to us in a Transaction.
+ *
+ * @param timeUnit the unit in which the duration is to be expressed
+ * @return the requested batch duration, expressed in {@code timeUnit}
+ */
+ long getPreferredBatchDuration(TimeUnit timeUnit);
+
+ /**
+ * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
+ * the client has the ability to request a particular batch size/duration. This returns the maximum
+ * number of bytes that we will request a NiFi instance to send data to us in a Transaction.
+ *
+ * @return the requested batch size, in bytes
+ */
+ long getPreferredBatchSize();
+
+
+ /**
+ * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
+ * the client has the ability to request a particular batch size/duration. This returns the maximum
+ * number of {@link DataPacket}s that we will request a NiFi instance to send data to us in a Transaction.
+ *
+ * @return the requested number of {@link DataPacket}s per Transaction
+ */
+ int getPreferredBatchCount();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionState.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionState.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionState.java
new file mode 100644
index 0000000..f4ac727
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionState.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client.socket;
+
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
+
+/**
+ * Holds a {@link Peer} together with the {@link SocketClientProtocol} and {@link FlowFileCodec}
+ * that accompany it, and records the time at which this object was last marked as used.
+ */
+public class EndpointConnectionState {
+    private final FlowFileCodec codec;
+    private final SocketClientProtocol socketClientProtocol;
+    private final Peer peer;
+
+    // Never assigned by the constructor, so it stays 0 until setLastTimeUsed() is first called.
+    private volatile long lastUsed;
+
+    /**
+     * Creates a new holder for the given objects. The last-used timestamp is not set here.
+     *
+     * @param peer the Peer to hold
+     * @param socketClientProtocol the protocol to hold
+     * @param codec the codec to hold
+     */
+    public EndpointConnectionState(final Peer peer, final SocketClientProtocol socketClientProtocol, final FlowFileCodec codec) {
+        this.codec = codec;
+        this.socketClientProtocol = socketClientProtocol;
+        this.peer = peer;
+    }
+
+    /**
+     * @return the Peer that was supplied at construction time
+     */
+    public Peer getPeer() {
+        return this.peer;
+    }
+
+    /**
+     * @return the SocketClientProtocol that was supplied at construction time
+     */
+    public SocketClientProtocol getSocketClientProtocol() {
+        return this.socketClientProtocol;
+    }
+
+    /**
+     * @return the FlowFileCodec that was supplied at construction time
+     */
+    public FlowFileCodec getCodec() {
+        return this.codec;
+    }
+
+    /**
+     * Records the current wall-clock time, in milliseconds, as the last time this object was used.
+     */
+    public void setLastTimeUsed() {
+        this.lastUsed = System.currentTimeMillis();
+    }
+
+    /**
+     * @return the time, in milliseconds since the epoch, recorded by the most recent call to
+     *         {@link #setLastTimeUsed()}, or 0 if that method has not been called
+     */
+    public long getLastTimeUsed() {
+        return this.lastUsed;
+    }
+}