You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/19 01:23:08 UTC
[5/5] incubator-nifi git commit: NIFI-282: Begin refactoring and
creating client
NIFI-282: Begin refactoring and creating client
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/fdf75846
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/fdf75846
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/fdf75846
Branch: refs/heads/site-to-site-client
Commit: fdf75846002877f7f0c857ff2b18593c6f2d825d
Parents: f21b502
Author: Mark Payne <ma...@hotmail.com>
Authored: Sun Jan 18 19:22:27 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Sun Jan 18 19:22:27 2015 -0500
----------------------------------------------------------------------
nifi/commons/pom.xml | 9 +-
nifi/commons/site-to-site-client/pom.xml | 31 +
.../remote/AbstractCommunicationsSession.java | 54 ++
.../main/java/org/apache/nifi/remote/Peer.java | 113 ++++
.../java/org/apache/nifi/remote/PeerStatus.java | 72 +++
.../nifi/remote/PortAuthorizationResult.java | 25 +
.../nifi/remote/RemoteAuthorizationState.java | 27 +
.../nifi/remote/RemoteResourceInitiator.java | 64 ++
.../apache/nifi/remote/TransferDirection.java | 23 +
.../nifi/remote/VersionedRemoteResource.java | 24 +
.../apache/nifi/remote/client/DataPacket.java | 28 +
.../nifi/remote/client/SiteToSiteClient.java | 27 +
.../client/socket/EndpointConnectionState.java | 54 ++
.../socket/EndpointConnectionStatePool.java | 648 +++++++++++++++++++
.../nifi/remote/client/socket/SocketClient.java | 37 ++
.../remote/cluster/AdaptedNodeInformation.java | 66 ++
.../remote/cluster/ClusterNodeInformation.java | 67 ++
.../nifi/remote/cluster/NodeInformant.java | 22 +
.../nifi/remote/cluster/NodeInformation.java | 98 +++
.../remote/cluster/NodeInformationAdapter.java | 41 ++
.../apache/nifi/remote/codec/FlowFileCodec.java | 79 +++
.../remote/codec/StandardFlowFileCodec.java | 169 +++++
.../remote/exception/BadRequestException.java | 30 +
.../remote/exception/HandshakeException.java | 30 +
.../exception/NotAuthorizedException.java | 26 +
.../exception/PortNotRunningException.java | 26 +
.../remote/exception/ProtocolException.java | 34 +
.../exception/RequestExpiredException.java | 26 +
.../remote/exception/UnknownPortException.java | 26 +
.../exception/UnsupportedCodecException.java | 31 +
.../SocketChannelCommunicationsSession.java | 90 +++
.../remote/io/socket/SocketChannelInput.java | 66 ++
.../remote/io/socket/SocketChannelOutput.java | 58 ++
.../SSLSocketChannelCommunicationsSession.java | 93 +++
.../io/socket/ssl/SSLSocketChannelInput.java | 50 ++
.../io/socket/ssl/SSLSocketChannelOutput.java | 44 ++
.../nifi/remote/protocol/ClientProtocol.java | 78 +++
.../remote/protocol/CommunicationsInput.java | 27 +
.../remote/protocol/CommunicationsOutput.java | 27 +
.../remote/protocol/CommunicationsSession.java | 64 ++
.../nifi/remote/protocol/RequestType.java | 43 ++
.../protocol/socket/HandshakeProperty.java | 23 +
.../nifi/remote/protocol/socket/Response.java | 51 ++
.../remote/protocol/socket/ResponseCode.java | 152 +++++
.../protocol/socket/SocketClientProtocol.java | 517 +++++++++++++++
.../nifi/remote/util/PeerStatusCache.java | 43 ++
.../nifi/cluster/manager/ClusterManager.java | 2 +-
.../cluster/manager/impl/WebClusterManager.java | 5 +-
.../framework-bundle/framework/core-api/pom.xml | 5 +
.../nifi/cluster/AdaptedNodeInformation.java | 66 --
.../nifi/cluster/ClusterNodeInformation.java | 67 --
.../org/apache/nifi/cluster/NodeInformant.java | 22 -
.../apache/nifi/cluster/NodeInformation.java | 98 ---
.../nifi/cluster/NodeInformationAdapter.java | 39 --
.../apache/nifi/groups/RemoteProcessGroup.java | 6 -
.../main/java/org/apache/nifi/remote/Peer.java | 107 ---
.../java/org/apache/nifi/remote/PeerStatus.java | 72 ---
.../nifi/remote/PortAuthorizationResult.java | 25 -
.../nifi/remote/RemoteAuthorizationState.java | 27 -
.../org/apache/nifi/remote/RemoteGroupPort.java | 22 +-
.../apache/nifi/remote/TransferDirection.java | 23 -
.../nifi/remote/VersionedRemoteResource.java | 24 -
.../apache/nifi/remote/codec/FlowFileCodec.java | 79 ---
.../remote/exception/BadRequestException.java | 30 -
.../remote/exception/HandshakeException.java | 30 -
.../exception/NotAuthorizedException.java | 26 -
.../exception/PortNotRunningException.java | 26 -
.../remote/exception/ProtocolException.java | 34 -
.../exception/RequestExpiredException.java | 26 -
.../remote/exception/UnknownPortException.java | 26 -
.../nifi/remote/protocol/ClientProtocol.java | 78 ---
.../remote/protocol/CommunicationsInput.java | 27 -
.../remote/protocol/CommunicationsOutput.java | 27 -
.../remote/protocol/CommunicationsSession.java | 64 --
.../nifi/remote/protocol/RequestType.java | 43 --
.../nifi/remote/protocol/ServerProtocol.java | 2 +-
.../nifi/remote/StandardRemoteProcessGroup.java | 245 +------
.../framework/site-to-site/pom.xml | 4 +
.../remote/AbstractCommunicationsSession.java | 54 --
.../nifi/remote/RemoteResourceFactory.java | 42 +-
.../nifi/remote/SocketRemoteSiteListener.java | 2 +-
.../nifi/remote/StandardRemoteGroupPort.java | 498 +++-----------
.../remote/codec/StandardFlowFileCodec.java | 169 -----
.../exception/UnsupportedCodecException.java | 31 -
.../SocketChannelCommunicationsSession.java | 90 ---
.../remote/io/socket/SocketChannelInput.java | 66 --
.../remote/io/socket/SocketChannelOutput.java | 58 --
.../SSLSocketChannelCommunicationsSession.java | 93 ---
.../io/socket/ssl/SSLSocketChannelInput.java | 50 --
.../io/socket/ssl/SSLSocketChannelOutput.java | 44 --
.../socket/ClusterManagerServerProtocol.java | 7 +-
.../protocol/socket/HandshakeProperty.java | 23 -
.../nifi/remote/protocol/socket/Response.java | 51 --
.../remote/protocol/socket/ResponseCode.java | 152 -----
.../protocol/socket/SocketClientProtocol.java | 510 ---------------
.../socket/SocketFlowFileServerProtocol.java | 3 +-
.../remote/TestStandardRemoteGroupPort.java | 6 +-
nifi/nar-bundles/framework-bundle/pom.xml | 5 +
.../apache/nifi/remote/RemoteDestination.java | 37 ++
99 files changed, 3606 insertions(+), 3195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/commons/pom.xml b/nifi/commons/pom.xml
index f85e337..43dc0d8 100644
--- a/nifi/commons/pom.xml
+++ b/nifi/commons/pom.xml
@@ -12,9 +12,7 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -39,6 +37,7 @@
<module>nifi-utils</module>
<module>nifi-web-utils</module>
<module>processor-utilities</module>
+ <module>site-to-site-client</module>
<module>wali</module>
- </modules>
-</project>
+ </modules>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/pom.xml b/nifi/commons/site-to-site-client/pom.xml
new file mode 100644
index 0000000..7719d55
--- /dev/null
+++ b/nifi/commons/site-to-site-client/pom.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0"?>
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-commons-parent</artifactId>
+ <version>0.0.1-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>site-to-site-client</artifactId>
+ <name>NiFi Site-to-Site Client</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-utils</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
new file mode 100644
index 0000000..4babb92
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+
+/**
+ * Skeleton {@link CommunicationsSession} that holds the session URI and the user DN
+ * and implements their accessors; everything else is left to subclasses.
+ */
+public abstract class AbstractCommunicationsSession implements CommunicationsSession {
+    // The URI is declared volatile; the user DN is a plain field.
+    private volatile String uri;
+    private String userDn;
+
+    /**
+     * @param uri initial URI, reported by {@link #getUri()} and {@link #toString()}
+     */
+    public AbstractCommunicationsSession(final String uri) {
+        this.uri = uri;
+    }
+
+    @Override
+    public String getUri() {
+        return uri;
+    }
+
+    @Override
+    public void setUri(final String uri) {
+        this.uri = uri;
+    }
+
+    @Override
+    public String getUserDn() {
+        return userDn;
+    }
+
+    @Override
+    public void setUserDn(final String dn) {
+        this.userDn = dn;
+    }
+
+    /**
+     * @return the current URI (may be null if a null URI was supplied)
+     */
+    @Override
+    public String toString() {
+        return uri;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
new file mode 100644
index 0000000..e811c68
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+
+/**
+ * A remote peer addressed by URL, together with the {@link CommunicationsSession} used to reach it.
+ * Tracks a closed flag and a penalization deadline; equality and hashing use the URL only.
+ */
+public class Peer {
+
+    private final CommunicationsSession commsSession;
+    private final String url;
+    private final String clusterUrl;
+    private final String host;
+    private long penalizationExpiration = 0L;
+    private boolean closed = false;
+
+    /**
+     * @param commsSession session used to communicate with the peer
+     * @param peerUrl URL of the peer; its host component is extracted eagerly
+     * @param clusterUrl value returned unchanged by {@link #getClusterUrl()}
+     * @throws IllegalArgumentException if {@code peerUrl} cannot be parsed as a URI (this includes a null URL);
+     *             the underlying failure is attached as the cause
+     */
+    public Peer(final CommunicationsSession commsSession, final String peerUrl, final String clusterUrl) {
+        this.commsSession = commsSession;
+        this.url = peerUrl;
+        this.clusterUrl = clusterUrl;
+
+        try {
+            this.host = new URI(peerUrl).getHost();
+        } catch (final Exception e) {
+            // Chain the original exception so the reason the URL was rejected is not lost.
+            throw new IllegalArgumentException("Invalid URL: " + peerUrl, e);
+        }
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+    public String getClusterUrl() {
+        return clusterUrl;
+    }
+
+    public CommunicationsSession getCommunicationsSession() {
+        return commsSession;
+    }
+
+    /**
+     * Marks this peer as closed and closes the underlying session.
+     * The closed flag is set before the session is closed, so it stays set even if closing throws.
+     *
+     * @throws IOException if closing the session fails
+     */
+    public void close() throws IOException {
+        this.closed = true;
+
+        // TODO: Consume the InputStream so that it doesn't linger on the Peer's outgoing socket buffer
+        commsSession.close();
+    }
+
+    /**
+     * Penalizes this peer for at least the given duration from now.
+     * An existing, later penalization deadline is never shortened.
+     *
+     * @param millis penalization duration in milliseconds
+     */
+    public void penalize(final long millis) {
+        penalizationExpiration = Math.max(penalizationExpiration, System.currentTimeMillis() + millis);
+    }
+
+    /**
+     * @return true while the penalization deadline lies in the future
+     */
+    public boolean isPenalized() {
+        return penalizationExpiration > System.currentTimeMillis();
+    }
+
+    public boolean isClosed() {
+        return closed;
+    }
+
+    /**
+     * @return the host component parsed from the peer URL at construction time
+     */
+    public String getHost() {
+        return host;
+    }
+
+    @Override
+    public int hashCode() {
+        return 8320 + url.hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (obj == this) {
+            return true;
+        }
+        if (!(obj instanceof Peer)) {
+            return false;
+        }
+
+        final Peer other = (Peer) obj;
+        return this.url.equals(other.url);
+    }
+
+    /**
+     * @return {@code Peer[url=...]}, with {@code ,CLOSED} or {@code ,PENALIZED} appended when applicable
+     *         (closed takes precedence over penalized)
+     */
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder();
+        sb.append("Peer[url=").append(url);
+        if (closed) {
+            sb.append(",CLOSED");
+        } else if (isPenalized()) {
+            sb.append(",PENALIZED");
+        }
+        sb.append("]");
+        return sb.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
new file mode 100644
index 0000000..d1cb076
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+/**
+ * Immutable description of a peer: hostname, port, secure flag and flow file count.
+ * Two instances are equal when their hostname and port match; the secure flag and
+ * the flow file count do not take part in equality or hashing.
+ */
+public class PeerStatus {
+
+    private final String hostname;
+    private final int port;
+    private final boolean secure;
+    private final int numFlowFiles;
+
+    public PeerStatus(final String hostname, final int port, final boolean secure, final int numFlowFiles) {
+        this.hostname = hostname;
+        this.port = port;
+        this.secure = secure;
+        this.numFlowFiles = numFlowFiles;
+    }
+
+    public String getHostname() {
+        return hostname;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public boolean isSecure() {
+        return secure;
+    }
+
+    public int getFlowFileCount() {
+        return numFlowFiles;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        // instanceof is false for null, so no separate null check is needed.
+        if (obj instanceof PeerStatus) {
+            final PeerStatus that = (PeerStatus) obj;
+            return port == that.port && hostname.equals(that.hostname);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 9824372;
+        hash += hostname.hashCode();
+        hash += port;
+        return hash;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("PeerStatus[hostname=");
+        text.append(hostname);
+        text.append(",port=").append(port);
+        text.append(",secure=").append(secure);
+        text.append(",flowFileCount=").append(numFlowFiles);
+        text.append("]");
+        return text.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java
new file mode 100644
index 0000000..8f2603a
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+/**
+ * Result object exposing an authorized flag together with an explanation string.
+ * NOTE(review): presumably produced by a port authorization check - confirm against implementations.
+ */
+public interface PortAuthorizationResult {
+
+ /**
+ * @return the authorized flag of this result
+ */
+ boolean isAuthorized();
+
+ /**
+ * @return the explanation text of this result
+ * NOTE(review): whether this may be null is not visible from the interface - confirm.
+ */
+ String getExplanation();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteAuthorizationState.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteAuthorizationState.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteAuthorizationState.java
new file mode 100644
index 0000000..12a3d33
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteAuthorizationState.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+/**
+ * Three-valued authorization state: {@link #UNKNOWN}, {@link #UNAUTHORIZED} or {@link #AUTHORIZED}.
+ * NOTE(review): presumably UNKNOWN is the state before an authorization check has completed -
+ * confirm against usages.
+ */
+public enum RemoteAuthorizationState {
+
+ UNKNOWN,
+ UNAUTHORIZED,
+ AUTHORIZED;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java
new file mode 100644
index 0000000..8eb5d8d
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.nifi.remote.exception.HandshakeException;
+
+/**
+ * Initiating side of the name/version negotiation for a {@link VersionedRemoteResource}.
+ */
+public class RemoteResourceInitiator {
+    /** Status byte: the proposed resource name and version were accepted. */
+    public static final int RESOURCE_OK = 20;
+    /** Status byte: the resource name was accepted but a different version is preferred; an int version follows. */
+    public static final int DIFFERENT_RESOURCE_VERSION = 21;
+    /** Status byte: the other side aborted; a UTF message follows. */
+    public static final int ABORT = 255;
+
+
+    /**
+     * Proposes the resource's name and current version on {@code dos} and reads the answer from {@code dis},
+     * repeating with a lower version for as long as the other side answers
+     * {@link #DIFFERENT_RESOURCE_VERSION} and a mutually acceptable version exists.
+     *
+     * @param resource the resource to negotiate; its version negotiator is updated on each counter-proposal
+     * @param dis stream the status byte (and any follow-up payload) is read from
+     * @param dos stream the proposal is written to; flushed after every proposal
+     * @return {@code resource} once accepted, or {@code null} if an unrecognized status byte is received
+     * @throws IOException if reading or writing fails
+     * @throws HandshakeException if no common version exists or the other side sends {@link #ABORT}
+     */
+    public static VersionedRemoteResource initiateResourceNegotiation(final VersionedRemoteResource resource, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
+        while (true) {
+            // Send the resource name followed by the version we currently propose.
+            dos.writeUTF(resource.getResourceName());
+            final VersionNegotiator versionNegotiator = resource.getVersionNegotiator();
+            dos.writeInt(versionNegotiator.getVersion());
+            dos.flush();
+
+            // Block until the other side answers with a status byte.
+            // NOTE(review): read() yields -1 at end of stream, which is treated like any other unknown status.
+            final int response = dis.read();
+
+            if (response == RESOURCE_OK) {
+                return resource;
+            }
+            if (response == ABORT) {
+                throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF());
+            }
+            if (response != DIFFERENT_RESOURCE_VERSION) {
+                return null; // Unable to negotiate
+            }
+
+            // The name was accepted but not the version: read the version the other side prefers ...
+            final int serverVersion = dis.readInt();
+
+            // ... and pick our best version that does not exceed it.
+            final Integer agreedVersion = versionNegotiator.getPreferredVersion(serverVersion);
+            if (agreedVersion == null) {
+                throw new HandshakeException("Could not agree on version for " + resource);
+            }
+
+            // Retry the proposal with the newly selected version.
+            versionNegotiator.setVersion(agreedVersion);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java
new file mode 100644
index 0000000..56432d5
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+/**
+ * Direction of a transfer: {@link #SEND} or {@link #RECEIVE}.
+ * NOTE(review): presumably expressed from the local side's point of view - confirm against callers.
+ */
+public enum TransferDirection {
+
+ SEND,
+ RECEIVE;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
new file mode 100644
index 0000000..bfccd98
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+/**
+ * A named resource that carries a {@link VersionNegotiator}.
+ */
+public interface VersionedRemoteResource {
+
+ /**
+ * @return the negotiator holding this resource's version information
+ */
+ VersionNegotiator getVersionNegotiator();
+
+ /**
+ * @return the name identifying this resource
+ */
+ String getResourceName();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/DataPacket.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/DataPacket.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/DataPacket.java
new file mode 100644
index 0000000..ec77f2c
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/DataPacket.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import java.io.InputStream;
+import java.util.Map;
+
+/**
+ * A unit of data made up of string attributes and a content stream.
+ */
+public interface DataPacket {
+
+ /**
+ * @return the packet's attributes as a name-to-value map
+ */
+ Map<String, String> getAttributes();
+
+ /**
+ * @return a stream over the packet's content
+ * NOTE(review): who is responsible for closing the stream is not visible here - confirm.
+ */
+ InputStream getData();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
new file mode 100644
index 0000000..47a09be
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import java.io.IOException;
+
+/**
+ * Client abstraction for exchanging {@link DataPacket}s.
+ */
+public interface SiteToSiteClient {
+
+ /**
+ * Sends the given packet.
+ *
+ * @param dataPacket the packet to send
+ * @throws IOException if the packet cannot be sent
+ */
+ void send(DataPacket dataPacket) throws IOException;
+
+ /**
+ * Receives a packet.
+ *
+ * @return the received packet
+ * NOTE(review): whether null is returned when nothing is available is not visible here - confirm.
+ * @throws IOException if a packet cannot be received
+ */
+ DataPacket receive() throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionState.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionState.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionState.java
new file mode 100644
index 0000000..f4ac727
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionState.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client.socket;
+
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
+
+/**
+ * Bundles what belongs to one endpoint connection - the {@link Peer}, the
+ * {@link SocketClientProtocol} and the {@link FlowFileCodec} - with a last-used timestamp.
+ */
+public class EndpointConnectionState {
+    private final Peer peer;
+    private final SocketClientProtocol socketClientProtocol;
+    private final FlowFileCodec codec;
+
+    // Not set by the constructor, so it stays 0 until setLastTimeUsed() is first called.
+    private volatile long lastUsed;
+
+    public EndpointConnectionState(final Peer peer, final SocketClientProtocol socketClientProtocol, final FlowFileCodec codec) {
+        this.peer = peer;
+        this.socketClientProtocol = socketClientProtocol;
+        this.codec = codec;
+    }
+
+    public Peer getPeer() {
+        return peer;
+    }
+
+    public SocketClientProtocol getSocketClientProtocol() {
+        return socketClientProtocol;
+    }
+
+    public FlowFileCodec getCodec() {
+        return codec;
+    }
+
+    /**
+     * @return the wall-clock time in milliseconds recorded by the latest {@link #setLastTimeUsed()} call
+     */
+    public long getLastTimeUsed() {
+        return lastUsed;
+    }
+
+    /**
+     * Records the current wall-clock time as the moment this connection was last used.
+     */
+    public void setLastTimeUsed() {
+        lastUsed = System.currentTimeMillis();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
new file mode 100644
index 0000000..2dd489d
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client.socket;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+
+import javax.net.ssl.SSLContext;
+import javax.security.cert.CertificateExpiredException;
+import javax.security.cert.CertificateNotYetValidException;
+
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.RemoteDestination;
+import org.apache.nifi.remote.RemoteResourceInitiator;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.cluster.ClusterNodeInformation;
+import org.apache.nifi.remote.cluster.NodeInformation;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.exception.BadRequestException;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.PortNotRunningException;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+import org.apache.nifi.remote.exception.UnknownPortException;
+import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
+import org.apache.nifi.remote.util.PeerStatusCache;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EndpointConnectionStatePool {
+ public static final long PEER_REFRESH_PERIOD = 60000L;
+ public static final String CATEGORY = "Site-to-Site";
+ private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
+
+ private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionStatePool.class);
+
+ private final ConcurrentMap<String, BlockingQueue<EndpointConnectionState>> endpointConnectionMap = new ConcurrentHashMap<>();
+ private final ConcurrentMap<PeerStatus, Long> peerTimeoutExpirations = new ConcurrentHashMap<>();
+
+ private final AtomicLong peerIndex = new AtomicLong(0L);
+
+ private final ReentrantLock peerRefreshLock = new ReentrantLock();
+ private volatile List<PeerStatus> peerStatuses;
+ private volatile long peerRefreshTime = 0L;
+ private volatile PeerStatusCache peerStatusCache;
+ private final Set<CommunicationsSession> activeCommsChannels = new HashSet<>();
+
+ private final File peersFile;
+ private final EventReporter eventReporter;
+ private final SSLContext sslContext;
+
+ public EndpointConnectionStatePool(final EventReporter eventReporter, final File persistenceFile) {
+ this(null, eventReporter, persistenceFile);
+ }
+
+ public EndpointConnectionStatePool(final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile) {
+ this.sslContext = sslContext;
+ this.peersFile = persistenceFile;
+ this.eventReporter = eventReporter;
+
+ Set<PeerStatus> recoveredStatuses;
+ if ( persistenceFile != null && persistenceFile.exists() ) {
+ try {
+ recoveredStatuses = recoverPersistedPeerStatuses(peersFile);
+ this.peerStatusCache = new PeerStatusCache(recoveredStatuses, peersFile.lastModified());
+ } catch (final IOException ioe) {
+ logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file", persistenceFile, ioe);
+ }
+ } else {
+ peerStatusCache = null;
+ }
+ }
+
+    /**
+     * Obtains a connection state for the cluster at the given URL. A pooled state is reused
+     * unless it has been idle for at least the destination's communications timeout;
+     * otherwise a new connection is opened to the next non-penalized peer, followed by
+     * protocol negotiation, handshake and codec negotiation.
+     *
+     * @param clusterUrl URL of the remote cluster; key of the internal connection pool
+     * @param remoteDestination supplies the yield period (used to penalize peers) and the communications timeout
+     * @param direction forwarded to peer selection
+     * @return a connection state, or null when no peer is currently available
+     * @throws IOException if connecting to or communicating with the peer fails
+     * @throws HandshakeException if protocol negotiation or the handshake fails
+     * @throws PortNotRunningException if the peer reports the port as invalid
+     * @throws UnknownPortException if the peer reports the port as unknown
+     * @throws ProtocolException propagated from the handshake / codec negotiation
+     */
+    public EndpointConnectionState getEndpointConnectionState(final String clusterUrl, final RemoteDestination remoteDestination, final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
+        // Look up, or lazily register, the queue of pooled connection states for this URL.
+        BlockingQueue<EndpointConnectionState> connectionStateQueue = endpointConnectionMap.get(clusterUrl);
+        if ( connectionStateQueue == null ) {
+            connectionStateQueue = new LinkedBlockingQueue<>();
+            final BlockingQueue<EndpointConnectionState> existingQueue = endpointConnectionMap.putIfAbsent(clusterUrl, connectionStateQueue);
+            if ( existingQueue != null ) {
+                connectionStateQueue = existingQueue;
+            }
+        }
+
+        FlowFileCodec codec = null;
+        CommunicationsSession commsSession = null;
+        SocketClientProtocol protocol = null;
+        EndpointConnectionState connectionState;
+        Peer peer = null;
+
+        do {
+            final PeerStatus peerStatus = getNextPeerStatus(direction);
+            if ( peerStatus == null ) {
+                return null;
+            }
+
+            connectionState = connectionStateQueue.poll();
+            logger.debug("{} Connection State for {} = {}", this, clusterUrl, connectionState);
+
+            // if we can't get an existing ConnectionState, create one
+            if ( connectionState == null ) {
+                final long penalizationMillis = remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS);
+
+                protocol = new SocketClientProtocol();
+                protocol.setDestination(remoteDestination);
+
+                // Connect and negotiate the protocol version. Failures are propagated instead of being
+                // swallowed: continuing with a missing or closed session can only fail later with a
+                // less meaningful error.
+                try {
+                    commsSession = establishSiteToSiteConnection(peerStatus);
+                    final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+                    final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+                    RemoteResourceInitiator.initiateResourceNegotiation(protocol, dis, dos);
+                } catch (final HandshakeException e) {
+                    penalizeStatus(peerStatus, penalizationMillis);
+                    closeAfterFailure(commsSession, e);
+                    throw e;
+                } catch (final IOException e) {
+                    penalizeStatus(peerStatus, penalizationMillis);
+                    closeAfterFailure(commsSession, e);
+                    throw e;
+                }
+
+                final String peerUrl = "nifi://" + peerStatus.getHostname() + ":" + peerStatus.getPort();
+                peer = new Peer(commsSession, peerUrl, clusterUrl);
+
+                // perform handshake
+                try {
+                    protocol.handshake(peer);
+
+                    // handle error cases
+                    if ( protocol.isDestinationFull() ) {
+                        logger.warn("{} {} indicates that port's destination is full; penalizing peer", this, peer);
+                        penalize(peer, penalizationMillis);
+                        // There is no connection state to pool at this point (offering null to the
+                        // queue would throw NullPointerException); release the connection and let
+                        // the loop move on to the next peer.
+                        cleanup(protocol, peer);
+                        continue;
+                    } else if ( protocol.isPortInvalid() ) {
+                        penalize(peer, penalizationMillis);
+                        cleanup(protocol, peer);
+                        throw new PortNotRunningException(peer.toString() + " indicates that port " + remoteDestination.getIdentifier() + " is not running");
+                    } else if ( protocol.isPortUnknown() ) {
+                        penalize(peer, penalizationMillis);
+                        cleanup(protocol, peer);
+                        throw new UnknownPortException(peer.toString() + " indicates that port " + remoteDestination.getIdentifier() + " is not known");
+                    }
+
+                    // negotiate the FlowFileCodec to use
+                    codec = protocol.negotiateCodec(peer);
+                } catch (final PortNotRunningException | UnknownPortException e) {
+                    // already penalized and cleaned up above
+                    throw e;
+                } catch (final Exception e) {
+                    penalize(peer, penalizationMillis);
+                    cleanup(protocol, peer);
+
+                    final String message = String.format("%s failed to communicate with %s due to %s", this, peer, e.toString());
+                    logger.error(message);
+                    if ( logger.isDebugEnabled() ) {
+                        logger.error("", e);
+                    }
+                    throw e;
+                }
+
+                connectionState = new EndpointConnectionState(peer, protocol, codec);
+            } else {
+                final long lastTimeUsed = connectionState.getLastTimeUsed();
+                final long millisSinceLastUse = System.currentTimeMillis() - lastTimeUsed;
+                final long timeoutMillis = remoteDestination.getCommunicationsTimeout(TimeUnit.MILLISECONDS);
+
+                if ( timeoutMillis > 0L && millisSinceLastUse >= timeoutMillis ) {
+                    // pooled connection has been idle too long; discard it and loop again
+                    cleanup(connectionState.getSocketClientProtocol(), connectionState.getPeer());
+                    connectionState = null;
+                } else {
+                    codec = connectionState.getCodec();
+                    peer = connectionState.getPeer();
+                    commsSession = peer.getCommunicationsSession();
+                    protocol = connectionState.getSocketClientProtocol();
+                }
+            }
+        } while ( connectionState == null || codec == null || commsSession == null || protocol == null );
+
+        return connectionState;
+    }
+
+    /** Extends the penalization of the given peer status so that peer selection skips it for a while. */
+    private void penalizeStatus(final PeerStatus peerStatus, final long penalizationMillis) {
+        final Long existing = peerTimeoutExpirations.get(peerStatus);
+        final long currentExpiration = (existing == null) ? 0L : existing.longValue();
+        final long newExpiration = Math.max(currentExpiration, System.currentTimeMillis() + penalizationMillis);
+        peerTimeoutExpirations.put(peerStatus, Long.valueOf(newExpiration));
+    }
+
+    /** Closes a session after a failure, attaching any close error to the original failure instead of masking it. */
+    private static void closeAfterFailure(final CommunicationsSession commsSession, final Exception failure) {
+        if ( commsSession == null ) {
+            return;
+        }
+
+        try {
+            commsSession.close();
+        } catch (final IOException closeFailure) {
+            failure.addSuppressed(closeFailure);
+        }
+    }
+
+
+    /**
+     * Hands a connection state back to the pool.
+     *
+     * NOTE(review): the queue is looked up by peer.getUrl(), whereas getEndpointConnectionState
+     * registers queues under the cluster URL - confirm that these keys are meant to coincide.
+     *
+     * @param endpointConnectionState the state to return to the pool
+     * @return true if the state was queued; false if it has no peer, the peer has no URL,
+     *         no queue is registered for that URL, or the queue rejected the element
+     */
+    public boolean offer(final EndpointConnectionState endpointConnectionState) {
+        final Peer owner = endpointConnectionState.getPeer();
+        final String queueKey = (owner == null) ? null : owner.getUrl();
+        if ( queueKey == null ) {
+            return false;
+        }
+
+        final BlockingQueue<EndpointConnectionState> pooled = endpointConnectionMap.get(queueKey);
+        return pooled != null && pooled.offer(endpointConnectionState);
+    }
+
+    /**
+     * Marks the PeerStatus that corresponds to the given peer as penalized until at least
+     * {@code now + penalizationMillis}. An existing, later expiration is never shortened.
+     *
+     * @param peer the peer to penalize; host and port are taken from its URL
+     * @param penalizationMillis how long, in milliseconds from now, the peer should be avoided
+     */
+    public void penalize(final Peer peer, final long penalizationMillis) {
+        String hostname;
+        int portNumber;
+        try {
+            final URI peerUri = new URI(peer.getUrl());
+            hostname = peerUri.getHost();
+            portNumber = peerUri.getPort();
+        } catch (final URISyntaxException e) {
+            // URL could not be parsed: fall back to the peer's host with an unknown port
+            hostname = peer.getHost();
+            portNumber = -1;
+        }
+
+        final PeerStatus key = new PeerStatus(hostname, portNumber, true, 1);
+        final Long recorded = peerTimeoutExpirations.get(key);
+        final long currentExpiration = (recorded == null) ? 0L : recorded.longValue();
+        final long requestedExpiration = System.currentTimeMillis() + penalizationMillis;
+
+        peerTimeoutExpirations.put(key, Long.valueOf(Math.max(currentExpiration, requestedExpiration)));
+    }
+
+    /**
+     * Best-effort teardown of a connection: shuts the protocol down (when both arguments are
+     * non-null) and then closes the peer. Failures never propagate; they are logged at debug
+     * level so that they are not lost entirely.
+     *
+     * @param protocol the protocol to shut down; may be null
+     * @param peer the peer to close; may be null
+     */
+    private void cleanup(final SocketClientProtocol protocol, final Peer peer) {
+        if ( protocol != null && peer != null ) {
+            try {
+                protocol.shutdown(peer);
+            } catch (final TransmissionDisabledException e) {
+                // User disabled transmission.... do nothing.
+                logger.debug(this + " Transmission Disabled by User");
+            } catch (final IOException e) {
+                logger.debug("{} Failed to shut down protocol for {} due to {}", this, peer, e.toString());
+            }
+        }
+
+        if ( peer != null ) {
+            try {
+                peer.close();
+            } catch (final TransmissionDisabledException e) {
+                // User disabled transmission.... do nothing.
+                logger.debug(this + " Transmission Disabled by User");
+            } catch (final IOException e) {
+                logger.debug("{} Failed to close {} due to {}", this, peer, e.toString());
+            }
+        }
+    }
+
+    /**
+     * Picks the next peer to talk to, rotating through the current peer list and skipping
+     * penalized entries. The list is rebuilt first when it is missing, empty or older than
+     * PEER_REFRESH_PERIOD, provided the refresh lock can be taken without waiting.
+     *
+     * @param direction forwarded to createPeerStatusList when the list is rebuilt
+     * @return the selected peer status, or null if there are no peers or all are penalized
+     */
+    private PeerStatus getNextPeerStatus(final TransferDirection direction) {
+        List<PeerStatus> candidates = peerStatuses;
+
+        final boolean refreshDue = candidates == null || candidates.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD;
+        if ( refreshDue && peerRefreshLock.tryLock() ) {
+            try {
+                try {
+                    candidates = createPeerStatusList(direction);
+                } catch (final Exception e) {
+                    // keep whatever list we had; just report the problem
+                    final String message = String.format("%s Failed to update list of peers due to %s", this, e.toString());
+                    logger.warn(message);
+                    if ( logger.isDebugEnabled() ) {
+                        logger.warn("", e);
+                    }
+
+                    eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
+                }
+
+                this.peerStatuses = candidates;
+                peerRefreshTime = System.currentTimeMillis();
+            } finally {
+                peerRefreshLock.unlock();
+            }
+        }
+
+        if ( candidates == null || candidates.isEmpty() ) {
+            return null;
+        }
+
+        // Try at most one full rotation through the list.
+        int attempts = 0;
+        while ( attempts < candidates.size() ) {
+            final long ticket = peerIndex.getAndIncrement();
+            final PeerStatus candidate = candidates.get((int) (ticket % candidates.size()));
+
+            if ( !isPenalized(candidate) ) {
+                return candidate;
+            }
+
+            logger.debug("{} {} is penalized; will not communicate with this peer", this, candidate);
+            attempts++;
+        }
+
+        logger.debug("{} All peers appear to be penalized; returning null", this);
+        return null;
+    }
+
+    /**
+     * @param peerStatus the peer status to check
+     * @return true if a penalization is recorded for the status and has not yet expired
+     */
+    private boolean isPenalized(final PeerStatus peerStatus) {
+        final Long penalizedUntil = peerTimeoutExpirations.get(peerStatus);
+        if ( penalizedUntil == null ) {
+            return false;
+        }
+
+        return penalizedUntil.longValue() > System.currentTimeMillis();
+    }
+
+    /**
+     * Builds the weighted list of peers by converting the currently known peer statuses
+     * into NodeInformation objects and delegating to formulateDestinationList.
+     *
+     * @param direction forwarded to formulateDestinationList
+     * @return the weighted peer list; an empty list when no peer statuses are known
+     */
+    private List<PeerStatus> createPeerStatusList(final TransferDirection direction) throws IOException, BadRequestException, HandshakeException, UnknownPortException, PortNotRunningException {
+        final Set<PeerStatus> knownStatuses = getPeerStatuses();
+        if ( knownStatuses == null ) {
+            return new ArrayList<>();
+        }
+
+        final List<NodeInformation> nodes = new ArrayList<>();
+        for ( final PeerStatus known : knownStatuses ) {
+            // the third constructor argument is passed as 0, exactly as before
+            nodes.add(new NodeInformation(known.getHostname(), known.getPort(), 0, known.isSecure(), known.getFlowFileCount()));
+        }
+
+        final ClusterNodeInformation cluster = new ClusterNodeInformation();
+        cluster.setNodeInformation(nodes);
+        return formulateDestinationList(cluster, direction);
+    }
+
+
+    /**
+     * Returns the cached peer statuses. Once the cache is older than PEER_CACHE_MILLIS the
+     * flow file counts are no longer trusted: a copy is returned in which every status
+     * carries a flow file count of 1.
+     *
+     * @return the cached statuses (possibly equalized), or null if nothing is cached
+     */
+    public Set<PeerStatus> getPeerStatuses() {
+        final PeerStatusCache cache = this.peerStatusCache;
+        if ( cache == null ) {
+            return null;
+        }
+
+        final Set<PeerStatus> cached = cache.getStatuses();
+        if ( cached == null || cached.isEmpty() ) {
+            return null;
+        }
+
+        final boolean cacheIsFresh = cache.getTimestamp() + PEER_CACHE_MILLIS >= System.currentTimeMillis();
+        if ( cacheIsFresh ) {
+            return cached;
+        }
+
+        final Set<PeerStatus> equalized = new HashSet<>(cached.size());
+        for ( final PeerStatus original : cached ) {
+            equalized.add(new PeerStatus(original.getHostname(), original.getPort(), original.isSecure(), 1));
+        }
+        return equalized;
+    }
+
+    /**
+     * Connects to the given destination, asks it for its peer statuses and persists the answer.
+     * The connection is closed on every path, including when negotiation, the handshake or the
+     * status request fails.
+     *
+     * @param destinationUri URI whose host and port are contacted
+     * @param secure whether to use a secure connection
+     * @return the peer statuses reported by the remote instance
+     * @throws BadRequestException if protocol negotiation fails with a HandshakeException
+     * @throws IOException if the connection cannot be established or communication fails
+     */
+    private Set<PeerStatus> fetchRemotePeerStatuses(final URI destinationUri, final boolean secure) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException, BadRequestException {
+        final String hostname = destinationUri.getHost();
+        final int port = destinationUri.getPort();
+
+        final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port, secure);
+        final Peer peer = new Peer(commsSession, "nifi://" + hostname + ":" + port, destinationUri.toString());
+        try {
+            final SocketClientProtocol clientProtocol = new SocketClientProtocol();
+            final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+            final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+            try {
+                RemoteResourceInitiator.initiateResourceNegotiation(clientProtocol, dis, dos);
+            } catch (final HandshakeException e) {
+                throw new BadRequestException(e.toString());
+            }
+
+            // TODO: Make the 30000 millis configurable
+            clientProtocol.handshake(peer, null, 30000);
+            final Set<PeerStatus> remoteStatuses = clientProtocol.getPeerStatuses(peer);
+            persistPeerStatuses(remoteStatuses);
+
+            try {
+                clientProtocol.shutdown(peer);
+            } catch (final IOException e) {
+                final String message = String.format("%s Failed to shutdown protocol when updating list of peers due to %s", this, e.toString());
+                logger.warn(message);
+                if (logger.isDebugEnabled()) {
+                    logger.warn("", e);
+                }
+            }
+
+            return remoteStatuses;
+        } finally {
+            // Always release the connection, also when one of the steps above threw.
+            try {
+                peer.close();
+            } catch (final IOException e) {
+                final String message = String.format("%s Failed to close resources when updating list of peers due to %s", this, e.toString());
+                logger.warn(message);
+                if (logger.isDebugEnabled()) {
+                    logger.warn("", e);
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Writes one {@code hostname:port:secure} line per peer status to the persistence file,
+     * encoded as UTF-8. Does nothing when no persistence file is configured; an I/O failure
+     * is logged and not propagated.
+     *
+     * @param statuses the peer statuses to write
+     */
+    private void persistPeerStatuses(final Set<PeerStatus> statuses) {
+        if ( peersFile == null ) {
+            return;
+        }
+
+        try (final OutputStream fileOut = new FileOutputStream(peersFile);
+             final OutputStream buffered = new BufferedOutputStream(fileOut)) {
+
+            for (final PeerStatus peer : statuses) {
+                final StringBuilder entry = new StringBuilder();
+                entry.append(peer.getHostname()).append(":").append(peer.getPort()).append(":").append(peer.isSecure()).append("\n");
+                buffered.write(entry.toString().getBytes(StandardCharsets.UTF_8));
+            }
+
+        } catch (final IOException e) {
+            logger.error("Failed to persist list of Peers due to {}; if restarted and peer's NCM is down, may be unable to transfer data until communications with NCM are restored", e.toString(), e);
+        }
+    }
+
+    /**
+     * Reads peer statuses from a file of {@code hostname:port:secure} lines. The file is
+     * decoded as UTF-8, the same encoding persistPeerStatuses writes. Lines that do not
+     * have exactly three fields, or whose port is not a number, are skipped.
+     *
+     * @param file the file to read
+     * @return the recovered statuses (each with a flow file count of 1), or null if the file does not exist
+     * @throws IOException if the file cannot be read
+     */
+    private Set<PeerStatus> recoverPersistedPeerStatuses(final File file) throws IOException {
+        if (!file.exists()) {
+            return null;
+        }
+
+        final String separator = Pattern.quote(":");
+        final Set<PeerStatus> statuses = new HashSet<>();
+        try (final InputStream fis = new FileInputStream(file);
+             final BufferedReader reader = new BufferedReader(new InputStreamReader(fis, StandardCharsets.UTF_8))) {
+
+            String line;
+            while ((line = reader.readLine()) != null) {
+                final String[] splits = line.split(separator);
+                if (splits.length != 3) {
+                    continue;
+                }
+
+                final int port;
+                try {
+                    port = Integer.parseInt(splits[1]);
+                } catch (final NumberFormatException nfe) {
+                    // A corrupt entry must not abort recovery with an unchecked exception.
+                    logger.warn("Skipping malformed peer entry '{}' in {}: port is not a number", line, file);
+                    continue;
+                }
+
+                final String hostname = splits[0];
+                final boolean secure = Boolean.parseBoolean(splits[2]);
+
+                statuses.add(new PeerStatus(hostname, port, secure, 1));
+            }
+        }
+
+        return statuses;
+    }
+
+
+    /**
+     * Convenience overload that connects to the host, port and security setting
+     * described by the given peer status.
+     *
+     * @param peerStatus describes the peer to connect to
+     * @return the established communications session
+     * @throws IOException if the connection cannot be established
+     */
+    public CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException {
+        final String targetHost = peerStatus.getHostname();
+        final int targetPort = peerStatus.getPort();
+        final boolean useSsl = peerStatus.isSecure();
+        return establishSiteToSiteConnection(targetHost, targetPort, useSsl);
+    }
+
+    /**
+     * Opens a site-to-site connection to the given host and port and writes the protocol's
+     * magic bytes to it. If anything fails after the session object was created, the session
+     * is closed before the original exception is rethrown; a failure of that close is attached
+     * to the original exception as a suppressed exception rather than replacing it.
+     *
+     * @param hostname host to connect to
+     * @param port port to connect to
+     * @param secure whether to connect over SSL; requires an SSLContext to have been configured
+     * @return the established communications session
+     * @throws IOException if a secure connection is requested without an SSLContext, or if
+     *         connecting or writing fails
+     */
+    public CommunicationsSession establishSiteToSiteConnection(final String hostname, final int port, final boolean secure) throws IOException {
+        final String destinationUri = "nifi://" + hostname + ":" + port;
+
+        CommunicationsSession commsSession = null;
+        try {
+            if ( secure ) {
+                if ( sslContext == null ) {
+                    throw new IOException("Unable to communicate with " + hostname + ":" + port + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications");
+                }
+
+                final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, hostname, port, true);
+                socketChannel.connect();
+
+                commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri);
+
+                try {
+                    commsSession.setUserDn(socketChannel.getDn());
+                } catch (final CertificateNotYetValidException | CertificateExpiredException ex) {
+                    throw new IOException(ex);
+                }
+            } else {
+                final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port));
+                commsSession = new SocketChannelCommunicationsSession(socketChannel, destinationUri);
+            }
+
+            commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
+
+            commsSession.setUri(destinationUri);
+        } catch (final IOException ioe) {
+            if ( commsSession != null ) {
+                try {
+                    commsSession.close();
+                } catch (final IOException closeFailure) {
+                    // keep the root cause; record the close problem alongside it
+                    ioe.addSuppressed(closeFailure);
+                }
+            }
+
+            throw ioe;
+        }
+
+        return commsSession;
+    }
+
+
+// private List<PeerStatus> formulateDestinationList(final ClusterNodeInformation clusterNodeInfo) throws IOException {
+// return formulateDestinationList(clusterNodeInfo, getConnectableType());
+// }
+
+    /**
+     * Builds a list of peers in which each node appears a number of times that reflects its
+     * weight. A node's weight is its share of the cluster's flow files (capped at 0.8); when
+     * the direction is RECEIVE the weight is one minus that share. Every node gets at least
+     * one entry, and a node's entries are spread through the list rather than placed
+     * contiguously.
+     *
+     * @param clusterNodeInfo the nodes of the remote cluster
+     * @param direction the transfer direction used to derive the weighting
+     * @return the weighted list of peer statuses
+     */
+    static List<PeerStatus> formulateDestinationList(final ClusterNodeInformation clusterNodeInfo, final TransferDirection direction) {
+        final Collection<NodeInformation> nodes = clusterNodeInfo.getNodeInformation();
+        final int slotBudget = Math.max(128, nodes.size());
+
+        long flowFileSum = 0L;
+        for (final NodeInformation node : nodes) {
+            flowFileSum += node.getTotalFlowFiles();
+        }
+
+        // Decide how many slots each node receives.
+        final Map<NodeInformation, Integer> slotsPerNode = new HashMap<>();
+        int slotTotal = 0;
+        for (final NodeInformation node : nodes) {
+            // don't allow any node to get more than 80% of the data
+            final double share = Math.min(0.8D, ((double) node.getTotalFlowFiles() / (double) flowFileSum));
+            final double weight;
+            if ( direction == TransferDirection.RECEIVE ) {
+                weight = 1 - share;
+            } else {
+                weight = share;
+            }
+
+            final int slots = Math.max(1, (int) (slotBudget * weight));
+            slotsPerNode.put(node, slots);
+            slotTotal += slots;
+        }
+
+        // Start with an all-empty list of the final size.
+        final List<PeerStatus> destinations = new ArrayList<>(slotTotal);
+        while ( destinations.size() < slotTotal ) {
+            destinations.add(null);
+        }
+
+        // Place each node's entries at a stride equal to its slot count, probing forward
+        // (with wrap-around) to the next free position on collision.
+        for ( final Map.Entry<NodeInformation, Integer> allocation : slotsPerNode.entrySet() ) {
+            final NodeInformation node = allocation.getKey();
+            final int slots = allocation.getValue();
+
+            for (int i=0; i < slots; i++) {
+                int position = (slots * i) % destinations.size();
+                while ( destinations.get(position) != null ) {
+                    position = (position + 1) % destinations.size();
+                }
+
+                destinations.set(position, new PeerStatus(node.getHostname(), node.getSiteToSitePort(), node.isSiteToSiteSecure(), node.getTotalFlowFiles()));
+            }
+        }
+
+        final StringBuilder summary = new StringBuilder("New Weighted Distribution of Nodes:");
+        for ( final Map.Entry<NodeInformation, Integer> allocation : slotsPerNode.entrySet() ) {
+            final double percentage = allocation.getValue() * 100D / (double) destinations.size();
+            summary.append("\n").append(allocation.getKey()).append(" will receive ").append(percentage).append("% of FlowFiles");
+        }
+        logger.info(summary.toString());
+
+        return destinations;
+    }
+
+
+    /**
+     * Drains every pooled queue, tears down the connection states that have not been used
+     * within the last 10 seconds and puts the remaining ones back.
+     *
+     * NOTE(review): an expired state has shutdown(...) invoked here and again inside
+     * cleanup(...) - confirm that a second shutdown is harmless.
+     */
+    public void cleanupExpiredSockets() {
+        for ( final BlockingQueue<EndpointConnectionState> pooled : endpointConnectionMap.values() ) {
+            final List<EndpointConnectionState> survivors = new ArrayList<>();
+
+            for (EndpointConnectionState candidate = pooled.poll(); candidate != null; candidate = pooled.poll()) {
+                // If the socket has not been used in 10 seconds, shut it down.
+                final boolean stillFresh = candidate.getLastTimeUsed() >= System.currentTimeMillis() - 10000L;
+                if ( stillFresh ) {
+                    survivors.add(candidate);
+                    continue;
+                }
+
+                try {
+                    candidate.getSocketClientProtocol().shutdown(candidate.getPeer());
+                } catch (final Exception e) {
+                    logger.debug("Failed to shut down {} using {} due to {}",
+                        new Object[] {candidate.getSocketClientProtocol(), candidate.getPeer(), e} );
+                }
+
+                cleanup(candidate.getSocketClientProtocol(), candidate.getPeer());
+            }
+
+            pooled.addAll(survivors);
+        }
+    }
+
+    /**
+     * Shuts the pool down: forgets all penalizations, interrupts the sessions tracked in
+     * activeCommsChannels, tears down every pooled connection state and finally empties
+     * the pool map.
+     */
+    public void shutdown() {
+        peerTimeoutExpirations.clear();
+
+        for ( final CommunicationsSession active : activeCommsChannels ) {
+            active.interrupt();
+        }
+
+        for ( final BlockingQueue<EndpointConnectionState> pooled : endpointConnectionMap.values() ) {
+            for (EndpointConnectionState drained = pooled.poll(); drained != null; drained = pooled.poll()) {
+                cleanup(drained.getSocketClientProtocol(), drained.getPeer());
+            }
+        }
+
+        endpointConnectionMap.clear();
+    }
+
+    /**
+     * Refreshes the peer status cache from the remote instance unless the existing cache is
+     * younger than PEER_CACHE_MILLIS. Failures are logged and leave the existing cache in place.
+     *
+     * @param targetUri URI of the remote instance to query
+     * @param secure whether to query it over a secure connection
+     */
+    public void refreshPeers(final URI targetUri, final boolean secure) {
+        final PeerStatusCache existingCache = peerStatusCache;
+        if (existingCache != null && (existingCache.getTimestamp() + PEER_CACHE_MILLIS > System.currentTimeMillis())) {
+            return;
+        }
+
+        try {
+            final Set<PeerStatus> statuses = fetchRemotePeerStatuses(targetUri, secure);
+            peerStatusCache = new PeerStatusCache(statuses);
+            logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, statuses.size());
+        } catch (Exception e) {
+            // Pass the exception as text so that the placeholder is always filled in; the stack
+            // trace is deliberately emitted only when debug logging is enabled.
+            logger.warn("{} Unable to refresh Remote Group's peers due to {}", this, e.toString());
+            if (logger.isDebugEnabled()) {
+                logger.warn("", e);
+            }
+        }
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
new file mode 100644
index 0000000..48e9cc5
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client.socket;
+
+import java.io.IOException;
+
+import org.apache.nifi.remote.client.DataPacket;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+
+/**
+ * Socket-based {@link SiteToSiteClient}. Both operations are still placeholders:
+ * {@link #send(DataPacket)} does nothing and {@link #receive()} always returns null.
+ */
+public class SocketClient implements SiteToSiteClient {
+
+    /**
+     * Placeholder; currently ignores the packet.
+     *
+     * @param dataPacket the packet to send
+     */
+    @Override
+    public void send(final DataPacket dataPacket) throws IOException {
+        // TODO: not yet implemented
+    }
+
+    /**
+     * Placeholder; currently never produces a packet.
+     *
+     * @return always null
+     */
+    @Override
+    public DataPacket receive() throws IOException {
+        // TODO: not yet implemented
+        final DataPacket nothingReceived = null;
+        return nothingReceived;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/AdaptedNodeInformation.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/AdaptedNodeInformation.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/AdaptedNodeInformation.java
new file mode 100644
index 0000000..6ca5812
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/AdaptedNodeInformation.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.cluster;
+
+/**
+ * Plain mutable holder for the attributes of a node: hostname, site-to-site port,
+ * API port, whether site-to-site is secure, and the node's total flow file count.
+ */
+public class AdaptedNodeInformation {
+
+    private String hostname;
+    private Integer siteToSitePort;   // boxed: null until set
+    private int apiPort;
+    private boolean isSiteToSiteSecure;
+    private int totalFlowFiles;
+
+    /** @return the hostname, or null if none was set */
+    public String getHostname() {
+        return this.hostname;
+    }
+
+    /** @param hostname the hostname to store */
+    public void setHostname(final String hostname) {
+        this.hostname = hostname;
+    }
+
+    /** @return the site-to-site port, or null if none was set */
+    public Integer getSiteToSitePort() {
+        return this.siteToSitePort;
+    }
+
+    /** @param siteToSitePort the site-to-site port to store; may be null */
+    public void setSiteToSitePort(final Integer siteToSitePort) {
+        this.siteToSitePort = siteToSitePort;
+    }
+
+    /** @return the API port */
+    public int getApiPort() {
+        return this.apiPort;
+    }
+
+    /** @param apiPort the API port to store */
+    public void setApiPort(final int apiPort) {
+        this.apiPort = apiPort;
+    }
+
+    /** @return whether site-to-site is flagged as secure */
+    public boolean isSiteToSiteSecure() {
+        return this.isSiteToSiteSecure;
+    }
+
+    /** @param isSiteToSiteSecure the secure flag to store */
+    public void setSiteToSiteSecure(final boolean isSiteToSiteSecure) {
+        this.isSiteToSiteSecure = isSiteToSiteSecure;
+    }
+
+    /** @return the total flow file count */
+    public int getTotalFlowFiles() {
+        return this.totalFlowFiles;
+    }
+
+    /** @param totalFlowFiles the total flow file count to store */
+    public void setTotalFlowFiles(final int totalFlowFiles) {
+        this.totalFlowFiles = totalFlowFiles;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/ClusterNodeInformation.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/ClusterNodeInformation.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/ClusterNodeInformation.java
new file mode 100644
index 0000000..1bc83b9
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/ClusterNodeInformation.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.cluster;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collection;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+/**
+ * JAXB root element wrapping a collection of {@link NodeInformation}, with helpers to
+ * marshal an instance to a stream and unmarshal one from a stream.
+ */
+@XmlRootElement
+public class ClusterNodeInformation {
+
+    private Collection<NodeInformation> nodeInfo;
+
+    // Shared context, created once when the class is initialized.
+    private static final JAXBContext JAXB_CONTEXT = createContext();
+
+    /** Creates the JAXB context for this class, converting a JAXBException into a RuntimeException. */
+    private static JAXBContext createContext() {
+        try {
+            return JAXBContext.newInstance(ClusterNodeInformation.class);
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to create JAXBContext.", e);
+        }
+    }
+
+    /** Creates an instance without any node information. */
+    public ClusterNodeInformation() {
+        this.nodeInfo = null;
+    }
+
+    /** @param nodeInfo the node information to hold */
+    public void setNodeInformation(final Collection<NodeInformation> nodeInfo) {
+        this.nodeInfo = nodeInfo;
+    }
+
+    /** @return the node information previously set, or null if none was set */
+    @XmlJavaTypeAdapter(NodeInformationAdapter.class)
+    public Collection<NodeInformation> getNodeInformation() {
+        return this.nodeInfo;
+    }
+
+    /**
+     * Marshals this object to the given stream.
+     *
+     * @param os the stream to write to
+     * @throws JAXBException if marshalling fails
+     */
+    public void marshal(final OutputStream os) throws JAXBException {
+        JAXB_CONTEXT.createMarshaller().marshal(this, os);
+    }
+
+    /**
+     * Unmarshals an instance from the given stream.
+     *
+     * @param is the stream to read from
+     * @return the unmarshalled instance
+     * @throws JAXBException if unmarshalling fails
+     */
+    public static ClusterNodeInformation unmarshal(final InputStream is) throws JAXBException {
+        final Object unmarshalled = JAXB_CONTEXT.createUnmarshaller().unmarshal(is);
+        return (ClusterNodeInformation) unmarshalled;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java
new file mode 100644
index 0000000..e46ff5c
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.cluster;
+
+/**
+ * A source of {@link ClusterNodeInformation}.
+ */
+public interface NodeInformant {
+
+ /**
+ * Supplies node information as a {@link ClusterNodeInformation}.
+ * NOTE(review): whether the result may be <code>null</code> is not
+ * specified here - confirm against implementations.
+ *
+ * @return the node information
+ */
+ ClusterNodeInformation getNodeInformation();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java
new file mode 100644
index 0000000..2041268
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.cluster;
+
+/**
+ * Value holder describing a single node: its hostname, site-to-site port,
+ * API port, whether site-to-site is secure, and its total FlowFile count.
+ * All fields are final and assigned once in the constructor.
+ * <p>
+ * Equality and hashing use the hostname, site-to-site port, API port and the
+ * secure flag; <code>totalFlowFiles</code> is not taken into account.
+ * </p>
+ */
+public class NodeInformation {
+
+    private final String hostname;
+    private final Integer siteToSitePort;
+    private final int apiPort;
+    private final boolean isSiteToSiteSecure;
+    private final int totalFlowFiles;
+
+    /**
+     * @param hostname the node's hostname; not validated, may be
+     * <code>null</code>
+     * @param siteToSitePort the site-to-site port, or <code>null</code>
+     * @param apiPort the API port
+     * @param isSiteToSiteSecure whether site-to-site is secure
+     * @param totalFlowFiles the node's total FlowFile count
+     */
+    public NodeInformation(final String hostname, final Integer siteToSitePort, final int apiPort,
+            final boolean isSiteToSiteSecure, final int totalFlowFiles) {
+        this.hostname = hostname;
+        this.siteToSitePort = siteToSitePort;
+        this.apiPort = apiPort;
+        this.isSiteToSiteSecure = isSiteToSiteSecure;
+        this.totalFlowFiles = totalFlowFiles;
+    }
+
+    public String getHostname() {
+        return hostname;
+    }
+
+    public int getAPIPort() {
+        return apiPort;
+    }
+
+    public Integer getSiteToSitePort() {
+        return siteToSitePort;
+    }
+
+    public boolean isSiteToSiteSecure() {
+        return isSiteToSiteSecure;
+    }
+
+    public int getTotalFlowFiles() {
+        return totalFlowFiles;
+    }
+
+    /**
+     * Compares hostname, site-to-site port, API port and the secure flag.
+     * Null hostnames and null site-to-site ports are compared null-safely, so
+     * this never throws for an instance built with a null hostname.
+     *
+     * @param obj the object to compare with
+     *
+     * @return <code>true</code> if <code>obj</code> is a NodeInformation with
+     * the same values for those four properties
+     */
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        // instanceof is false for null, so no separate null check is needed.
+        if (!(obj instanceof NodeInformation)) {
+            return false;
+        }
+
+        final NodeInformation other = (NodeInformation) obj;
+        // Fully-qualified because this file has no import section.
+        return apiPort == other.apiPort
+                && isSiteToSiteSecure == other.isSiteToSiteSecure
+                && java.util.Objects.equals(hostname, other.hostname)
+                && java.util.Objects.equals(siteToSitePort, other.siteToSitePort);
+    }
+
+    /**
+     * Hash over the same four properties used by {@link #equals(Object)}.
+     * A null hostname contributes 0 instead of causing a
+     * NullPointerException; for non-null hostnames the value is the same as
+     * calling <code>hostname.hashCode()</code> directly.
+     */
+    @Override
+    public int hashCode() {
+        return 83832 + java.util.Objects.hashCode(hostname) + (siteToSitePort == null ? 8 : siteToSitePort.hashCode()) + apiPort + (isSiteToSiteSecure ? 3829 : 0);
+    }
+
+    @Override
+    public String toString() {
+        return "Node[" + hostname + ":" + apiPort + "]";
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java
new file mode 100644
index 0000000..440463c
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.cluster;
+
+import javax.xml.bind.annotation.adapters.XmlAdapter;
+
+import org.apache.nifi.remote.cluster.NodeInformation;
+
+/**
+ * {@link XmlAdapter} that converts between {@link NodeInformation} and
+ * {@link AdaptedNodeInformation}, copying the hostname, site-to-site port,
+ * API port, secure flag and total FlowFile count in each direction.
+ */
+public class NodeInformationAdapter extends XmlAdapter<AdaptedNodeInformation, NodeInformation> {
+
+    /**
+     * Builds a {@link NodeInformation} from the values read off the adapted
+     * object's getters.
+     */
+    @Override
+    public NodeInformation unmarshal(final AdaptedNodeInformation adapted) throws Exception {
+        return new NodeInformation(
+                adapted.getHostname(),
+                adapted.getSiteToSitePort(),
+                adapted.getApiPort(),
+                adapted.isSiteToSiteSecure(),
+                adapted.getTotalFlowFiles());
+    }
+
+    /**
+     * Creates a new {@link AdaptedNodeInformation} and fills it, setter by
+     * setter, from the given node.
+     */
+    @Override
+    public AdaptedNodeInformation marshal(final NodeInformation nodeInformation) throws Exception {
+        final AdaptedNodeInformation xmlForm = new AdaptedNodeInformation();
+
+        xmlForm.setHostname(nodeInformation.getHostname());
+        xmlForm.setSiteToSitePort(nodeInformation.getSiteToSitePort());
+        xmlForm.setApiPort(nodeInformation.getAPIPort());
+        xmlForm.setSiteToSiteSecure(nodeInformation.isSiteToSiteSecure());
+        xmlForm.setTotalFlowFiles(nodeInformation.getTotalFlowFiles());
+
+        return xmlForm;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
new file mode 100644
index 0000000..b4206b3
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.codec;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.remote.VersionedRemoteResource;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+
+/**
+ * <p>
+ * Provides a mechanism for encoding and decoding FlowFiles as streams so that
+ * they can be transferred remotely.
+ * </p>
+ */
+public interface FlowFileCodec extends VersionedRemoteResource {
+
+ /**
+ * Returns a List of all versions that this codec is able to support, in the
+ * order that they are preferred by the codec
+ *
+ * @return the supported versions, in the codec's order of preference
+ */
+ public List<Integer> getSupportedVersions();
+
+ /**
+ * Encodes a FlowFile and its content as a single stream of data and writes
+ * that stream to the output.
+ * NOTE(review): this comment previously described an optional checksum
+ * argument; the method signature has no such parameter.
+ *
+ * @param flowFile the FlowFile to encode
+ * @param session a session that can be used to transactionally create and
+ * transfer flow files
+ * @param outStream the stream to write the data to
+ *
+ * @return the updated FlowFile
+ *
+ * @throws IOException if an I/O failure occurs
+ * @throws TransmissionDisabledException presumably when transmission has
+ * been disabled - confirm against implementations
+ */
+ FlowFile encode(FlowFile flowFile, ProcessSession session, OutputStream outStream) throws IOException, TransmissionDisabledException;
+
+ /**
+ * Decodes the contents of the InputStream, interpreting the data to
+ * determine the next FlowFile's attributes and content, as well as their
+ * destinations.
+ * NOTE(review): this comment previously described an optional checksum
+ * argument; the method signature has no such parameter.
+ *
+ * @param stream an InputStream containing FlowFiles' contents, attributes,
+ * and destinations
+ * @param session the session made available to the codec while decoding
+ * (presumably used to create the returned FlowFile - confirm against
+ * implementations)
+ *
+ * @return the FlowFile that was created, or <code>null</code> if the stream
+ * was out of data
+ *
+ * @throws IOException if an I/O failure occurs
+ * @throws ProtocolException if the input is malformed
+ * @throws TransmissionDisabledException presumably when transmission has
+ * been disabled - confirm against implementations
+ */
+ FlowFile decode(InputStream stream, ProcessSession session) throws IOException, ProtocolException, TransmissionDisabledException;
+}