You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/07/15 20:14:44 UTC

[1/3] nifi git commit: NIFI-1992: - Updated site-to-site client and server to support clustered nifi instances NIFI-2274: - Ensuring we use the correct URI when updating a connection.

Repository: nifi
Updated Branches:
  refs/heads/master b67c9b6f7 -> c81dc1959


http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
index 0565f94..d16c626 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
@@ -16,11 +16,22 @@
  */
 package org.apache.nifi.web.api;
 
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.AccessDeniedException;
 import org.apache.nifi.authorization.AuthorizationRequest;
@@ -31,6 +42,9 @@ import org.apache.nifi.authorization.RequestAction;
 import org.apache.nifi.authorization.resource.ResourceFactory;
 import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.authorization.user.NiFiUserUtils;
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.remote.HttpRemoteSiteListener;
 import org.apache.nifi.remote.VersionNegotiator;
 import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator;
@@ -44,18 +58,11 @@ import org.apache.nifi.web.api.entity.PeersEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.util.ArrayList;
-
-import static org.apache.commons.lang3.StringUtils.isEmpty;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
 
 /**
  * RESTful endpoint for managing a SiteToSite connection.
@@ -70,7 +77,11 @@ public class SiteToSiteResource extends ApplicationResource {
     private static final Logger logger = LoggerFactory.getLogger(SiteToSiteResource.class);
 
     private NiFiServiceFacade serviceFacade;
+    private ClusterCoordinator clusterCoordinator;
     private Authorizer authorizer;
+    public static final String CHECK_SUM = "checksum";
+    public static final String RESPONSE_CODE = "responseCode";
+
     private final ResponseCreator responseCreator = new ResponseCreator();
     private final VersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(1);
     private final HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance();
@@ -147,6 +158,7 @@ public class SiteToSiteResource extends ApplicationResource {
         return clusterContext(noCache(Response.ok(entity))).build();
     }
 
+
     /**
      * Returns the available Peers and its status of this NiFi.
      *
@@ -185,52 +197,42 @@ public class SiteToSiteResource extends ApplicationResource {
             return responseCreator.badRequestResponse(e);
         }
 
-        ArrayList<PeerDTO> peers;
-
+        final List<PeerDTO> peers = new ArrayList<>();
         if (properties.isNode()) {
-            return responseCreator.nodeTypeErrorResponse(req.getPathInfo() + " is only accessible on NCM or Standalone NiFi instance.");
-        // TODO: NCM no longer exists.
-        /*
-        } else if (properties.isClusterManager()) {
-            ClusterNodeInformation clusterNodeInfo = clusterManager.getNodeInformation();
-            final Collection<NodeInformation> nodeInfos = clusterNodeInfo.getNodeInformation();
-            peers = new ArrayList<>(nodeInfos.size());
-            for (NodeInformation nodeInfo : nodeInfos) {
-                if (nodeInfo.getSiteToSiteHttpApiPort() == null) {
-                    continue;
-                }
-                PeerDTO peer = new PeerDTO();
-                peer.setHostname(nodeInfo.getSiteToSiteHostname());
-                peer.setPort(nodeInfo.getSiteToSiteHttpApiPort());
-                peer.setSecure(nodeInfo.isSiteToSiteSecure());
-                peer.setFlowFileCount(nodeInfo.getTotalFlowFiles());
+            final Set<NodeIdentifier> nodeIds = clusterCoordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED);
+
+            // TODO: Get total number of FlowFiles for each node
+            for (final NodeIdentifier nodeId : nodeIds) {
+                final PeerDTO peer = new PeerDTO();
+                final String siteToSiteAddress = nodeId.getSiteToSiteAddress();
+                peer.setHostname(siteToSiteAddress == null ? nodeId.getApiAddress() : siteToSiteAddress);
+                peer.setPort(nodeId.getSiteToSiteHttpApiPort() == null ? nodeId.getApiPort() : nodeId.getSiteToSiteHttpApiPort());
+                peer.setSecure(nodeId.isSiteToSiteSecure());
+                peer.setFlowFileCount(0);
                 peers.add(peer);
             }
-        */
         } else {
             // Standalone mode.
-            PeerDTO peer = new PeerDTO();
+            final PeerDTO peer = new PeerDTO();
             // req.getLocalName returns private IP address, that can't be accessed from client in some environments.
             // So, use the value defined in nifi.properties instead when it is defined.
-            String remoteInputHost = properties.getRemoteInputHost();
+            final String remoteInputHost = properties.getRemoteInputHost();
             peer.setHostname(isEmpty(remoteInputHost) ? req.getLocalName() : remoteInputHost);
             peer.setPort(properties.getRemoteInputHttpPort());
             peer.setSecure(properties.isSiteToSiteSecure());
             peer.setFlowFileCount(0);  // doesn't matter how many FlowFiles we have, because we're the only host.
 
-            peers = new ArrayList<>(1);
             peers.add(peer);
-
         }
 
-        PeersEntity entity = new PeersEntity();
+        final PeersEntity entity = new PeersEntity();
         entity.setPeers(peers);
 
         return clusterContext(noCache(setCommonHeaders(Response.ok(entity), transportProtocolVersion, transactionManager))).build();
     }
 
     // setters
-    public void setServiceFacade(NiFiServiceFacade serviceFacade) {
+    public void setServiceFacade(final NiFiServiceFacade serviceFacade) {
         this.serviceFacade = serviceFacade;
     }
 
@@ -238,4 +240,9 @@ public class SiteToSiteResource extends ApplicationResource {
         this.authorizer = authorizer;
     }
 
+    @Override
+    public void setClusterCoordinator(final ClusterCoordinator clusterCoordinator) {
+        super.setClusterCoordinator(clusterCoordinator);
+        this.clusterCoordinator = clusterCoordinator;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index 62eca51..4161657 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -678,39 +678,6 @@ public class ControllerFacade implements Authorizable {
     }
 
     /**
-     * Returns the socket port that the Cluster Manager is listening on for
-     * Site-to-Site communications
-     *
-     * @return the socket port that the Cluster Manager is listening on for
-     *         Site-to-Site communications
-     */
-    public Integer getClusterManagerRemoteSiteListeningPort() {
-        return flowController.getClusterManagerRemoteSiteListeningPort();
-    }
-
-    /**
-     * Returns the http(s) port that the Cluster Manager is listening on for
-     * Site-to-Site communications
-     *
-     * @return the socket port that the Cluster Manager is listening on for
-     *         Site-to-Site communications
-     */
-    public Integer getClusterManagerRemoteSiteListeningHttpPort() {
-        return flowController.getClusterManagerRemoteSiteListeningHttpPort();
-    }
-
-    /**
-     * Indicates whether or not Site-to-Site communications with the Cluster
-     * Manager are secure
-     *
-     * @return whether or not Site-to-Site communications with the Cluster
-     *         Manager are secure
-     */
-    public Boolean isClusterManagerRemoteSiteCommsSecure() {
-        return flowController.isClusterManagerRemoteSiteCommsSecure();
-    }
-
-    /**
      * Returns the socket port that the local instance is listening on for
      * Site-to-Site communications
      *

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-connection-configuration.js
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-connection-configuration.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-connection-configuration.js
index c1fc956..8522d5a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-connection-configuration.js
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-connection-configuration.js
@@ -1334,7 +1334,7 @@ nf.ConnectionConfiguration = (function () {
                     });
 
                     // store the connection details
-                    $('#connection-uri').val(connection.uri);
+                    $('#connection-uri').val(connectionEntry.uri);
 
                     // configure the button model
                     $('#connection-configuration').modal('setButtonModel', [{


[3/3] nifi git commit: NIFI-1992: - Updated site-to-site client and server to support clustered nifi instances NIFI-2274: - Ensuring we use the correct URI when updating a connection.

Posted by mc...@apache.org.
NIFI-1992:
- Updated site-to-site client and server to support clustered nifi instances
NIFI-2274:
- Ensuring we use the correct URI when updating a connection.

This closes #530


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c81dc195
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c81dc195
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c81dc195

Branch: refs/heads/master
Commit: c81dc1959aa48910f772b41e09b86efb15c3be49
Parents: b67c9b6
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Jun 14 14:47:24 2016 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Fri Jul 15 16:13:59 2016 -0400

----------------------------------------------------------------------
 .../java/org/apache/nifi/remote/PeerStatus.java |  11 +-
 .../apache/nifi/remote/client/PeerSelector.java |  12 +-
 .../nifi/remote/client/http/HttpClient.java     |  44 +--
 .../client/socket/EndpointConnectionPool.java   | 112 ++++---
 .../http/HttpServerCommunicationsSession.java   |   9 +-
 .../protocol/socket/SocketClientProtocol.java   |   7 +-
 .../remote/util/SiteToSiteRestApiClient.java    | 298 ++++++++++---------
 .../nifi/remote/client/TestPeerSelector.java    |  28 +-
 .../cluster/protocol/ConnectionResponse.java    |  27 +-
 .../jaxb/message/AdaptedConnectionResponse.java |  27 --
 .../jaxb/message/ConnectionResponseAdapter.java |   4 -
 .../message/NodeStatusChangeMessage.java        |   4 -
 .../message/ReconnectionRequestMessage.java     |  27 --
 .../jaxb/message/TestJaxbProtocolUtils.java     |   2 +-
 .../node/NodeClusterCoordinator.java            |   5 +-
 .../nifi/remote/protocol/ServerProtocol.java    |   6 +-
 .../ClusterCoordinatorNodeInformant.java        |  55 ++++
 .../apache/nifi/controller/FlowController.java  |  51 +---
 .../nifi/controller/StandardFlowService.java    |   6 +-
 .../nifi/remote/StandardRemoteProcessGroup.java |   4 +-
 .../nifi/spring/FlowControllerFactoryBean.java  |   5 +-
 .../src/main/resources/nifi-context.xml         |   1 +
 .../nifi/remote/HttpRemoteSiteListener.java     |  14 +-
 .../apache/nifi/remote/RemoteSiteListener.java  |   3 -
 .../nifi/remote/SocketRemoteSiteListener.java   |   6 +-
 .../http/HttpFlowFileServerProtocolImpl.java    | 223 --------------
 .../StandardHttpFlowFileServerProtocol.java     | 225 ++++++++++++++
 .../socket/ClusterManagerServerProtocol.java    | 209 -------------
 .../socket/SocketFlowFileServerProtocol.java    |  47 ++-
 ...g.apache.nifi.remote.protocol.ServerProtocol |   4 +-
 .../http/TestHttpFlowFileServerProtocol.java    |  68 ++---
 .../nifi/web/StandardNiFiServiceFacade.java     |  56 ++--
 .../nifi/web/api/DataTransferResource.java      |  38 ++-
 .../apache/nifi/web/api/SiteToSiteResource.java |  91 +++---
 .../nifi/web/controller/ControllerFacade.java   |  33 --
 .../js/nf/canvas/nf-connection-configuration.js |   2 +-
 36 files changed, 785 insertions(+), 979 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
index 6c8a4ec..3113076 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
@@ -20,10 +20,12 @@ public class PeerStatus {
 
     private final PeerDescription description;
     private final int numFlowFiles;
+    private final boolean queryForPeers;
 
-    public PeerStatus(final PeerDescription description, final int numFlowFiles) {
+    public PeerStatus(final PeerDescription description, final int numFlowFiles, final boolean queryForPeers) {
         this.description = description;
         this.numFlowFiles = numFlowFiles;
+        this.queryForPeers = queryForPeers;
     }
 
     public PeerDescription getPeerDescription() {
@@ -34,6 +36,13 @@ public class PeerStatus {
         return numFlowFiles;
     }
 
+    /**
+     * @return <code>true</code> if this node can be queried for its peers, <code>false</code> otherwise.
+     */
+    public boolean isQueryForPeers() {
+        return queryForPeers;
+    }
+
     @Override
     public String toString() {
         return "PeerStatus[hostname=" + description.getHostname() + ",port=" + description.getPort()

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
index b67e014..e452b0f 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
@@ -97,7 +97,7 @@ public class PeerSelector {
 
             for (final PeerStatus status : statuses) {
                 final PeerDescription description = status.getPeerDescription();
-                final String line = description.getHostname() + ":" + description.getPort() + ":" + description.isSecure() + "\n";
+                final String line = description.getHostname() + ":" + description.getPort() + ":" + description.isSecure() + ":" + status.isQueryForPeers() + "\n";
                 out.write(line.getBytes(StandardCharsets.UTF_8));
             }
 
@@ -120,7 +120,7 @@ public class PeerSelector {
             String line;
             while ((line = reader.readLine()) != null) {
                 final String[] splits = line.split(Pattern.quote(":"));
-                if (splits.length != 3) {
+                if (splits.length != 3 && splits.length != 4) {
                     continue;
                 }
 
@@ -128,7 +128,9 @@ public class PeerSelector {
                 final int port = Integer.parseInt(splits[1]);
                 final boolean secure = Boolean.parseBoolean(splits[2]);
 
-                statuses.add(new PeerStatus(new PeerDescription(hostname, port, secure), 1));
+                final boolean supportQueryForPeer = splits.length == 4 && Boolean.parseBoolean(splits[3]);
+
+                statuses.add(new PeerStatus(new PeerDescription(hostname, port, secure), 1, supportQueryForPeer));
             }
         }
 
@@ -172,7 +174,7 @@ public class PeerSelector {
                     final int index = n % destinations.size();
                     PeerStatus status = destinations.get(index);
                     if (status == null) {
-                        status = new PeerStatus(nodeInfo.getPeerDescription(), nodeInfo.getFlowFileCount());
+                        status = new PeerStatus(nodeInfo.getPeerDescription(), nodeInfo.getFlowFileCount(), nodeInfo.isQueryForPeers());
                         destinations.set(index, status);
                         break;
                     } else {
@@ -306,7 +308,7 @@ public class PeerSelector {
         if (cache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis()) {
             final Set<PeerStatus> equalizedSet = new HashSet<>(cache.getStatuses().size());
             for (final PeerStatus status : cache.getStatuses()) {
-                final PeerStatus equalizedStatus = new PeerStatus(status.getPeerDescription(), 1);
+                final PeerStatus equalizedStatus = new PeerStatus(status.getPeerDescription(), 1, status.isQueryForPeers());
                 equalizedSet.add(equalizedStatus);
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
index 3312e88..4cc794b 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
@@ -93,45 +93,44 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
         final URI clusterUrl;
         try {
             clusterUrl = new URI(config.getUrl());
-        } catch (URISyntaxException e) {
+        } catch (final URISyntaxException e) {
             throw new IllegalArgumentException("Specified clusterUrl was: " + config.getUrl(), e);
         }
 
-        try (
-            SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy())
-        ) {
-            String clusterApiUrl = apiClient.resolveBaseUrl(scheme, clusterUrl.getHost(), siteInfoProvider.getSiteToSiteHttpPort());
+        try (final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy())) {
+            final String clusterApiUrl = apiClient.resolveBaseUrl(scheme, clusterUrl.getHost(), siteInfoProvider.getSiteToSiteHttpPort());
 
-            int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS);
+            final int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS);
             apiClient.setConnectTimeoutMillis(timeoutMillis);
             apiClient.setReadTimeoutMillis(timeoutMillis);
-            Collection<PeerDTO> peers = apiClient.getPeers();
+
+            final Collection<PeerDTO> peers = apiClient.getPeers();
             if(peers == null || peers.size() == 0){
                 throw new IOException("Couldn't get any peer to communicate with. " + clusterApiUrl + " returned zero peers.");
             }
 
-            return peers.stream()
-                    .map(p -> new PeerStatus(new PeerDescription(p.getHostname(), p.getPort(), p.isSecure()), p.getFlowFileCount()))
+            // Convert the PeerDTOs to PeerStatus objects. Use 'true' for the query-peer-for-peers flag because Site-to-Site over HTTP
+            // was added in NiFi 1.0.0, which means that peer-to-peer queries are always allowed.
+            return peers.stream().map(p -> new PeerStatus(new PeerDescription(p.getHostname(), p.getPort(), p.isSecure()), p.getFlowFileCount(), true))
                     .collect(Collectors.toSet());
         }
     }
 
     @Override
-    public Transaction createTransaction(TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException {
-
-        int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS);
+    public Transaction createTransaction(final TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException {
+        final int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS);
 
         PeerStatus peerStatus;
         while ((peerStatus = peerSelector.getNextPeerStatus(direction)) != null) {
             logger.debug("peerStatus={}", peerStatus);
 
-            CommunicationsSession commSession = new HttpCommunicationsSession();
-            String nodeApiUrl = resolveNodeApiUrl(peerStatus.getPeerDescription());
+            final CommunicationsSession commSession = new HttpCommunicationsSession();
+            final String nodeApiUrl = resolveNodeApiUrl(peerStatus.getPeerDescription());
             commSession.setUri(nodeApiUrl);
-            String clusterUrl = config.getUrl();
-            Peer peer = new Peer(peerStatus.getPeerDescription(), commSession, nodeApiUrl, clusterUrl);
+            final String clusterUrl = config.getUrl();
+            final Peer peer = new Peer(peerStatus.getPeerDescription(), commSession, nodeApiUrl, clusterUrl);
 
-            int penaltyMillis = (int) config.getPenalizationPeriod(TimeUnit.MILLISECONDS);
+            final int penaltyMillis = (int) config.getPenalizationPeriod(TimeUnit.MILLISECONDS);
             String portId = config.getPortIdentifier();
             if (StringUtils.isEmpty(portId)) {
                 portId = siteInfoProvider.getPortIdentifier(config.getPortName(), direction);
@@ -141,7 +140,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
                 }
             }
 
-            SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy());
+            final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy());
 
             apiClient.setBaseUrl(peer.getUrl());
             apiClient.setConnectTimeoutMillis(timeoutMillis);
@@ -157,7 +156,8 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
             try {
                 transactionUrl = apiClient.initiateTransaction(direction, portId);
                 commSession.setUserDn(apiClient.getTrustedPeerDn());
-            } catch (Exception e) {
+            } catch (final Exception e) {
+                apiClient.close();
                 logger.debug("Penalizing a peer due to {}", e.getMessage());
                 peerSelector.penalize(peer, penaltyMillis);
 
@@ -170,8 +170,8 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
             }
 
             // We found a valid peer to communicate with.
-            Integer transactionProtocolVersion = apiClient.getTransactionProtocolVersion();
-            HttpClientTransaction transaction = new HttpClientTransaction(transactionProtocolVersion, peer, direction,
+            final Integer transactionProtocolVersion = apiClient.getTransactionProtocolVersion();
+            final HttpClientTransaction transaction = new HttpClientTransaction(transactionProtocolVersion, peer, direction,
                     config.isUseCompression(), portId, penaltyMillis, config.getEventReporter());
             transaction.initialize(apiClient, transactionUrl);
 
@@ -183,7 +183,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
 
     }
 
-    private String resolveNodeApiUrl(PeerDescription description) {
+    private String resolveNodeApiUrl(final PeerDescription description) {
         return (description.isSecure() ? "https" : "http") + "://" + description.getHostname() + ":" + description.getPort() + "/nifi-api";
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index 8a6a91f..6869e4b 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -16,31 +16,9 @@
  */
 package org.apache.nifi.remote.client.socket;
 
-import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.remote.Peer;
-import org.apache.nifi.remote.PeerDescription;
-import org.apache.nifi.remote.PeerStatus;
-import org.apache.nifi.remote.RemoteDestination;
-import org.apache.nifi.remote.RemoteResourceInitiator;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.client.PeerSelector;
-import org.apache.nifi.remote.client.PeerStatusProvider;
-import org.apache.nifi.remote.client.SiteInfoProvider;
-import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-import org.apache.nifi.remote.codec.FlowFileCodec;
-import org.apache.nifi.remote.exception.HandshakeException;
-import org.apache.nifi.remote.exception.PortNotRunningException;
-import org.apache.nifi.remote.exception.TransmissionDisabledException;
-import org.apache.nifi.remote.exception.UnknownPortException;
-import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
-import org.apache.nifi.remote.protocol.CommunicationsSession;
-import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.nifi.remote.util.EventReportUtil.error;
+import static org.apache.nifi.remote.util.EventReportUtil.warn;
 
-import javax.net.ssl.SSLContext;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
@@ -63,9 +41,33 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
-import static org.apache.nifi.remote.util.EventReportUtil.error;
-import static org.apache.nifi.remote.util.EventReportUtil.warn;
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerDescription;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.RemoteDestination;
+import org.apache.nifi.remote.RemoteResourceInitiator;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.PeerSelector;
+import org.apache.nifi.remote.client.PeerStatusProvider;
+import org.apache.nifi.remote.client.SiteInfoProvider;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.PortNotRunningException;
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+import org.apache.nifi.remote.exception.UnknownPortException;
+import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class EndpointConnectionPool implements PeerStatusProvider {
 
@@ -84,6 +86,7 @@ public class EndpointConnectionPool implements PeerStatusProvider {
 
     private volatile int commsTimeout;
     private volatile boolean shutdown = false;
+    private volatile Set<PeerStatus> lastFetchedQueryablePeers;
 
     private final SiteInfoProvider siteInfoProvider;
     private final PeerSelector peerSelector;
@@ -145,8 +148,7 @@ public class EndpointConnectionPool implements PeerStatusProvider {
         return getEndpointConnection(direction, null);
     }
 
-    public EndpointConnection getEndpointConnection(final TransferDirection direction, final SiteToSiteClientConfig config)
-            throws IOException {
+    public EndpointConnection getEndpointConnection(final TransferDirection direction, final SiteToSiteClientConfig config) throws IOException {
         //
         // Attempt to get a connection state that already exists for this URL.
         //
@@ -358,15 +360,13 @@ public class EndpointConnectionPool implements PeerStatusProvider {
         }
     }
 
-    public Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
-        final String hostname = clusterUrl.getHost();
-        final Integer port = siteInfoProvider.getSiteToSitePort();
-        if (port == null) {
-            throw new IOException("Remote instance of NiFi is not configured to allow RAW Socket site-to-site communications");
-        }
+    private Set<PeerStatus> fetchRemotePeerStatuses(final PeerDescription peerDescription) throws IOException {
+        final String hostname = peerDescription.getHostname();
+        final int port = peerDescription.getPort();
 
         final PeerDescription clusterPeerDescription = new PeerDescription(hostname, port, clusterUrl.toString().startsWith("https://"));
         final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port);
+
         final Peer peer = new Peer(clusterPeerDescription, commsSession, "nifi://" + hostname + ":" + port, clusterUrl.toString());
         final SocketClientProtocol clientProtocol = new SocketClientProtocol();
         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
@@ -414,6 +414,50 @@ public class EndpointConnectionPool implements PeerStatusProvider {
         return peerStatuses;
     }
 
+    /**
+     * Fetches the peer statuses of the remote cluster, trying each candidate node until one of them responds.
+     * Candidates are the queryable peers remembered from the previous successful fetch plus the configured node.
+     *
+     * @throws IOException if the remote instance reports no RAW site-to-site port or no candidate could be queried
+     */
+    @Override
+    public Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
+        final Set<PeerDescription> peersToRequestClusterInfoFrom = new HashSet<>();
+
+        // Read the volatile field once so the null check and the iteration see the same set.
+        final Set<PeerStatus> lastFetched = lastFetchedQueryablePeers;
+        if (lastFetched != null) {
+            lastFetched.stream().map(PeerStatus::getPeerDescription).forEach(peersToRequestClusterInfoFrom::add);
+        }
+
+        // Always add the configured node info to the list of peers to communicate with
+        final Integer port = siteInfoProvider.getSiteToSitePort();
+        if (port == null) {
+            throw new IOException("Remote instance of NiFi is not configured to allow RAW Socket site-to-site communications");
+        }
+        peersToRequestClusterInfoFrom.add(new PeerDescription(clusterUrl.getHost(), port, siteInfoProvider.isSecure()));
+
+        Exception lastFailure = null;
+        for (final PeerDescription peerDescription : peersToRequestClusterInfoFrom) {
+            try {
+                final Set<PeerStatus> statuses = fetchRemotePeerStatuses(peerDescription);
+                lastFetchedQueryablePeers = statuses.stream().filter(PeerStatus::isQueryForPeers).collect(Collectors.toSet());
+                return statuses;
+            } catch (final Exception e) {
+                // Log the reason here: only the last failure is attached to the exception thrown below, and none is if a later peer succeeds.
+                logger.warn("Could not communicate with {}:{} to determine which nodes exist in the remote NiFi cluster, due to {}", peerDescription.getHostname(), peerDescription.getPort(), e.toString());
+                lastFailure = e;
+            }
+        }
+
+        final IOException ioe = new IOException("Unable to communicate with remote NiFi cluster in order to determine which nodes exist in the remote cluster");
+        if (lastFailure != null) {
+            ioe.addSuppressed(lastFailure);
+        }
+
+        throw ioe;
+    }
+
     private CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException {
         final PeerDescription description = peerStatus.getPeerDescription();
         return establishSiteToSiteConnection(description.getHostname(), description.getPort());

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpServerCommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpServerCommunicationsSession.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpServerCommunicationsSession.java
index ae12c67..8148bf2 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpServerCommunicationsSession.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpServerCommunicationsSession.java
@@ -32,11 +32,12 @@ public class HttpServerCommunicationsSession extends HttpCommunicationsSession {
     private Transaction.TransactionState status = Transaction.TransactionState.TRANSACTION_STARTED;
     private ResponseCode responseCode;
 
-    public HttpServerCommunicationsSession(InputStream inputStream, OutputStream outputStream, String transactionId){
+    public HttpServerCommunicationsSession(final InputStream inputStream, final OutputStream outputStream, final String transactionId, final String userDn) {
         super();
         input.setInputStream(inputStream);
         output.setOutputStream(outputStream);
         this.transactionId = transactionId;
+        setUserDn(userDn);
     }
 
     // This status is only needed by HttpFlowFileServerProtocol, HttpClientTransaction has its own status.
@@ -46,7 +47,7 @@ public class HttpServerCommunicationsSession extends HttpCommunicationsSession {
         return status;
     }
 
-    public void setStatus(Transaction.TransactionState status) {
+    public void setStatus(final Transaction.TransactionState status) {
         this.status = status;
     }
 
@@ -58,11 +59,11 @@ public class HttpServerCommunicationsSession extends HttpCommunicationsSession {
         return responseCode;
     }
 
-    public void setResponseCode(ResponseCode responseCode) {
+    public void setResponseCode(final ResponseCode responseCode) {
         this.responseCode = responseCode;
     }
 
-    public void putHandshakeParam(HandshakeProperty key, String value) {
+    public void putHandshakeParam(final HandshakeProperty key, final String value) {
         handshakeParams.put(key.name(), value);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index 25e78cb..477c52d 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -51,7 +51,8 @@ import java.util.concurrent.TimeUnit;
 
 public class SocketClientProtocol implements ClientProtocol {
 
-    private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1);
+    // Version 6 added to support Zero-Master Clustering, which was introduced in NiFi 1.0.0
+    private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(6, 5, 4, 3, 2, 1);
 
     private RemoteDestination destination;
     private boolean useCompression = false;
@@ -217,6 +218,8 @@ public class SocketClientProtocol implements ClientProtocol {
         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
 
+        final boolean queryPeersForOtherPeers = getVersionNegotiator().getVersion() >= 6;
+
         RequestType.REQUEST_PEER_LIST.writeRequestType(dos);
         dos.flush();
         final int numPeers = dis.readInt();
@@ -226,7 +229,7 @@ public class SocketClientProtocol implements ClientProtocol {
             final int port = dis.readInt();
             final boolean secure = dis.readBoolean();
             final int flowFileCount = dis.readInt();
-            peers.add(new PeerStatus(new PeerDescription(hostname, port, secure), flowFileCount));
+            peers.add(new PeerStatus(new PeerDescription(hostname, port, secure), flowFileCount, queryPeersForOtherPeers));
         }
 
         logger.debug("{} Received {} Peer Statuses from {}", this, peers.size(), peer);

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
index 8910598..cb3a55a 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
@@ -125,11 +125,8 @@ public class SiteToSiteRestApiClient implements Closeable {
     private static final int RESPONSE_CODE_OK = 200;
     private static final int RESPONSE_CODE_CREATED = 201;
     private static final int RESPONSE_CODE_ACCEPTED = 202;
-    private static final int RESPONSE_CODE_SEE_OTHER = 303;
     private static final int RESPONSE_CODE_BAD_REQUEST = 400;
-    private static final int RESPONSE_CODE_UNAUTHORIZED = 401;
     private static final int RESPONSE_CODE_NOT_FOUND = 404;
-    private static final int RESPONSE_CODE_SERVICE_UNAVAILABLE = 503;
 
     private static final Logger logger = LoggerFactory.getLogger(SiteToSiteRestApiClient.class);
 
@@ -161,6 +158,7 @@ public class SiteToSiteRestApiClient implements Closeable {
     public SiteToSiteRestApiClient(final SSLContext sslContext, final HttpProxy proxy) {
         this.sslContext = sslContext;
         this.proxy = proxy;
+
         ttlExtendTaskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
             private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
 
@@ -168,6 +166,7 @@ public class SiteToSiteRestApiClient implements Closeable {
             public Thread newThread(final Runnable r) {
                 final Thread thread = defaultFactory.newThread(r);
                 thread.setName(Thread.currentThread().getName() + " TTLExtend");
+                thread.setDaemon(true);
                 return thread;
             }
         });
@@ -210,9 +209,9 @@ public class SiteToSiteRestApiClient implements Closeable {
 
     private void setupRequestConfig() {
         final RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
-                .setConnectionRequestTimeout(connectTimeoutMillis)
-                .setConnectTimeout(connectTimeoutMillis)
-                .setSocketTimeout(readTimeoutMillis);
+            .setConnectionRequestTimeout(connectTimeoutMillis)
+            .setConnectTimeout(connectTimeoutMillis)
+            .setSocketTimeout(readTimeoutMillis);
 
         if (proxy != null) {
             requestConfigBuilder.setProxy(proxy.getHttpHost());
@@ -226,8 +225,8 @@ public class SiteToSiteRestApiClient implements Closeable {
         if (proxy != null) {
             if (!isEmpty(proxy.getUsername()) && !isEmpty(proxy.getPassword())) {
                 credentialsProvider.setCredentials(
-                        new AuthScope(proxy.getHttpHost()),
-                        new UsernamePasswordCredentials(proxy.getUsername(), proxy.getPassword()));
+                    new AuthScope(proxy.getHttpHost()),
+                    new UsernamePasswordCredentials(proxy.getUsername(), proxy.getPassword()));
             }
 
         }
@@ -242,7 +241,7 @@ public class SiteToSiteRestApiClient implements Closeable {
         }
 
         httpClient = clientBuilder
-                .setDefaultCredentialsProvider(getCredentialsProvider()).build();
+            .setDefaultCredentialsProvider(getCredentialsProvider()).build();
     }
 
     private void setupAsyncClient() {
@@ -268,9 +267,9 @@ public class SiteToSiteRestApiClient implements Closeable {
 
             final SSLSession sslSession;
             if (conn instanceof ManagedHttpClientConnection) {
-                sslSession = ((ManagedHttpClientConnection)conn).getSSLSession();
+                sslSession = ((ManagedHttpClientConnection) conn).getSSLSession();
             } else if (conn instanceof ManagedNHttpClientConnection) {
-                sslSession = ((ManagedNHttpClientConnection)conn).getSSLSession();
+                sslSession = ((ManagedNHttpClientConnection) conn).getSSLSession();
             } else {
                 throw new RuntimeException("Unexpected connection type was used, " + conn);
             }
@@ -285,7 +284,7 @@ public class SiteToSiteRestApiClient implements Closeable {
                 try {
                     final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certChain[0]);
                     trustedPeerDn = cert.getSubjectDN().getName().trim();
-                } catch (CertificateException e) {
+                } catch (final CertificateException e) {
                     final String msg = "Could not extract subject DN from SSL session peer certificate";
                     logger.warn(msg);
                     throw new SSLPeerUnverifiedException(msg);
@@ -296,14 +295,14 @@ public class SiteToSiteRestApiClient implements Closeable {
 
     public ControllerDTO getController() throws IOException {
         try {
-            HttpGet get = createGet("/site-to-site");
+            final HttpGet get = createGet("/site-to-site");
             get.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion()));
             return execute(get, ControllerEntity.class).getController();
 
-        } catch (HttpGetFailedException e) {
+        } catch (final HttpGetFailedException e) {
             if (RESPONSE_CODE_NOT_FOUND == e.getResponseCode()) {
                 logger.debug("getController received NOT_FOUND, trying to access the old NiFi version resource url...");
-                HttpGet get = createGet("/controller");
+                final HttpGet get = createGet("/controller");
                 return execute(get, ControllerEntity.class).getController();
             }
             throw e;
@@ -311,12 +310,12 @@ public class SiteToSiteRestApiClient implements Closeable {
     }
 
     public Collection<PeerDTO> getPeers() throws IOException {
-        HttpGet get = createGet("/site-to-site/peers");
+        final HttpGet get = createGet("/site-to-site/peers");
         get.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion()));
         return execute(get, PeersEntity.class).getPeers();
     }
 
-    public String initiateTransaction(TransferDirection direction, String portId) throws IOException {
+    public String initiateTransaction(final TransferDirection direction, final String portId) throws IOException {
         if (TransferDirection.RECEIVE.equals(direction)) {
             return initiateTransaction("output-ports", portId);
         } else {
@@ -324,10 +323,9 @@ public class SiteToSiteRestApiClient implements Closeable {
         }
     }
 
-    private String initiateTransaction(String portType, String portId) throws IOException {
+    private String initiateTransaction(final String portType, final String portId) throws IOException {
         logger.debug("initiateTransaction handshaking portType={}, portId={}", portType, portId);
-        HttpPost post = createPost("/data-transfer/" + portType + "/" + portId + "/transactions");
-
+        final HttpPost post = createPost("/data-transfer/" + portType + "/" + portId + "/transactions");
 
         post.setHeader("Accept", "application/json");
         post.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion()));
@@ -335,27 +333,27 @@ public class SiteToSiteRestApiClient implements Closeable {
         setHandshakeProperties(post);
 
         try (CloseableHttpResponse response = getHttpClient().execute(post)) {
-            int responseCode = response.getStatusLine().getStatusCode();
+            final int responseCode = response.getStatusLine().getStatusCode();
             logger.debug("initiateTransaction responseCode={}", responseCode);
 
             String transactionUrl;
             switch (responseCode) {
-                case RESPONSE_CODE_CREATED :
+                case RESPONSE_CODE_CREATED:
                     EntityUtils.consume(response.getEntity());
 
                     transactionUrl = readTransactionUrl(response);
                     if (isEmpty(transactionUrl)) {
                         throw new ProtocolException("Server returned RESPONSE_CODE_CREATED without Location header");
                     }
-                    Header transportProtocolVersionHeader = response.getFirstHeader(HttpHeaders.PROTOCOL_VERSION);
+                    final Header transportProtocolVersionHeader = response.getFirstHeader(HttpHeaders.PROTOCOL_VERSION);
                     if (transportProtocolVersionHeader == null) {
                         throw new ProtocolException("Server didn't return confirmed protocol version");
                     }
-                    Integer protocolVersionConfirmedByServer = Integer.valueOf(transportProtocolVersionHeader.getValue());
+                    final Integer protocolVersionConfirmedByServer = Integer.valueOf(transportProtocolVersionHeader.getValue());
                     logger.debug("Finished version negotiation, protocolVersionConfirmedByServer={}", protocolVersionConfirmedByServer);
                     transportProtocolVersionNegotiator.setVersion(protocolVersionConfirmedByServer);
 
-                    Header serverTransactionTtlHeader = response.getFirstHeader(HttpHeaders.SERVER_SIDE_TRANSACTION_TTL);
+                    final Header serverTransactionTtlHeader = response.getFirstHeader(HttpHeaders.SERVER_SIDE_TRANSACTION_TTL);
                     if (serverTransactionTtlHeader == null) {
                         throw new ProtocolException("Server didn't return " + HttpHeaders.SERVER_SIDE_TRANSACTION_TTL);
                     }
@@ -373,33 +371,36 @@ public class SiteToSiteRestApiClient implements Closeable {
 
     }
 
-    public boolean openConnectionForReceive(String transactionUrl, CommunicationsSession commSession) throws IOException {
+    public boolean openConnectionForReceive(final String transactionUrl, final CommunicationsSession commSession) throws IOException {
 
-        HttpGet get = createGet(transactionUrl + "/flow-files");
+        final HttpGet get = createGet(transactionUrl + "/flow-files");
         get.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion()));
 
         setHandshakeProperties(get);
 
-        CloseableHttpResponse response = getHttpClient().execute(get);
-        int responseCode = response.getStatusLine().getStatusCode();
+        final CloseableHttpResponse response = getHttpClient().execute(get);
+        final int responseCode = response.getStatusLine().getStatusCode();
         logger.debug("responseCode={}", responseCode);
 
         boolean keepItOpen = false;
         try {
             switch (responseCode) {
-                case RESPONSE_CODE_OK :
+                case RESPONSE_CODE_OK:
                     logger.debug("Server returned RESPONSE_CODE_OK, indicating there was no data.");
                     EntityUtils.consume(response.getEntity());
                     return false;
 
-                case RESPONSE_CODE_ACCEPTED :
-                    InputStream httpIn = response.getEntity().getContent();
-                    InputStream streamCapture = new InputStream() {
+                case RESPONSE_CODE_ACCEPTED:
+                    final InputStream httpIn = response.getEntity().getContent();
+                    final InputStream streamCapture = new InputStream() {
                         boolean closed = false;
+
                         @Override
                         public int read() throws IOException {
-                            if(closed) return -1;
-                            int r = httpIn.read();
+                            if (closed) {
+                                return -1;
+                            }
+                            final int r = httpIn.read();
                             if (r < 0) {
                                 closed = true;
                                 logger.debug("Reached to end of input stream. Closing resources...");
@@ -410,7 +411,7 @@ public class SiteToSiteRestApiClient implements Closeable {
                             return r;
                         }
                     };
-                    ((HttpInput)commSession.getInput()).setInputStream(streamCapture);
+                    ((HttpInput) commSession.getInput()).setInputStream(streamCapture);
 
                     startExtendingTtl(transactionUrl, httpIn, response);
                     keepItOpen = true;
@@ -431,10 +432,11 @@ public class SiteToSiteRestApiClient implements Closeable {
     private final int DATA_PACKET_CHANNEL_READ_BUFFER_SIZE = 16384;
     private Future<HttpResponse> postResult;
     private CountDownLatch transferDataLatch = new CountDownLatch(1);
-    public void openConnectionForSend(String transactionUrl, CommunicationsSession commSession) throws IOException {
+
+    public void openConnectionForSend(final String transactionUrl, final CommunicationsSession commSession) throws IOException {
 
         final String flowFilesPath = transactionUrl + "/flow-files";
-        HttpPost post = createPost(flowFilesPath);
+        final HttpPost post = createPost(flowFilesPath);
 
         post.setHeader("Content-Type", "application/octet-stream");
         post.setHeader("Accept", "text/plain");
@@ -442,7 +444,7 @@ public class SiteToSiteRestApiClient implements Closeable {
 
         setHandshakeProperties(post);
 
-        CountDownLatch initConnectionLatch = new CountDownLatch(1);
+        final CountDownLatch initConnectionLatch = new CountDownLatch(1);
 
         final URI requestUri = post.getURI();
         final PipedOutputStream outputStream = new PipedOutputStream();
@@ -463,7 +465,7 @@ public class SiteToSiteRestApiClient implements Closeable {
                 // Pass the output stream so that Site-to-Site client thread can send
                 // data packet through this connection.
                 logger.debug("sending data to {} has started...", flowFilesPath);
-                ((HttpOutput)commSession.getOutput()).setOutputStream(outputStream);
+                ((HttpOutput) commSession.getOutput()).setOutputStream(outputStream);
                 initConnectionLatch.countDown();
 
                 final BasicHttpEntity entity = new BasicHttpEntity();
@@ -474,7 +476,7 @@ public class SiteToSiteRestApiClient implements Closeable {
             }
 
             @Override
-            public void produceContent(ContentEncoder encoder, IOControl ioControl) throws IOException {
+            public void produceContent(final ContentEncoder encoder, final IOControl ioControl) throws IOException {
 
                 int totalRead = 0;
                 int totalProduced = 0;
@@ -501,7 +503,7 @@ public class SiteToSiteRestApiClient implements Closeable {
 
                 final long totalWritten = commSession.getOutput().getBytesWritten();
                 logger.debug("sending data to {} has reached to its end. produced {} bytes by reading {} bytes from channel. {} bytes written in this transaction.",
-                        flowFilesPath, totalProduced, totalRead, totalWritten);
+                    flowFilesPath, totalProduced, totalRead, totalWritten);
                 if (totalRead != totalWritten || totalProduced != totalWritten) {
                     final String msg = "Sending data to %s has reached to its end, but produced : read : wrote byte sizes (%d : $d : %d) were not equal. Something went wrong.";
                     throw new RuntimeException(String.format(msg, flowFilesPath, totalProduced, totalRead, totalWritten));
@@ -513,12 +515,12 @@ public class SiteToSiteRestApiClient implements Closeable {
             }
 
             @Override
-            public void requestCompleted(HttpContext context) {
+            public void requestCompleted(final HttpContext context) {
                 logger.debug("Sending data to {} completed.", flowFilesPath);
             }
 
             @Override
-            public void failed(Exception ex) {
+            public void failed(final Exception ex) {
                 logger.error("Sending data to {} has failed", flowFilesPath, ex);
             }
 
@@ -554,13 +556,13 @@ public class SiteToSiteRestApiClient implements Closeable {
             transferDataLatch = new CountDownLatch(1);
             startExtendingTtl(transactionUrl, dataPacketChannel, null);
 
-        } catch (InterruptedException e) {
+        } catch (final InterruptedException e) {
             throw new IOException("Awaiting initConnectionLatch has been interrupted.", e);
         }
 
     }
 
-    public void finishTransferFlowFiles(CommunicationsSession commSession) throws IOException {
+    public void finishTransferFlowFiles(final CommunicationsSession commSession) throws IOException {
 
         if (postResult == null) {
             new IllegalStateException("Data transfer has not started yet.");
@@ -576,7 +578,7 @@ public class SiteToSiteRestApiClient implements Closeable {
             if (!transferDataLatch.await(requestExpirationMillis, TimeUnit.MILLISECONDS)) {
                 throw new IOException("Awaiting transferDataLatch has been timeout.");
             }
-        } catch (InterruptedException e) {
+        } catch (final InterruptedException e) {
             throw new IOException("Awaiting transferDataLatch has been interrupted.", e);
         }
 
@@ -585,24 +587,24 @@ public class SiteToSiteRestApiClient implements Closeable {
         final HttpResponse response;
         try {
             response = postResult.get(readTimeoutMillis, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
+        } catch (final ExecutionException e) {
             logger.debug("Something has happened at sending thread. {}", e.getMessage());
-            Throwable cause = e.getCause();
+            final Throwable cause = e.getCause();
             if (cause instanceof IOException) {
                 throw (IOException) cause;
             } else {
                 throw new IOException(cause);
             }
-        } catch (TimeoutException|InterruptedException e) {
+        } catch (TimeoutException | InterruptedException e) {
             throw new IOException(e);
         }
 
-        int responseCode = response.getStatusLine().getStatusCode();
+        final int responseCode = response.getStatusLine().getStatusCode();
         switch (responseCode) {
-            case RESPONSE_CODE_ACCEPTED :
-                String receivedChecksum = EntityUtils.toString(response.getEntity());
-                ((HttpInput)commSession.getInput()).setInputStream(new ByteArrayInputStream(receivedChecksum.getBytes()));
-                ((HttpCommunicationsSession)commSession).setChecksum(receivedChecksum);
+            case RESPONSE_CODE_ACCEPTED:
+                final String receivedChecksum = EntityUtils.toString(response.getEntity());
+                ((HttpInput) commSession.getInput()).setInputStream(new ByteArrayInputStream(receivedChecksum.getBytes()));
+                ((HttpCommunicationsSession) commSession).setChecksum(receivedChecksum);
                 logger.debug("receivedChecksum={}", receivedChecksum);
                 break;
 
@@ -623,17 +625,17 @@ public class SiteToSiteRestApiClient implements Closeable {
         extendingApiClient.transportProtocolVersionNegotiator = this.transportProtocolVersionNegotiator;
         extendingApiClient.connectTimeoutMillis = this.connectTimeoutMillis;
         extendingApiClient.readTimeoutMillis = this.readTimeoutMillis;
-        int extendFrequency = serverTransactionTtl / 2;
+        final int extendFrequency = serverTransactionTtl / 2;
         ttlExtendingThread = ttlExtendTaskExecutor.scheduleWithFixedDelay(() -> {
             try {
                 extendingApiClient.extendTransaction(transactionUrl);
-            } catch (Exception e) {
+            } catch (final Exception e) {
                 logger.warn("Failed to extend transaction ttl", e);
                 try {
                     // Without disconnecting, Site-to-Site client keep reading data packet,
                     // while server has already rollback.
                     this.close();
-                } catch (IOException ec) {
+                } catch (final IOException ec) {
                     logger.warn("Failed to close", e);
                 }
             }
@@ -645,7 +647,7 @@ public class SiteToSiteRestApiClient implements Closeable {
             if (closeable != null) {
                 closeable.close();
             }
-        } catch (IOException e) {
+        } catch (final IOException e) {
             logger.warn("Got an exception during closing {}: {}", closeable, e.getMessage());
             if (logger.isDebugEnabled()) {
                 logger.warn("", e);
@@ -653,7 +655,7 @@ public class SiteToSiteRestApiClient implements Closeable {
         }
     }
 
-    public TransactionResultEntity extendTransaction(String transactionUrl) throws IOException {
+    public TransactionResultEntity extendTransaction(final String transactionUrl) throws IOException {
         logger.debug("Sending extendTransaction request to transactionUrl: {}", transactionUrl);
 
         final HttpPut put = createPut(transactionUrl);
@@ -663,15 +665,14 @@ public class SiteToSiteRestApiClient implements Closeable {
 
         setHandshakeProperties(put);
 
-        try (CloseableHttpResponse response = getHttpClient().execute(put)) {
-            int responseCode = response.getStatusLine().getStatusCode();
+        try (final CloseableHttpResponse response = getHttpClient().execute(put)) {
+            final int responseCode = response.getStatusLine().getStatusCode();
             logger.debug("extendTransaction responseCode={}", responseCode);
 
-            try (InputStream content = response.getEntity().getContent()) {
+            try (final InputStream content = response.getEntity().getContent()) {
                 switch (responseCode) {
-                    case RESPONSE_CODE_OK :
+                    case RESPONSE_CODE_OK:
                         return readResponse(content);
-
                     default:
                         throw handleErrResponse(responseCode, content);
                 }
@@ -694,39 +695,41 @@ public class SiteToSiteRestApiClient implements Closeable {
     }
 
     private IOException handleErrResponse(final int responseCode, final InputStream in) throws IOException {
-        if(in == null) {
+        if (in == null) {
             return new IOException("Unexpected response code: " + responseCode);
         }
-        TransactionResultEntity errEntity = readResponse(in);
-        ResponseCode errCode = ResponseCode.fromCode(errEntity.getResponseCode());
+
+        final TransactionResultEntity errEntity = readResponse(in);
+        final ResponseCode errCode = ResponseCode.fromCode(errEntity.getResponseCode());
+
         switch (errCode) {
             case UNKNOWN_PORT:
                 return new UnknownPortException(errEntity.getMessage());
             case PORT_NOT_IN_VALID_STATE:
                 return new PortNotRunningException(errEntity.getMessage());
             default:
-                return new IOException("Unexpected response code: " + responseCode
-                        + " errCode:" + errCode + " errMessage:" + errEntity.getMessage());
+                return new IOException("Unexpected response code: " + responseCode + " errCode:" + errCode + " errMessage:" + errEntity.getMessage());
         }
     }
 
-    private TransactionResultEntity readResponse(InputStream inputStream) throws IOException {
+    private TransactionResultEntity readResponse(final InputStream inputStream) throws IOException {
+        final ByteArrayOutputStream bos = new ByteArrayOutputStream();
 
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
         StreamUtils.copy(inputStream, bos);
         String responseMessage = null;
+
         try {
             responseMessage = new String(bos.toByteArray(), "UTF-8");
             logger.debug("readResponse responseMessage={}", responseMessage);
 
             final ObjectMapper mapper = new ObjectMapper();
             return mapper.readValue(responseMessage, TransactionResultEntity.class);
-
         } catch (JsonParseException | JsonMappingException e) {
             if (logger.isDebugEnabled()) {
                 logger.debug("Failed to parse JSON.", e);
             }
-            TransactionResultEntity entity = new TransactionResultEntity();
+
+            final TransactionResultEntity entity = new TransactionResultEntity();
             entity.setResponseCode(ResponseCode.ABORT.getCode());
             entity.setMessage(responseMessage);
             return entity;
@@ -736,90 +739,109 @@ public class SiteToSiteRestApiClient implements Closeable {
     private String readTransactionUrl(final CloseableHttpResponse response) {
         final Header locationUriIntentHeader = response.getFirstHeader(LOCATION_URI_INTENT_NAME);
         logger.debug("locationUriIntentHeader={}", locationUriIntentHeader);
-        if (locationUriIntentHeader != null) {
-            if (LOCATION_URI_INTENT_VALUE.equals(locationUriIntentHeader.getValue())) {
-                Header transactionUrl = response.getFirstHeader(LOCATION_HEADER_NAME);
-                logger.debug("transactionUrl={}", transactionUrl);
-                if (transactionUrl != null) {
-                    return transactionUrl.getValue();
-                }
+
+        if (locationUriIntentHeader != null && LOCATION_URI_INTENT_VALUE.equals(locationUriIntentHeader.getValue())) {
+            final Header transactionUrl = response.getFirstHeader(LOCATION_HEADER_NAME);
+            logger.debug("transactionUrl={}", transactionUrl);
+
+            if (transactionUrl != null) {
+                return transactionUrl.getValue();
             }
         }
+
         return null;
     }
 
     private void setHandshakeProperties(final HttpRequestBase httpRequest) {
-        if(compress) httpRequest.setHeader(HANDSHAKE_PROPERTY_USE_COMPRESSION, "true");
-        if(requestExpirationMillis > 0) httpRequest.setHeader(HANDSHAKE_PROPERTY_REQUEST_EXPIRATION, String.valueOf(requestExpirationMillis));
-        if(batchCount > 0) httpRequest.setHeader(HANDSHAKE_PROPERTY_BATCH_COUNT, String.valueOf(batchCount));
-        if(batchSize > 0) httpRequest.setHeader(HANDSHAKE_PROPERTY_BATCH_SIZE, String.valueOf(batchSize));
-        if(batchDurationMillis > 0) httpRequest.setHeader(HANDSHAKE_PROPERTY_BATCH_DURATION, String.valueOf(batchDurationMillis));
-    }
+        if (compress) {
+            httpRequest.setHeader(HANDSHAKE_PROPERTY_USE_COMPRESSION, "true");
+        }
 
-    private HttpGet createGet(final String path) {
-        final URI url = getUri(path);
-        HttpGet get = new HttpGet(url);
-        get.setConfig(getRequestConfig());
-        return get;
+        if (requestExpirationMillis > 0) {
+            httpRequest.setHeader(HANDSHAKE_PROPERTY_REQUEST_EXPIRATION, String.valueOf(requestExpirationMillis));
+        }
+
+        if (batchCount > 0) {
+            httpRequest.setHeader(HANDSHAKE_PROPERTY_BATCH_COUNT, String.valueOf(batchCount));
+        }
+
+        if (batchSize > 0) {
+            httpRequest.setHeader(HANDSHAKE_PROPERTY_BATCH_SIZE, String.valueOf(batchSize));
+        }
+
+        if (batchDurationMillis > 0) {
+            httpRequest.setHeader(HANDSHAKE_PROPERTY_BATCH_DURATION, String.valueOf(batchDurationMillis));
+        }
     }
 
-    private URI getUri(String path) {
+    private URI getUri(final String path) {
         final URI url;
         try {
-            if(HTTP_ABS_URL.matcher(path).find()){
+            if (HTTP_ABS_URL.matcher(path).find()) {
                 url = new URI(path);
             } else {
-                if(StringUtils.isEmpty(getBaseUrl())){
+                if (StringUtils.isEmpty(getBaseUrl())) {
                     throw new IllegalStateException("API baseUrl is not resolved yet, call setBaseUrl or resolveBaseUrl before sending requests with relative path.");
                 }
                 url = new URI(baseUrl + path);
             }
-        } catch (URISyntaxException e) {
+        } catch (final URISyntaxException e) {
             throw new IllegalArgumentException(e.getMessage());
         }
         return url;
     }
 
+
+    private HttpGet createGet(final String path) {
+        final URI url = getUri(path);
+        final HttpGet get = new HttpGet(url);
+        get.setConfig(getRequestConfig());
+        return get;
+    }
+
     private HttpPost createPost(final String path) {
         final URI url = getUri(path);
-        HttpPost post = new HttpPost(url);
+        final HttpPost post = new HttpPost(url);
         post.setConfig(getRequestConfig());
         return post;
     }
 
     private HttpPut createPut(final String path) {
         final URI url = getUri(path);
-        HttpPut put = new HttpPut(url);
+        final HttpPut put = new HttpPut(url);
         put.setConfig(getRequestConfig());
         return put;
     }
 
     private HttpDelete createDelete(final String path) {
         final URI url = getUri(path);
-        HttpDelete delete = new HttpDelete(url);
+        final HttpDelete delete = new HttpDelete(url);
         delete.setConfig(getRequestConfig());
         return delete;
     }
 
     private String execute(final HttpGet get) throws IOException {
+        final CloseableHttpClient httpClient = getHttpClient();
 
-        CloseableHttpClient httpClient = getHttpClient();
-        try (CloseableHttpResponse response = httpClient.execute(get)) {
-            StatusLine statusLine = response.getStatusLine();
-            int statusCode = statusLine.getStatusCode();
+        try (final CloseableHttpResponse response = httpClient.execute(get)) {
+            final StatusLine statusLine = response.getStatusLine();
+            final int statusCode = statusLine.getStatusCode();
             if (RESPONSE_CODE_OK != statusCode) {
                 throw new HttpGetFailedException(statusCode, statusLine.getReasonPhrase(), null);
             }
-            HttpEntity entity = response.getEntity();
-            String responseMessage = EntityUtils.toString(entity);
+            final HttpEntity entity = response.getEntity();
+            final String responseMessage = EntityUtils.toString(entity);
             return responseMessage;
         }
     }
 
     public class HttpGetFailedException extends IOException {
+        private static final long serialVersionUID = 7920714957269466946L;
+
         private final int responseCode;
         private final String responseMessage;
         private final String explanation;
+
         public HttpGetFailedException(final int responseCode, final String responseMessage, final String explanation) {
             super("response code " + responseCode + ":" + responseMessage + " with explanation: " + explanation);
             this.responseCode = responseCode;
@@ -854,25 +876,25 @@ public class SiteToSiteRestApiClient implements Closeable {
         this.baseUrl = baseUrl;
     }
 
-    public void setConnectTimeoutMillis(int connectTimeoutMillis) {
+    public void setConnectTimeoutMillis(final int connectTimeoutMillis) {
         this.connectTimeoutMillis = connectTimeoutMillis;
     }
 
-    public void setReadTimeoutMillis(int readTimeoutMillis) {
+    public void setReadTimeoutMillis(final int readTimeoutMillis) {
         this.readTimeoutMillis = readTimeoutMillis;
     }
 
-    public String resolveBaseUrl(String clusterUrl) {
+    public String resolveBaseUrl(final String clusterUrl) {
         URI clusterUri;
         try {
             clusterUri = new URI(clusterUrl);
-        } catch (URISyntaxException e) {
+        } catch (final URISyntaxException e) {
             throw new IllegalArgumentException("Specified clusterUrl was: " + clusterUrl, e);
         }
         return this.resolveBaseUrl(clusterUri);
     }
 
-    public String resolveBaseUrl(URI clusterUrl) {
+    public String resolveBaseUrl(final URI clusterUrl) {
         String urlPath = clusterUrl.getPath();
         if (urlPath.endsWith("/")) {
             urlPath = urlPath.substring(0, urlPath.length() - 1);
@@ -884,33 +906,41 @@ public class SiteToSiteRestApiClient implements Closeable {
         return resolveBaseUrl(scheme, host, port, "/nifi-api");
     }
 
-    public String resolveBaseUrl(final String scheme, final String host, final int port, String path) {
-        String baseUri = scheme + "://" + host + ":" + port + path;
+    public String resolveBaseUrl(final String scheme, final String host, final int port, final String path) {
+        final String baseUri = scheme + "://" + host + ":" + port + path;
         this.setBaseUrl(baseUri);
         return baseUri;
     }
 
-    public void setCompress(boolean compress) {
+    public void setCompress(final boolean compress) {
         this.compress = compress;
     }
 
-    public void setRequestExpirationMillis(long requestExpirationMillis) {
-        if(requestExpirationMillis < 0) throw new IllegalArgumentException("requestExpirationMillis can't be a negative value.");
+    public void setRequestExpirationMillis(final long requestExpirationMillis) {
+        if (requestExpirationMillis < 0) {
+            throw new IllegalArgumentException("requestExpirationMillis can't be a negative value.");
+        }
         this.requestExpirationMillis = requestExpirationMillis;
     }
 
-    public void setBatchCount(int batchCount) {
-        if(batchCount < 0) throw new IllegalArgumentException("batchCount can't be a negative value.");
+    public void setBatchCount(final int batchCount) {
+        if (batchCount < 0) {
+            throw new IllegalArgumentException("batchCount can't be a negative value.");
+        }
         this.batchCount = batchCount;
     }
 
-    public void setBatchSize(long batchSize) {
-        if(batchSize < 0) throw new IllegalArgumentException("batchSize can't be a negative value.");
+    public void setBatchSize(final long batchSize) {
+        if (batchSize < 0) {
+            throw new IllegalArgumentException("batchSize can't be a negative value.");
+        }
         this.batchSize = batchSize;
     }
 
-    public void setBatchDurationMillis(long batchDurationMillis) {
-        if(batchDurationMillis < 0) throw new IllegalArgumentException("batchDurationMillis can't be a negative value.");
+    public void setBatchDurationMillis(final long batchDurationMillis) {
+        if (batchDurationMillis < 0) {
+            throw new IllegalArgumentException("batchDurationMillis can't be a negative value.");
+        }
         this.batchDurationMillis = batchDurationMillis;
     }
 
@@ -922,33 +952,33 @@ public class SiteToSiteRestApiClient implements Closeable {
         return this.trustedPeerDn;
     }
 
-    public TransactionResultEntity commitReceivingFlowFiles(String transactionUrl, ResponseCode clientResponse, String checksum) throws IOException {
+    public TransactionResultEntity commitReceivingFlowFiles(final String transactionUrl, final ResponseCode clientResponse, final String checksum) throws IOException {
         logger.debug("Sending commitReceivingFlowFiles request to transactionUrl: {}, clientResponse={}, checksum={}",
-                transactionUrl, clientResponse, checksum);
+            transactionUrl, clientResponse, checksum);
 
         stopExtendingTtl();
 
-        StringBuilder urlBuilder = new StringBuilder(transactionUrl).append("?responseCode=").append(clientResponse.getCode());
+        final StringBuilder urlBuilder = new StringBuilder(transactionUrl).append("?responseCode=").append(clientResponse.getCode());
         if (ResponseCode.CONFIRM_TRANSACTION.equals(clientResponse)) {
             urlBuilder.append("&checksum=").append(checksum);
         }
 
-        HttpDelete delete = createDelete(urlBuilder.toString());
+        final HttpDelete delete = createDelete(urlBuilder.toString());
         delete.setHeader("Accept", "application/json");
         delete.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion()));
 
         setHandshakeProperties(delete);
 
         try (CloseableHttpResponse response = getHttpClient().execute(delete)) {
-            int responseCode = response.getStatusLine().getStatusCode();
+            final int responseCode = response.getStatusLine().getStatusCode();
             logger.debug("commitReceivingFlowFiles responseCode={}", responseCode);
 
             try (InputStream content = response.getEntity().getContent()) {
                 switch (responseCode) {
-                    case RESPONSE_CODE_OK :
+                    case RESPONSE_CODE_OK:
                         return readResponse(content);
 
-                    case RESPONSE_CODE_BAD_REQUEST :
+                    case RESPONSE_CODE_BAD_REQUEST:
                         return readResponse(content);
 
                     default:
@@ -959,26 +989,26 @@ public class SiteToSiteRestApiClient implements Closeable {
 
     }
 
-    public TransactionResultEntity commitTransferFlowFiles(String transactionUrl, ResponseCode clientResponse) throws IOException {
-        String requestUrl = transactionUrl + "?responseCode=" + clientResponse.getCode();
+    public TransactionResultEntity commitTransferFlowFiles(final String transactionUrl, final ResponseCode clientResponse) throws IOException {
+        final String requestUrl = transactionUrl + "?responseCode=" + clientResponse.getCode();
         logger.debug("Sending commitTransferFlowFiles request to transactionUrl: {}", requestUrl);
 
-        HttpDelete delete = createDelete(requestUrl);
+        final HttpDelete delete = createDelete(requestUrl);
         delete.setHeader("Accept", "application/json");
         delete.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion()));
 
         setHandshakeProperties(delete);
 
         try (CloseableHttpResponse response = getHttpClient().execute(delete)) {
-            int responseCode = response.getStatusLine().getStatusCode();
+            final int responseCode = response.getStatusLine().getStatusCode();
             logger.debug("commitTransferFlowFiles responseCode={}", responseCode);
 
             try (InputStream content = response.getEntity().getContent()) {
                 switch (responseCode) {
-                    case RESPONSE_CODE_OK :
+                    case RESPONSE_CODE_OK:
                         return readResponse(content);
 
-                    case RESPONSE_CODE_BAD_REQUEST :
+                    case RESPONSE_CODE_BAD_REQUEST:
                         return readResponse(content);
 
                     default:

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
index ca820f8..4c0f0d6 100644
--- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
+++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
@@ -54,11 +54,11 @@ public class TestPeerSelector {
     @Test
     public void testFormulateDestinationListForOutput() throws IOException {
         final Set<PeerStatus> collection = new HashSet<>();
-        collection.add(new PeerStatus(new PeerDescription("HasMedium", 1111, true), 4096));
-        collection.add(new PeerStatus(new PeerDescription("HasLots", 2222, true), 10240));
-        collection.add(new PeerStatus(new PeerDescription("HasLittle", 3333, true), 1024));
-        collection.add(new PeerStatus(new PeerDescription("HasMedium", 4444, true), 4096));
-        collection.add(new PeerStatus(new PeerDescription("HasMedium", 5555, true), 4096));
+        collection.add(new PeerStatus(new PeerDescription("HasMedium", 1111, true), 4096, true));
+        collection.add(new PeerStatus(new PeerDescription("HasLots", 2222, true), 10240, true));
+        collection.add(new PeerStatus(new PeerDescription("HasLittle", 3333, true), 1024, true));
+        collection.add(new PeerStatus(new PeerDescription("HasMedium", 4444, true), 4096, true));
+        collection.add(new PeerStatus(new PeerDescription("HasMedium", 5555, true), 4096, true));
 
         PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
         PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null);
@@ -74,8 +74,8 @@ public class TestPeerSelector {
     @Test
     public void testFormulateDestinationListForOutputHugeDifference() throws IOException {
         final Set<PeerStatus> collection = new HashSet<>();
-        collection.add(new PeerStatus(new PeerDescription("HasLittle", 1111, true), 500));
-        collection.add(new PeerStatus(new PeerDescription("HasLots", 2222, true), 50000));
+        collection.add(new PeerStatus(new PeerDescription("HasLittle", 1111, true), 500, true));
+        collection.add(new PeerStatus(new PeerDescription("HasLots", 2222, true), 50000, true));
 
         PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
         PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null);
@@ -90,11 +90,11 @@ public class TestPeerSelector {
     @Test
     public void testFormulateDestinationListForInputPorts() throws IOException {
         final Set<PeerStatus> collection = new HashSet<>();
-        collection.add(new PeerStatus(new PeerDescription("HasMedium", 1111, true), 4096));
-        collection.add(new PeerStatus(new PeerDescription("HasLittle", 2222, true), 10240));
-        collection.add(new PeerStatus(new PeerDescription("HasLots", 3333, true), 1024));
-        collection.add(new PeerStatus(new PeerDescription("HasMedium", 4444, true), 4096));
-        collection.add(new PeerStatus(new PeerDescription("HasMedium", 5555, true), 4096));
+        collection.add(new PeerStatus(new PeerDescription("HasMedium", 1111, true), 4096, true));
+        collection.add(new PeerStatus(new PeerDescription("HasLittle", 2222, true), 10240, true));
+        collection.add(new PeerStatus(new PeerDescription("HasLots", 3333, true), 1024, true));
+        collection.add(new PeerStatus(new PeerDescription("HasMedium", 4444, true), 4096, true));
+        collection.add(new PeerStatus(new PeerDescription("HasMedium", 5555, true), 4096, true));
 
         PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
         PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null);
@@ -110,8 +110,8 @@ public class TestPeerSelector {
     @Test
     public void testFormulateDestinationListForInputPortsHugeDifference() throws IOException {
         final Set<PeerStatus> collection = new HashSet<>();
-        collection.add(new PeerStatus(new PeerDescription("HasLots", 1111, true), 500));
-        collection.add(new PeerStatus(new PeerDescription("HasLittle", 2222, true), 50000));
+        collection.add(new PeerStatus(new PeerDescription("HasLots", 1111, true), 500, true));
+        collection.add(new PeerStatus(new PeerDescription("HasLittle", 2222, true), 50000, true));
 
         PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
         PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null);

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
index e31832f..2a9b87c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
@@ -39,9 +39,6 @@ public class ConnectionResponse {
     private final int tryLaterSeconds;
     private final NodeIdentifier nodeIdentifier;
     private final DataFlow dataFlow;
-    private final Integer managerRemoteInputPort;
-    private final Integer managerRemoteInputHttpPort;
-    private final Boolean managerRemoteCommsSecure;
     private final String instanceId;
     private final List<NodeConnectionStatus> nodeStatuses;
     private final List<ComponentRevision> componentRevisions;
@@ -49,8 +46,7 @@ public class ConnectionResponse {
     private volatile String coordinatorDN;
 
     public ConnectionResponse(final NodeIdentifier nodeIdentifier, final DataFlow dataFlow,
-        final Integer managerRemoteInputPort, final Integer managerRemoteInputHttpPort, final Boolean managerRemoteCommsSecure, final String instanceId,
-        final List<NodeConnectionStatus> nodeStatuses, final List<ComponentRevision> componentRevisions) {
+        final String instanceId, final List<NodeConnectionStatus> nodeStatuses, final List<ComponentRevision> componentRevisions) {
 
         if (nodeIdentifier == null) {
             throw new IllegalArgumentException("Node identifier may not be empty or null.");
@@ -61,9 +57,6 @@ public class ConnectionResponse {
         this.dataFlow = dataFlow;
         this.tryLaterSeconds = 0;
         this.rejectionReason = null;
-        this.managerRemoteInputPort = managerRemoteInputPort;
-        this.managerRemoteInputHttpPort = managerRemoteInputHttpPort;
-        this.managerRemoteCommsSecure = managerRemoteCommsSecure;
         this.instanceId = instanceId;
         this.nodeStatuses = Collections.unmodifiableList(new ArrayList<>(nodeStatuses));
         this.componentRevisions = componentRevisions == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(componentRevisions));
@@ -77,9 +70,6 @@ public class ConnectionResponse {
         this.nodeIdentifier = null;
         this.tryLaterSeconds = tryLaterSeconds;
         this.rejectionReason = null;
-        this.managerRemoteInputPort = null;
-        this.managerRemoteInputHttpPort = null;
-        this.managerRemoteCommsSecure = null;
         this.instanceId = null;
         this.nodeStatuses = null;
         this.componentRevisions = null;
@@ -90,9 +80,6 @@ public class ConnectionResponse {
         this.nodeIdentifier = null;
         this.tryLaterSeconds = 0;
         this.rejectionReason = rejectionReason;
-        this.managerRemoteInputPort = null;
-        this.managerRemoteInputHttpPort = null;
-        this.managerRemoteCommsSecure = null;
         this.instanceId = null;
         this.nodeStatuses = null;
         this.componentRevisions = null;
@@ -130,18 +117,6 @@ public class ConnectionResponse {
         return nodeIdentifier;
     }
 
-    public Integer getManagerRemoteInputPort() {
-        return managerRemoteInputPort;
-    }
-
-    public Integer getManagerRemoteInputHttpPort() {
-        return managerRemoteInputHttpPort;
-    }
-
-    public Boolean isManagerRemoteCommsSecure() {
-        return managerRemoteCommsSecure;
-    }
-
     public String getInstanceId() {
         return instanceId;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
index 9a53a72..513818b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
@@ -33,9 +33,6 @@ public class AdaptedConnectionResponse {
     private NodeIdentifier nodeIdentifier;
     private String rejectionReason;
     private int tryLaterSeconds;
-    private Integer managerRemoteInputPort;
-    private Integer managerRemoteInputHttpPort;
-    private Boolean managerRemoteCommsSecure;
     private String instanceId;
     private List<NodeConnectionStatus> nodeStatuses;
     private List<ComponentRevision> componentRevisions;
@@ -81,30 +78,6 @@ public class AdaptedConnectionResponse {
         return tryLaterSeconds > 0;
     }
 
-    public void setManagerRemoteInputPort(Integer managerRemoteInputPort) {
-        this.managerRemoteInputPort = managerRemoteInputPort;
-    }
-
-    public Integer getManagerRemoteInputPort() {
-        return managerRemoteInputPort;
-    }
-
-    public void setManagerRemoteInputHttpPort(Integer managerRemoteInputHttpPort) {
-        this.managerRemoteInputHttpPort = managerRemoteInputHttpPort;
-    }
-
-    public Integer getManagerRemoteInputHttpPort() {
-        return managerRemoteInputHttpPort;
-    }
-
-    public void setManagerRemoteCommsSecure(Boolean secure) {
-        this.managerRemoteCommsSecure = secure;
-    }
-
-    public Boolean isManagerRemoteCommsSecure() {
-        return managerRemoteCommsSecure;
-    }
-
     public void setInstanceId(String instanceId) {
         this.instanceId = instanceId;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
index cf64e71..470843e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
@@ -31,9 +31,6 @@ public class ConnectionResponseAdapter extends XmlAdapter<AdaptedConnectionRespo
             aCr.setNodeIdentifier(cr.getNodeIdentifier());
             aCr.setTryLaterSeconds(cr.getTryLaterSeconds());
             aCr.setRejectionReason(cr.getRejectionReason());
-            aCr.setManagerRemoteInputPort(cr.getManagerRemoteInputPort());
-            aCr.setManagerRemoteInputHttpPort(cr.getManagerRemoteInputHttpPort());
-            aCr.setManagerRemoteCommsSecure(cr.isManagerRemoteCommsSecure());
             aCr.setInstanceId(cr.getInstanceId());
             aCr.setNodeConnectionStatuses(cr.getNodeConnectionStatuses());
             aCr.setComponentRevisions(cr.getComponentRevisions());
@@ -49,7 +46,6 @@ public class ConnectionResponseAdapter extends XmlAdapter<AdaptedConnectionRespo
             return ConnectionResponse.createRejectionResponse(aCr.getRejectionReason());
         } else {
             return new ConnectionResponse(aCr.getNodeIdentifier(), aCr.getDataFlow(),
-                aCr.getManagerRemoteInputPort(), aCr.getManagerRemoteInputHttpPort(), aCr.isManagerRemoteCommsSecure(),
                 aCr.getInstanceId(), aCr.getNodeConnectionStatuses(), aCr.getComponentRevisions());
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusChangeMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusChangeMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusChangeMessage.java
index 9e8fae0..1805924 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusChangeMessage.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeStatusChangeMessage.java
@@ -23,16 +23,12 @@ import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.jaxb.message.NodeConnectionStatusAdapter;
 import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Message to indicate that the status of a node in the cluster has changed
  */
 @XmlRootElement(name = "nodeStatusChange")
 public class NodeStatusChangeMessage extends ProtocolMessage {
-    private static final Logger logger = LoggerFactory.getLogger(NodeStatusChangeMessage.class);
-
     private NodeConnectionStatus connectionStatus;
     private NodeIdentifier nodeId;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java
index 008b586..e443552 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java
@@ -35,9 +35,6 @@ public class ReconnectionRequestMessage extends ProtocolMessage {
     private NodeIdentifier nodeId;
     private StandardDataFlow dataFlow;
     private boolean primary;
-    private Integer managerRemoteSiteListeningPort;
-    private Integer managerRemoteSiteListeningHttpPort;
-    private Boolean managerRemoteSiteCommsSecure;
     private String instanceId;
     private List<NodeConnectionStatus> nodeStatuses;
     private List<ComponentRevision> componentRevisions;
@@ -75,30 +72,6 @@ public class ReconnectionRequestMessage extends ProtocolMessage {
         return MessageType.RECONNECTION_REQUEST;
     }
 
-    public void setManagerRemoteSiteListeningPort(final Integer listeningPort) {
-        this.managerRemoteSiteListeningPort = listeningPort;
-    }
-
-    public Integer getManagerRemoteSiteListeningPort() {
-        return managerRemoteSiteListeningPort;
-    }
-
-    public void setManagerRemoteSiteListeningHttpPort(Integer managerRemoteSiteListeningHttpPort) {
-        this.managerRemoteSiteListeningHttpPort = managerRemoteSiteListeningHttpPort;
-    }
-
-    public Integer getManagerRemoteSiteListeningHttpPort() {
-        return managerRemoteSiteListeningHttpPort;
-    }
-
-    public void setManagerRemoteSiteCommsSecure(final Boolean remoteSiteCommsSecure) {
-        this.managerRemoteSiteCommsSecure = remoteSiteCommsSecure;
-    }
-
-    public Boolean isManagerRemoteSiteCommsSecure() {
-        return managerRemoteSiteCommsSecure;
-    }
-
     public void setInstanceId(final String instanceId) {
         this.instanceId = instanceId;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
index cb51eda..955df17 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
@@ -49,7 +49,7 @@ public class TestJaxbProtocolUtils {
         final DataFlow dataFlow = new StandardDataFlow(new byte[0], new byte[0], new byte[0]);
         final List<NodeConnectionStatus> nodeStatuses = Collections.singletonList(new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED));
         final List<ComponentRevision> componentRevisions = Collections.singletonList(ComponentRevision.fromRevision(new Revision(8L, "client-1", "component-1")));
-        msg.setConnectionResponse(new ConnectionResponse(nodeId, dataFlow, 9990, 8080, false, "instance-1", nodeStatuses, componentRevisions));
+        msg.setConnectionResponse(new ConnectionResponse(nodeId, dataFlow, "instance-1", nodeStatuses, componentRevisions));
 
         JaxbProtocolUtils.JAXB_CONTEXT.createMarshaller().marshal(msg, baos);
         final Object unmarshalled = JaxbProtocolUtils.JAXB_CONTEXT.createUnmarshaller().unmarshal(new ByteArrayInputStream(baos.toByteArray()));


[2/3] nifi git commit: NIFI-1992: - Updated site-to-site client and server to support clustered nifi instances NIFI-2274: - Ensuring we use the correct URI when updating a connection.

Posted by mc...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 1b410d6..d336558 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -766,10 +766,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
             return new ConnectionResponse(tryAgainSeconds);
         }
 
-        // TODO: Remove the 'null' values here from the ConnectionResponse all together. These
-        // will no longer be needed for site-to-site once the NCM is gone.
-        return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, null, null, null, instanceId,
-            new ArrayList<>(nodeStatuses.values()),
+        return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, instanceId, new ArrayList<>(nodeStatuses.values()),
             revisionManager.getAllRevisions().stream().map(rev -> ComponentRevision.fromRevision(rev)).collect(Collectors.toList()));
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
index 6fbb88c..4f86001 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.remote.protocol;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.ProcessContext;
@@ -24,6 +25,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.RootGroupPort;
 import org.apache.nifi.remote.VersionedRemoteResource;
+import org.apache.nifi.remote.cluster.ClusterNodeInformation;
 import org.apache.nifi.remote.cluster.NodeInformant;
 import org.apache.nifi.remote.codec.FlowFileCodec;
 import org.apache.nifi.remote.exception.HandshakeException;
@@ -131,9 +133,11 @@ public interface ServerProtocol extends VersionedRemoteResource {
      * a cluster, sends info about itself
      *
      * @param peer peer
+     * @param clusterNodeInfo the cluster information
+     *
      * @throws java.io.IOException ioe
      */
-    void sendPeerList(Peer peer) throws IOException;
+    void sendPeerList(Peer peer, Optional<ClusterNodeInformation> clusterNodeInfo) throws IOException;
 
     void shutdown(Peer peer);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ClusterCoordinatorNodeInformant.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ClusterCoordinatorNodeInformant.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ClusterCoordinatorNodeInformant.java
new file mode 100644
index 0000000..9f8439c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ClusterCoordinatorNodeInformant.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.remote.cluster.ClusterNodeInformation;
+import org.apache.nifi.remote.cluster.NodeInformant;
+import org.apache.nifi.remote.cluster.NodeInformation;
+
+public class ClusterCoordinatorNodeInformant implements NodeInformant {
+    private final ClusterCoordinator clusterCoordinator;
+
+    public ClusterCoordinatorNodeInformant(final ClusterCoordinator coordinator) {
+        this.clusterCoordinator = coordinator;
+    }
+
+    @Override
+    public ClusterNodeInformation getNodeInformation() {
+        final List<NodeInformation> nodeInfoCollection = new ArrayList<>();
+        final Set<NodeIdentifier> nodeIds = clusterCoordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED);
+
+        // TODO: Get total number of FlowFiles for each node
+        for (final NodeIdentifier nodeId : nodeIds) {
+            final NodeInformation nodeInfo = new NodeInformation(nodeId.getSiteToSiteAddress(), nodeId.getSiteToSitePort(),
+                nodeId.getSiteToSiteHttpApiPort(), nodeId.getApiPort(), nodeId.isSiteToSiteSecure(), 0);
+            nodeInfoCollection.add(nodeInfo);
+        }
+
+        final ClusterNodeInformation nodeInfo = new ClusterNodeInformation();
+        nodeInfo.setNodeInformation(nodeInfoCollection);
+        return nodeInfo;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index fd2f588..ac004e1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -198,6 +198,7 @@ import org.apache.nifi.remote.StandardRemoteProcessGroup;
 import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
 import org.apache.nifi.remote.StandardRootGroupPort;
 import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.cluster.NodeInformant;
 import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol;
 import org.apache.nifi.reporting.Bulletin;
@@ -298,9 +299,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     private final Integer remoteInputSocketPort;
     private final Integer remoteInputHttpPort;
     private final Boolean isSiteToSiteSecure;
-    private Integer clusterManagerRemoteSitePort = null;
-    private Integer clusterManagerRemoteSiteHttpPort = null;
-    private Boolean clusterManagerRemoteSiteCommsSecure = null;
 
     private ProcessGroup rootGroup;
     private final List<Connectable> startConnectablesAfterInitialization;
@@ -411,8 +409,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             clusterCoordinator,
             heartbeatMonitor);
 
-        flowController.setClusterManagerRemoteSiteInfo(properties.getRemoteInputPort(), properties.getRemoteInputHttpPort(), properties.isSiteToSiteSecure());
-
         return flowController;
     }
 
@@ -525,11 +521,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         } else {
             // Register the SocketFlowFileServerProtocol as the appropriate resource for site-to-site Server Protocol
             RemoteResourceManager.setServerProtocolImplementation(SocketFlowFileServerProtocol.RESOURCE_NAME, SocketFlowFileServerProtocol.class);
-            externalSiteListeners.add(new SocketRemoteSiteListener(remoteInputSocketPort, isSiteToSiteSecure ? sslContext : null));
+
+            final NodeInformant nodeInformant = configuredForClustering ? new ClusterCoordinatorNodeInformant(clusterCoordinator) : null;
+            externalSiteListeners.add(new SocketRemoteSiteListener(remoteInputSocketPort, isSiteToSiteSecure ? sslContext : null, nodeInformant));
         }
 
         if (remoteInputHttpPort == null) {
-            LOG.info("Not enabling HTTP(S) Site-to-Site functionality because nifi.remote.input.html.enabled is not true");
+            LOG.info("Not enabling HTTP(S) Site-to-Site functionality because the '" + NiFiProperties.SITE_TO_SITE_HTTP_ENABLED + "' property is not true");
         } else {
             externalSiteListeners.add(HttpRemoteSiteListener.getInstance());
         }
@@ -3895,45 +3893,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         return new ArrayList<>(history.getActions());
     }
 
-    public void setClusterManagerRemoteSiteInfo(final Integer managerListeningPort, final Integer managerListeningHttpPort, final Boolean commsSecure) {
-        writeLock.lock();
-        try {
-            clusterManagerRemoteSitePort = managerListeningPort;
-            clusterManagerRemoteSiteHttpPort = managerListeningHttpPort;
-            clusterManagerRemoteSiteCommsSecure = commsSecure;
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    public Integer getClusterManagerRemoteSiteListeningPort() {
-        readLock.lock();
-        try {
-            return clusterManagerRemoteSitePort;
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-
-    public Integer getClusterManagerRemoteSiteListeningHttpPort() {
-        readLock.lock();
-        try {
-            return clusterManagerRemoteSiteHttpPort;
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    public Boolean isClusterManagerRemoteSiteCommsSecure() {
-        readLock.lock();
-        try {
-            return clusterManagerRemoteSiteCommsSecure;
-        } finally {
-            readLock.unlock();
-        }
-    }
-
     public Integer getRemoteSiteListeningPort() {
         return remoteInputSocketPort;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 801a4e2..49f32c7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -467,7 +467,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
                     controller.setClustered(true, null);
                     clusterCoordinator.setConnected(false);
 
-                    controller.setClusterManagerRemoteSiteInfo(null, null, null);
                     controller.setConnectionStatus(new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED));
 
                     /*
@@ -586,9 +585,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
 
             // reconnect
             final ConnectionResponse connectionResponse = new ConnectionResponse(getNodeId(), request.getDataFlow(),
-                    request.getManagerRemoteSiteListeningPort(), request.getManagerRemoteSiteListeningHttpPort(),
-                    request.isManagerRemoteSiteCommsSecure(), request.getInstanceId(),
-                    request.getNodeConnectionStatuses(), request.getComponentRevisions());
+                request.getInstanceId(), request.getNodeConnectionStatuses(), request.getComponentRevisions());
 
             connectionResponse.setCoordinatorDN(request.getRequestorDN());
             loadFromConnectionResponse(connectionResponse);
@@ -853,7 +850,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
 
             // mark the node as clustered
             controller.setClustered(true, response.getInstanceId(), response.getCoordinatorDN());
-            controller.setClusterManagerRemoteSiteInfo(response.getManagerRemoteInputPort(), response.getManagerRemoteInputHttpPort(), response.isManagerRemoteCommsSecure());
 
             final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId);
             final Set<String> roles = status == null ? Collections.emptySet() : status.getRoles();

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index fb9da32..a5f66ce 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -1138,9 +1138,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
 
         @Override
         public void run() {
-            try (
-                final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient();
-            ){
+            try (final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient()){
                 try {
                     final ControllerDTO dto = apiClient.getController();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
index 7de36c8..fb0ce7c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
@@ -44,6 +44,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
     private AuditService auditService;
     private StringEncryptor encryptor;
     private BulletinRepository bulletinRepository;
+    private ClusterCoordinator clusterCoordinator;
 
     @Override
     public Object getObject() throws Exception {
@@ -53,7 +54,6 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
             if (properties.isNode()) {
                 final NodeProtocolSender nodeProtocolSender = applicationContext.getBean("nodeProtocolSender", NodeProtocolSender.class);
                 final HeartbeatMonitor heartbeatMonitor = applicationContext.getBean("heartbeatMonitor", HeartbeatMonitor.class);
-                final ClusterCoordinator clusterCoordinator = applicationContext.getBean("clusterCoordinator", ClusterCoordinator.class);
                 flowController = FlowController.createClusteredInstance(
                     flowFileEventRepository,
                     properties,
@@ -114,4 +114,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
         this.bulletinRepository = bulletinRepository;
     }
 
+    public void setClusterCoordinator(final ClusterCoordinator clusterCoordinator) {
+        this.clusterCoordinator = clusterCoordinator;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
index 1004baf..3cd5159 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
@@ -40,6 +40,7 @@
         <property name="auditService" ref="auditService" />
         <property name="encryptor" ref="stringEncryptor" />
         <property name="bulletinRepository" ref="bulletinRepository" />
+        <property name="clusterCoordinator" ref="clusterCoordinator" />
     </bean>
 
     <!-- flow service -->

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java
index acf7fc5..08fb188 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java
@@ -47,7 +47,6 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
 
     private final Map<String, TransactionWrapper> transactions = new ConcurrentHashMap<>();
     private final ScheduledExecutorService taskExecutor;
-    private final int httpListenPort;
     private ProcessGroup rootGroup;
     private ScheduledFuture<?> transactionMaintenanceTask;
 
@@ -76,9 +75,6 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
                     SITE_TO_SITE_HTTP_TRANSACTION_TTL, e.getMessage(), txTtlSec);
         }
         transactionTtlSec = txTtlSec;
-
-        httpListenPort = properties.getRemoteInputHttpPort() != null ? properties.getRemoteInputHttpPort() : 0;
-
     }
 
     public static HttpRemoteSiteListener getInstance() {
@@ -130,9 +126,7 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
             try {
                 Set<String> transactionIds = transactions.keySet().stream().collect(Collectors.toSet());
                 transactionIds.stream().filter(tid -> !isTransactionActive(tid))
-                    .forEach(tid -> {
-                        cancelTransaction(tid);
-                    });
+                    .forEach(tid -> cancelTransaction(tid));
             } catch (Exception e) {
                 // Swallow exception so that this thread can keep working.
                 logger.error("An exception occurred while maintaining transactions", e);
@@ -161,10 +155,6 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
         }
     }
 
-    @Override
-    public int getPort() {
-        return httpListenPort;
-    }
 
     @Override
     public void stop() {
@@ -225,7 +215,7 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
         return transaction.transaction;
     }
 
-    public void extendsTransaction(final String transactionId) throws IllegalStateException {
+    public void extendTransaction(final String transactionId) throws IllegalStateException {
         if (!isTransactionActive(transactionId)){
             throw new IllegalStateException("Transaction was not found or not active anymore. transactionId=" + transactionId);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java
index 6f7b977..1183fc5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java
@@ -26,8 +26,5 @@ public interface RemoteSiteListener {
 
     void start() throws IOException;
 
-    int getPort();
-
     void stop();
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index a5d4bbe..814d0e6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -43,6 +43,7 @@ import java.net.SocketTimeoutException;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.Arrays;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -266,7 +267,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
                                                 protocol.getPort().receiveFlowFiles(peer, protocol);
                                                 break;
                                             case REQUEST_PEER_LIST:
-                                                protocol.sendPeerList(peer);
+                                                protocol.sendPeerList(peer, nodeInformant == null ? Optional.empty() : Optional.of(nodeInformant.getNodeInformation()));
                                                 break;
                                             case SHUTDOWN:
                                                 protocol.shutdown(peer);
@@ -321,8 +322,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
         listenerThread.start();
     }
 
-    @Override
-    public int getPort() {
+    private int getPort() {
         return socketPort;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocolImpl.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocolImpl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocolImpl.java
deleted file mode 100644
index f187625..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocolImpl.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol.http;
-
-import org.apache.nifi.remote.HttpRemoteSiteListener;
-import org.apache.nifi.remote.Peer;
-import org.apache.nifi.remote.Transaction;
-import org.apache.nifi.remote.VersionNegotiator;
-import org.apache.nifi.remote.codec.FlowFileCodec;
-import org.apache.nifi.remote.codec.StandardFlowFileCodec;
-import org.apache.nifi.remote.exception.HandshakeException;
-import org.apache.nifi.remote.io.http.HttpServerCommunicationsSession;
-import org.apache.nifi.remote.protocol.AbstractFlowFileServerProtocol;
-import org.apache.nifi.remote.protocol.CommunicationsSession;
-import org.apache.nifi.remote.protocol.FlowFileTransaction;
-import org.apache.nifi.remote.protocol.HandshakenProperties;
-import org.apache.nifi.remote.protocol.RequestType;
-import org.apache.nifi.remote.protocol.Response;
-import org.apache.nifi.remote.protocol.ResponseCode;
-import org.apache.nifi.stream.io.ByteArrayInputStream;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-public class HttpFlowFileServerProtocolImpl extends AbstractFlowFileServerProtocol implements HttpFlowFileServerProtocol {
-
-    public static final String RESOURCE_NAME = "HttpFlowFileProtocol";
-
-    private final FlowFileCodec codec = new StandardFlowFileCodec();
-    private final VersionNegotiator versionNegotiator;
-    private final HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance();
-
-    public HttpFlowFileServerProtocolImpl(VersionNegotiator versionNegotiator) {
-        super();
-        this.versionNegotiator = versionNegotiator;
-    }
-
-    @Override
-    public FlowFileCodec negotiateCodec(final Peer peer) throws IOException {
-        return codec;
-    }
-
-    @Override
-    public FlowFileCodec getPreNegotiatedCodec() {
-        return codec;
-    }
-
-    @Override
-    protected HandshakenProperties doHandshake(Peer peer) throws IOException, HandshakeException {
-        HandshakenProperties confirmed = new HandshakenProperties();
-
-        HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
-        confirmed.setCommsIdentifier(commsSession.getTransactionId());
-        validateHandshakeRequest(confirmed, peer, commsSession.getHandshakeParams());
-
-        logger.debug("{} Done handshake, confirmed={}", this, confirmed);
-        return confirmed;
-    }
-
-    @Override
-    protected void writeTransactionResponse(boolean isTransfer, ResponseCode response, CommunicationsSession commsSession, String explanation) throws IOException {
-        HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) commsSession;
-
-        commSession.setResponseCode(response);
-        if(isTransfer){
-            switch (response) {
-                case NO_MORE_DATA:
-                    logger.debug("{} There's no data to send.", this);
-                    break;
-                case CONTINUE_TRANSACTION:
-                    logger.debug("{} Continue transaction... expecting more flow files.", this);
-                    commSession.setStatus(Transaction.TransactionState.DATA_EXCHANGED);
-                    break;
-                case BAD_CHECKSUM:
-                    logger.debug("{} Received BAD_CHECKSUM.", this);
-                    commSession.setStatus(Transaction.TransactionState.ERROR);
-                    break;
-                case CONFIRM_TRANSACTION:
-                    logger.debug("{} Transaction is confirmed.", this);
-                    commSession.setStatus(Transaction.TransactionState.TRANSACTION_CONFIRMED);
-                    break;
-                case FINISH_TRANSACTION:
-                    logger.debug("{} transaction is completed.", this);
-                    commSession.setStatus(Transaction.TransactionState.TRANSACTION_COMPLETED);
-                    break;
-            }
-        } else {
-            switch (response) {
-                case CONFIRM_TRANSACTION:
-                    logger.debug("{} Confirming transaction. checksum={}", this, explanation);
-                    commSession.setChecksum(explanation);
-                    commSession.setStatus(Transaction.TransactionState.DATA_EXCHANGED);
-                    break;
-                case TRANSACTION_FINISHED:
-                case TRANSACTION_FINISHED_BUT_DESTINATION_FULL:
-                    logger.debug("{} Transaction is completed. responseCode={}", this, response);
-                    commSession.setStatus(Transaction.TransactionState.TRANSACTION_COMPLETED);
-                    break;
-            }
-        }
-    }
-
-    @Override
-    protected Response readTransactionResponse(boolean isTransfer, CommunicationsSession commsSession) throws IOException {
-        // Returns Response based on current status.
-        HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) commsSession;
-
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        Transaction.TransactionState currentStatus = commSession.getStatus();
-        if(isTransfer){
-            switch (currentStatus){
-                case DATA_EXCHANGED:
-                    String clientChecksum = commSession.getChecksum();
-                    logger.debug("readTransactionResponse. clientChecksum={}", clientChecksum);
-                    ResponseCode.CONFIRM_TRANSACTION.writeResponse(new DataOutputStream(bos), clientChecksum);
-                    break;
-                case TRANSACTION_CONFIRMED:
-                    logger.debug("readTransactionResponse. finishing.");
-                    ResponseCode.TRANSACTION_FINISHED.writeResponse(new DataOutputStream(bos));
-                    break;
-            }
-        } else {
-            switch (currentStatus){
-                case TRANSACTION_STARTED:
-                    logger.debug("readTransactionResponse. returning CONTINUE_TRANSACTION.");
-                    // We don't know if there's more data to receive, so just continue it.
-                    ResponseCode.CONTINUE_TRANSACTION.writeResponse(new DataOutputStream(bos));
-                    break;
-                case TRANSACTION_CONFIRMED:
-                    // Checksum was successfully validated at client side, or BAD_CHECKSUM is returned.
-                    ResponseCode responseCode = commSession.getResponseCode();
-                    logger.debug("readTransactionResponse. responseCode={}", responseCode);
-                    if(responseCode.containsMessage()){
-                        responseCode.writeResponse(new DataOutputStream(bos), "");
-                    } else {
-                        responseCode.writeResponse(new DataOutputStream(bos));
-                    }
-                    break;
-            }
-        }
-
-        ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
-        return Response.read(new DataInputStream(bis));
-    }
-
-    private int holdTransaction(Peer peer, FlowFileTransaction transaction) {
-        // We don't commit the session here yet,
-        // to avoid losing sent flow files in case some issue happens at client side while it is processing,
-        // hold the transaction until we confirm additional request from client.
-        HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
-        String transactionId = commSession.getTransactionId();
-        logger.debug("{} Holding transaction. transactionId={}", this, transactionId);
-        transactionManager.holdTransaction(transactionId, transaction);
-
-        return transaction.getFlowFilesSent().size();
-    }
-
-    @Override
-    protected int commitTransferTransaction(Peer peer, FlowFileTransaction transaction) throws IOException {
-        return holdTransaction(peer, transaction);
-    }
-
-    public int commitTransferTransaction(Peer peer, String clientChecksum) throws IOException, IllegalStateException {
-        logger.debug("{} Committing the transfer transaction. peer={} clientChecksum={}", this, peer, clientChecksum);
-        HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
-        String transactionId = commSession.getTransactionId();
-        FlowFileTransaction transaction = transactionManager.finalizeTransaction(transactionId);
-        commSession.setChecksum(clientChecksum);
-        commSession.setStatus(Transaction.TransactionState.DATA_EXCHANGED);
-        return super.commitTransferTransaction(peer, transaction);
-    }
-
-    @Override
-    protected int commitReceiveTransaction(Peer peer, FlowFileTransaction transaction) throws IOException {
-        return holdTransaction(peer, transaction);
-    }
-
-    public int commitReceiveTransaction(Peer peer) throws IOException, IllegalStateException {
-        logger.debug("{} Committing the receive transaction. peer={}", this, peer);
-        HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
-        String transactionId = commSession.getTransactionId();
-        FlowFileTransaction transaction = transactionManager.finalizeTransaction(transactionId);
-        commSession.setStatus(Transaction.TransactionState.TRANSACTION_CONFIRMED);
-        return super.commitReceiveTransaction(peer, transaction);
-    }
-
-    @Override
-    public RequestType getRequestType(final Peer peer) throws IOException {
-        return null;
-    }
-
-    @Override
-    public VersionNegotiator getVersionNegotiator() {
-        return versionNegotiator;
-    }
-
-    @Override
-    public void sendPeerList(final Peer peer) throws IOException {
-    }
-
-    @Override
-    public String getResourceName() {
-        return RESOURCE_NAME;
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java
new file mode 100644
index 0000000..c4f1f5c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.http;
+
+import org.apache.nifi.remote.HttpRemoteSiteListener;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.cluster.ClusterNodeInformation;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.codec.StandardFlowFileCodec;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.io.http.HttpServerCommunicationsSession;
+import org.apache.nifi.remote.protocol.AbstractFlowFileServerProtocol;
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.FlowFileTransaction;
+import org.apache.nifi.remote.protocol.HandshakenProperties;
+import org.apache.nifi.remote.protocol.RequestType;
+import org.apache.nifi.remote.protocol.Response;
+import org.apache.nifi.remote.protocol.ResponseCode;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Optional;
+
+public class StandardHttpFlowFileServerProtocol extends AbstractFlowFileServerProtocol implements HttpFlowFileServerProtocol {
+
+    public static final String RESOURCE_NAME = "HttpFlowFileProtocol";
+
+    private final FlowFileCodec codec = new StandardFlowFileCodec();
+    private final VersionNegotiator versionNegotiator;
+    private final HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance();
+
+    public StandardHttpFlowFileServerProtocol(VersionNegotiator versionNegotiator) {
+        super();
+        this.versionNegotiator = versionNegotiator;
+    }
+
+    @Override
+    public FlowFileCodec negotiateCodec(final Peer peer) throws IOException {
+        return codec;
+    }
+
+    @Override
+    public FlowFileCodec getPreNegotiatedCodec() {
+        return codec;
+    }
+
+    @Override
+    protected HandshakenProperties doHandshake(Peer peer) throws IOException, HandshakeException {
+        HandshakenProperties confirmed = new HandshakenProperties();
+
+        HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
+        confirmed.setCommsIdentifier(commsSession.getTransactionId());
+        validateHandshakeRequest(confirmed, peer, commsSession.getHandshakeParams());
+
+        logger.debug("{} Done handshake, confirmed={}", this, confirmed);
+        return confirmed;
+    }
+
+    @Override
+    protected void writeTransactionResponse(boolean isTransfer, ResponseCode response, CommunicationsSession commsSession, String explanation) throws IOException {
+        HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) commsSession;
+
+        commSession.setResponseCode(response);
+        if(isTransfer){
+            switch (response) {
+                case NO_MORE_DATA:
+                    logger.debug("{} There's no data to send.", this);
+                    break;
+                case CONTINUE_TRANSACTION:
+                    logger.debug("{} Continue transaction... expecting more flow files.", this);
+                    commSession.setStatus(Transaction.TransactionState.DATA_EXCHANGED);
+                    break;
+                case BAD_CHECKSUM:
+                    logger.debug("{} Received BAD_CHECKSUM.", this);
+                    commSession.setStatus(Transaction.TransactionState.ERROR);
+                    break;
+                case CONFIRM_TRANSACTION:
+                    logger.debug("{} Transaction is confirmed.", this);
+                    commSession.setStatus(Transaction.TransactionState.TRANSACTION_CONFIRMED);
+                    break;
+                case FINISH_TRANSACTION:
+                    logger.debug("{} transaction is completed.", this);
+                    commSession.setStatus(Transaction.TransactionState.TRANSACTION_COMPLETED);
+                    break;
+            }
+        } else {
+            switch (response) {
+                case CONFIRM_TRANSACTION:
+                    logger.debug("{} Confirming transaction. checksum={}", this, explanation);
+                    commSession.setChecksum(explanation);
+                    commSession.setStatus(Transaction.TransactionState.DATA_EXCHANGED);
+                    break;
+                case TRANSACTION_FINISHED:
+                case TRANSACTION_FINISHED_BUT_DESTINATION_FULL:
+                    logger.debug("{} Transaction is completed. responseCode={}", this, response);
+                    commSession.setStatus(Transaction.TransactionState.TRANSACTION_COMPLETED);
+                    break;
+            }
+        }
+    }
+
+    @Override
+    protected Response readTransactionResponse(boolean isTransfer, CommunicationsSession commsSession) throws IOException {
+        // Returns Response based on current status.
+        HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) commsSession;
+
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        Transaction.TransactionState currentStatus = commSession.getStatus();
+        if(isTransfer){
+            switch (currentStatus){
+                case DATA_EXCHANGED:
+                    String clientChecksum = commSession.getChecksum();
+                    logger.debug("readTransactionResponse. clientChecksum={}", clientChecksum);
+                    ResponseCode.CONFIRM_TRANSACTION.writeResponse(new DataOutputStream(bos), clientChecksum);
+                    break;
+                case TRANSACTION_CONFIRMED:
+                    logger.debug("readTransactionResponse. finishing.");
+                    ResponseCode.TRANSACTION_FINISHED.writeResponse(new DataOutputStream(bos));
+                    break;
+            }
+        } else {
+            switch (currentStatus){
+                case TRANSACTION_STARTED:
+                    logger.debug("readTransactionResponse. returning CONTINUE_TRANSACTION.");
+                    // We don't know if there's more data to receive, so just continue it.
+                    ResponseCode.CONTINUE_TRANSACTION.writeResponse(new DataOutputStream(bos));
+                    break;
+                case TRANSACTION_CONFIRMED:
+                    // Checksum was successfully validated at client side, or BAD_CHECKSUM is returned.
+                    ResponseCode responseCode = commSession.getResponseCode();
+                    logger.debug("readTransactionResponse. responseCode={}", responseCode);
+                    if(responseCode.containsMessage()){
+                        responseCode.writeResponse(new DataOutputStream(bos), "");
+                    } else {
+                        responseCode.writeResponse(new DataOutputStream(bos));
+                    }
+                    break;
+            }
+        }
+
+        ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+        return Response.read(new DataInputStream(bis));
+    }
+
+    private int holdTransaction(Peer peer, FlowFileTransaction transaction) {
+        // We don't commit the session here yet,
+        // to avoid losing sent flow files in case some issue happens at client side while it is processing,
+        // hold the transaction until we confirm additional request from client.
+        HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
+        String transactionId = commSession.getTransactionId();
+        logger.debug("{} Holding transaction. transactionId={}", this, transactionId);
+        transactionManager.holdTransaction(transactionId, transaction);
+
+        return transaction.getFlowFilesSent().size();
+    }
+
+    @Override
+    protected int commitTransferTransaction(Peer peer, FlowFileTransaction transaction) throws IOException {
+        return holdTransaction(peer, transaction);
+    }
+
+    @Override
+    public int commitTransferTransaction(Peer peer, String clientChecksum) throws IOException, IllegalStateException {
+        logger.debug("{} Committing the transfer transaction. peer={} clientChecksum={}", this, peer, clientChecksum);
+        HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
+        String transactionId = commSession.getTransactionId();
+        FlowFileTransaction transaction = transactionManager.finalizeTransaction(transactionId);
+        commSession.setChecksum(clientChecksum);
+        commSession.setStatus(Transaction.TransactionState.DATA_EXCHANGED);
+        return super.commitTransferTransaction(peer, transaction);
+    }
+
+    @Override
+    protected int commitReceiveTransaction(Peer peer, FlowFileTransaction transaction) throws IOException {
+        return holdTransaction(peer, transaction);
+    }
+
+    @Override
+    public int commitReceiveTransaction(Peer peer) throws IOException, IllegalStateException {
+        logger.debug("{} Committing the receive transaction. peer={}", this, peer);
+        HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
+        String transactionId = commSession.getTransactionId();
+        FlowFileTransaction transaction = transactionManager.finalizeTransaction(transactionId);
+        commSession.setStatus(Transaction.TransactionState.TRANSACTION_CONFIRMED);
+        return super.commitReceiveTransaction(peer, transaction);
+    }
+
+    @Override
+    public RequestType getRequestType(final Peer peer) throws IOException {
+        return null;
+    }
+
+    @Override
+    public VersionNegotiator getVersionNegotiator() {
+        return versionNegotiator;
+    }
+
+    @Override
+    public void sendPeerList(final Peer peer, final Optional<ClusterNodeInformation> clusterNodeInformation) throws IOException {
+    }
+
+    @Override
+    public String getResourceName() {
+        return RESOURCE_NAME;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
deleted file mode 100644
index af6860b..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol.socket;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.remote.Peer;
-import org.apache.nifi.remote.RootGroupPort;
-import org.apache.nifi.remote.StandardVersionNegotiator;
-import org.apache.nifi.remote.VersionNegotiator;
-import org.apache.nifi.remote.cluster.ClusterNodeInformation;
-import org.apache.nifi.remote.cluster.NodeInformant;
-import org.apache.nifi.remote.cluster.NodeInformation;
-import org.apache.nifi.remote.codec.FlowFileCodec;
-import org.apache.nifi.remote.exception.HandshakeException;
-import org.apache.nifi.remote.protocol.CommunicationsSession;
-import org.apache.nifi.remote.protocol.HandshakeProperty;
-import org.apache.nifi.remote.protocol.RequestType;
-import org.apache.nifi.remote.protocol.ResponseCode;
-import org.apache.nifi.remote.protocol.ServerProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ClusterManagerServerProtocol implements ServerProtocol {
-
-    public static final String RESOURCE_NAME = "SocketFlowFileProtocol";
-
-    private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
-    private final Logger logger = LoggerFactory.getLogger(ClusterManagerServerProtocol.class);
-    private NodeInformant nodeInformant;
-
-    private String commsIdentifier;
-    private boolean shutdown = false;
-    private boolean handshakeCompleted = false;
-    private long requestExpirationMillis = 30000L;
-
-    public ClusterManagerServerProtocol() {
-    }
-
-    @Override
-    public void setNodeInformant(final NodeInformant nodeInformant) {
-        this.nodeInformant = nodeInformant;
-    }
-
-    @Override
-    public void handshake(final Peer peer) throws IOException, HandshakeException {
-        if (handshakeCompleted) {
-            throw new IllegalStateException("Handshake has already been completed");
-        }
-        if (shutdown) {
-            throw new IllegalStateException("Protocol is shutdown");
-        }
-
-        final CommunicationsSession commsSession = peer.getCommunicationsSession();
-        final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
-        final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
-
-        // read communications identifier
-        commsIdentifier = dis.readUTF();
-
-        // read all of the properties. we don't really care what the properties are.
-        final int numProperties = dis.readInt();
-        for (int i = 0; i < numProperties; i++) {
-            final String propertyName = dis.readUTF();
-            final String propertyValue = dis.readUTF();
-
-            final HandshakeProperty property;
-            try {
-                property = HandshakeProperty.valueOf(propertyName);
-                if (HandshakeProperty.REQUEST_EXPIRATION_MILLIS.equals(property)) {
-                    requestExpirationMillis = Long.parseLong(propertyValue);
-                }
-            } catch (final Exception e) {
-            }
-        }
-
-        // send "OK" response
-        ResponseCode.PROPERTIES_OK.writeResponse(dos);
-
-        logger.debug("Successfully completed handshake with {}; CommsID={}", peer, commsIdentifier);
-        handshakeCompleted = true;
-    }
-
-    @Override
-    public boolean isHandshakeSuccessful() {
-        return handshakeCompleted;
-    }
-
-    @Override
-    public void sendPeerList(final Peer peer) throws IOException {
-        if (!handshakeCompleted) {
-            throw new IllegalStateException("Handshake has not been completed");
-        }
-        if (shutdown) {
-            throw new IllegalStateException("Protocol is shutdown");
-        }
-
-        final CommunicationsSession commsSession = peer.getCommunicationsSession();
-        final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
-
-        final ClusterNodeInformation clusterNodeInfo = nodeInformant.getNodeInformation();
-        final Collection<NodeInformation> nodeInfos = clusterNodeInfo.getNodeInformation();
-
-        // determine how many nodes have Site-to-site enabled
-        int numPeers = 0;
-        for (final NodeInformation nodeInfo : nodeInfos) {
-            if (nodeInfo.getSiteToSitePort() != null) {
-                numPeers++;
-            }
-        }
-
-        dos.writeInt(numPeers);
-        for (final NodeInformation nodeInfo : nodeInfos) {
-            if (nodeInfo.getSiteToSitePort() == null) {
-                continue;
-            }
-
-            dos.writeUTF(nodeInfo.getSiteToSiteHostname());
-            dos.writeInt(nodeInfo.getSiteToSitePort());
-            dos.writeBoolean(nodeInfo.isSiteToSiteSecure());
-            dos.writeInt(nodeInfo.getTotalFlowFiles());
-        }
-
-        logger.info("Redirected {} to {} nodes", peer, numPeers);
-
-        dos.flush();
-    }
-
-    @Override
-    public void shutdown(final Peer peer) {
-        shutdown = true;
-    }
-
-    @Override
-    public boolean isShutdown() {
-        return shutdown;
-    }
-
-    @Override
-    public FlowFileCodec negotiateCodec(Peer peer) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public FlowFileCodec getPreNegotiatedCodec() {
-        return null;
-    }
-
-    @Override
-    public RequestType getRequestType(final Peer peer) throws IOException {
-        final CommunicationsSession commsSession = peer.getCommunicationsSession();
-        final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
-        return RequestType.readRequestType(dis);
-    }
-
-    @Override
-    public int transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public int receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public VersionNegotiator getVersionNegotiator() {
-        return versionNegotiator;
-    }
-
-    @Override
-    public String getResourceName() {
-        return RESOURCE_NAME;
-    }
-
-    @Override
-    public void setRootProcessGroup(final ProcessGroup rootGroup) {
-    }
-
-    @Override
-    public RootGroupPort getPort() {
-        return null;
-    }
-
-    @Override
-    public long getRequestExpiration() {
-        return requestExpirationMillis;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index a2a7223..fe7d163 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@ -20,6 +20,8 @@ import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.RemoteResourceFactory;
 import org.apache.nifi.remote.StandardVersionNegotiator;
 import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.cluster.ClusterNodeInformation;
+import org.apache.nifi.remote.cluster.NodeInformation;
 import org.apache.nifi.remote.codec.FlowFileCodec;
 import org.apache.nifi.remote.exception.HandshakeException;
 import org.apache.nifi.remote.exception.ProtocolException;
@@ -34,14 +36,19 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol {
 
     public static final String RESOURCE_NAME = "SocketFlowFileProtocol";
 
-    private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1);
+    // Version 6 added to support Zero-Master Clustering, which was introduced in NiFi 1.0.0
+    private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(6, 5, 4, 3, 2, 1);
 
     @Override
     protected HandshakenProperties doHandshake(Peer peer) throws IOException, HandshakeException {
@@ -147,7 +154,7 @@ public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol
     }
 
     @Override
-    public void sendPeerList(final Peer peer) throws IOException {
+    public void sendPeerList(final Peer peer, final Optional<ClusterNodeInformation> clusterNodeInfo) throws IOException {
         if (!handshakeCompleted) {
             throw new IllegalStateException("Handshake has not been completed");
         }
@@ -167,12 +174,36 @@ public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol
         }
         logger.debug("{} Advertising Remote Input host name {}", this, peer);
 
-        // we have only 1 peer: ourselves.
-        dos.writeInt(1);
-        dos.writeUTF(remoteInputHost);
-        dos.writeInt(properties.getRemoteInputPort());
-        dos.writeBoolean(properties.isSiteToSiteSecure());
-        dos.writeInt(0);    // doesn't matter how many FlowFiles we have, because we're the only host.
+        List<NodeInformation> nodeInfos;
+        if (clusterNodeInfo.isPresent()) {
+            nodeInfos = new ArrayList<>(clusterNodeInfo.get().getNodeInformation());
+        } else {
+            final NodeInformation self = new NodeInformation(remoteInputHost, properties.getRemoteInputPort(), properties.getRemoteInputHttpPort(), properties.getRemoteInputHttpPort(),
+                properties.isSiteToSiteSecure(), 0);
+            nodeInfos = Collections.singletonList(self);
+        }
+
+        // determine how many nodes have Site-to-site enabled
+        int numPeers = 0;
+        for (final NodeInformation nodeInfo : nodeInfos) {
+            if (nodeInfo.getSiteToSitePort() != null) {
+                numPeers++;
+            }
+        }
+
+        dos.writeInt(numPeers);
+        for (final NodeInformation nodeInfo : nodeInfos) {
+            if (nodeInfo.getSiteToSitePort() == null) {
+                continue;
+            }
+
+            dos.writeUTF(nodeInfo.getSiteToSiteHostname());
+            dos.writeInt(nodeInfo.getSiteToSitePort());
+            dos.writeBoolean(nodeInfo.isSiteToSiteSecure());
+            dos.writeInt(nodeInfo.getTotalFlowFiles());
+        }
+
+        logger.info("Sending list of {} peers back to client {}", numPeers, peer);
         dos.flush();
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/resources/META-INF/services/org.apache.nifi.remote.protocol.ServerProtocol
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/resources/META-INF/services/org.apache.nifi.remote.protocol.ServerProtocol b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/resources/META-INF/services/org.apache.nifi.remote.protocol.ServerProtocol
index fe2182f..67a7a9c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/resources/META-INF/services/org.apache.nifi.remote.protocol.ServerProtocol
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/resources/META-INF/services/org.apache.nifi.remote.protocol.ServerProtocol
@@ -12,5 +12,5 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol
-org.apache.nifi.remote.protocol.socket.ClusterManagerServerProtocol
\ No newline at end of file
+
+org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
index a8900c9..4519ddd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
@@ -80,7 +80,7 @@ public class TestHttpFlowFileServerProtocol {
         final PeerDescription description = new PeerDescription("peer-host", 8080, false);
         final InputStream inputStream = new ByteArrayInputStream(new byte[]{});
         final OutputStream outputStream = new ByteArrayOutputStream();
-        final HttpServerCommunicationsSession commsSession = new HttpServerCommunicationsSession(inputStream, outputStream, transactionId);
+        final HttpServerCommunicationsSession commsSession = new HttpServerCommunicationsSession(inputStream, outputStream, transactionId, "user");
         commsSession.putHandshakeParam(HandshakeProperty.GZIP, "false");
         commsSession.putHandshakeParam(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, "1234");
         final String peerUrl = "http://peer-host:8080/";
@@ -90,7 +90,7 @@ public class TestHttpFlowFileServerProtocol {
 
     private HttpFlowFileServerProtocol getDefaultHttpFlowFileServerProtocol() {
         final StandardVersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1);
-        return new HttpFlowFileServerProtocolImpl(versionNegotiator);
+        return new StandardHttpFlowFileServerProtocol(versionNegotiator);
     }
 
     @Test
@@ -101,7 +101,7 @@ public class TestHttpFlowFileServerProtocol {
         try {
             serverProtocol.handshake(peer);
             fail();
-        } catch (HandshakeException e) {
+        } catch (final HandshakeException e) {
             assertEquals(ResponseCode.MISSING_PROPERTY, e.getResponseCode());
         }
 
@@ -122,7 +122,7 @@ public class TestHttpFlowFileServerProtocol {
         try {
             serverProtocol.handshake(peer);
             fail();
-        } catch (HandshakeException e) {
+        } catch (final HandshakeException e) {
             assertEquals(ResponseCode.UNKNOWN_PORT, e.getResponseCode());
         }
 
@@ -147,7 +147,7 @@ public class TestHttpFlowFileServerProtocol {
         try {
             serverProtocol.handshake(peer);
             fail();
-        } catch (HandshakeException e) {
+        } catch (final HandshakeException e) {
             assertEquals(ResponseCode.UNAUTHORIZED, e.getResponseCode());
         }
 
@@ -173,7 +173,7 @@ public class TestHttpFlowFileServerProtocol {
         try {
             serverProtocol.handshake(peer);
             fail();
-        } catch (HandshakeException e) {
+        } catch (final HandshakeException e) {
             assertEquals(ResponseCode.PORT_NOT_IN_VALID_STATE, e.getResponseCode());
         }
 
@@ -196,7 +196,7 @@ public class TestHttpFlowFileServerProtocol {
         doReturn(true).when(authResult).isAuthorized();
         doReturn(true).when(port).isValid();
         doReturn(true).when(port).isRunning();
-        Set<Connection> connections = new HashSet<>();
+        final Set<Connection> connections = new HashSet<>();
         final Connection connection = mock(Connection.class);
         connections.add(connection);
         doReturn(connections).when(port).getConnections();
@@ -208,7 +208,7 @@ public class TestHttpFlowFileServerProtocol {
         try {
             serverProtocol.handshake(peer);
             fail();
-        } catch (HandshakeException e) {
+        } catch (final HandshakeException e) {
             assertEquals(ResponseCode.PORTS_DESTINATION_FULL, e.getResponseCode());
         }
 
@@ -237,13 +237,13 @@ public class TestHttpFlowFileServerProtocol {
         try {
             serverProtocol.transferFlowFiles(peer, context, processSession, negotiatedCoded);
             fail("transferFlowFiles should fail since it's already shutdown.");
-        } catch (IllegalStateException e) {
+        } catch (final IllegalStateException e) {
         }
 
         try {
             serverProtocol.receiveFlowFiles(peer, context, processSession, negotiatedCoded);
             fail("receiveFlowFiles should fail since it's already shutdown.");
-        } catch (IllegalStateException e) {
+        } catch (final IllegalStateException e) {
         }
     }
 
@@ -288,12 +288,12 @@ public class TestHttpFlowFileServerProtocol {
         try {
             serverProtocol.commitTransferTransaction(peer, "client-sent-wrong-checksum");
             fail();
-        } catch (IOException e) {
+        } catch (final IOException e) {
             assertTrue(e.getMessage().contains("CRC32 Checksum"));
         }
     }
 
-    private Peer transferOneFile(HttpFlowFileServerProtocol serverProtocol, String transactionId) throws IOException {
+    private Peer transferOneFile(final HttpFlowFileServerProtocol serverProtocol, final String transactionId) throws IOException {
         final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance();
         final Peer peer = getDefaultPeer(transactionId);
         final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
@@ -312,21 +312,21 @@ public class TestHttpFlowFileServerProtocol {
         doReturn(flowFile).when(processSession).get();
         doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
         doAnswer(invocation -> {
-            String peerUrl = (String)invocation.getArguments()[1];
-            String detail = (String)invocation.getArguments()[2];
+            final String peerUrl = (String)invocation.getArguments()[1];
+            final String detail = (String)invocation.getArguments()[2];
             assertEquals("http://peer-host:8080/", peerUrl);
             assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
             return null;
         }).when(provenanceReporter).send(eq(flowFile), any(String.class), any(String.class), any(Long.class), any(Boolean.class));
 
         doAnswer(invocation -> {
-            InputStreamCallback callback = (InputStreamCallback)invocation.getArguments()[1];
+            final InputStreamCallback callback = (InputStreamCallback)invocation.getArguments()[1];
             callback.process(new java.io.ByteArrayInputStream("Server content".getBytes()));
             return null;
         }).when(processSession).read(any(FlowFile.class), any(InputStreamCallback.class));
 
         // Execute test using mock
-        int flowFileSent = serverProtocol.transferFlowFiles(peer, context, processSession, negotiatedCoded);
+        final int flowFileSent = serverProtocol.transferFlowFiles(peer, context, processSession, negotiatedCoded);
         assertEquals(1, flowFileSent);
 
         assertTrue(remoteSiteListener.isTransactionActive(transactionId));
@@ -360,8 +360,8 @@ public class TestHttpFlowFileServerProtocol {
 
         doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
         doAnswer(invocation -> {
-            String peerUrl = (String)invocation.getArguments()[1];
-            String detail = (String)invocation.getArguments()[2];
+            final String peerUrl = (String)invocation.getArguments()[1];
+            final String detail = (String)invocation.getArguments()[2];
             assertEquals("http://peer-host:8080/", peerUrl);
             assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
             return null;
@@ -369,15 +369,15 @@ public class TestHttpFlowFileServerProtocol {
 
         doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
         doAnswer(invocation -> {
-            String peerUrl = (String)invocation.getArguments()[1];
-            String detail = (String)invocation.getArguments()[2];
+            final String peerUrl = (String)invocation.getArguments()[1];
+            final String detail = (String)invocation.getArguments()[2];
             assertEquals("http://peer-host:8080/", peerUrl);
             assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
             return null;
         }).when(provenanceReporter).send(eq(flowFile2), any(String.class), any(String.class), any(Long.class), any(Boolean.class));
 
         doAnswer(invocation -> {
-            InputStreamCallback callback = (InputStreamCallback)invocation.getArguments()[1];
+            final InputStreamCallback callback = (InputStreamCallback)invocation.getArguments()[1];
             callback.process(new java.io.ByteArrayInputStream("Server content".getBytes()));
             return null;
         }).when(processSession).read(any(FlowFile.class), any(InputStreamCallback.class));
@@ -397,7 +397,7 @@ public class TestHttpFlowFileServerProtocol {
         final String contents = "Content from client.";
         final byte[] bytes = contents.getBytes();
         final InputStream in = new ByteArrayInputStream(bytes);
-        Map<String, String> attributes = new HashMap<>();
+        final Map<String, String> attributes = new HashMap<>();
         attributes.put("client-attr-1", "client-attr-1-value");
         attributes.put("client-attr-2", "client-attr-2-value");
         return new StandardDataPacket(attributes, in, bytes.length);
@@ -458,12 +458,12 @@ public class TestHttpFlowFileServerProtocol {
         try {
             serverProtocol.commitReceiveTransaction(peer);
             fail();
-        } catch (IOException e) {
+        } catch (final IOException e) {
             assertTrue(e.getMessage().contains("Received a BadChecksum response"));
         }
     }
 
-    private void receiveOneFile(HttpFlowFileServerProtocol serverProtocol, String transactionId, Peer peer) throws IOException {
+    private void receiveOneFile(final HttpFlowFileServerProtocol serverProtocol, final String transactionId, final Peer peer) throws IOException {
         final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance();
         final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
         commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "1");
@@ -479,7 +479,7 @@ public class TestHttpFlowFileServerProtocol {
         final ProvenanceReporter provenanceReporter = mock(ProvenanceReporter.class);
         final FlowFile flowFile = mock(FlowFile.class);
 
-        DataPacket dataPacket = createClientDataPacket();
+        final DataPacket dataPacket = createClientDataPacket();
 
         final ByteArrayOutputStream testDataOs = new ByteArrayOutputStream();
         negotiatedCoded.encode(dataPacket, testDataOs);
@@ -488,7 +488,7 @@ public class TestHttpFlowFileServerProtocol {
         ((HttpInput)commsSession.getInput()).setInputStream(httpInputStream);
 
         doAnswer(invocation -> {
-            InputStream is = (InputStream) invocation.getArguments()[0];
+            final InputStream is = (InputStream) invocation.getArguments()[0];
             for (int b; (b = is.read()) >= 0;) {
                 // consume stream.
             }
@@ -499,21 +499,21 @@ public class TestHttpFlowFileServerProtocol {
         doReturn(flowFile).when(processSession).putAttribute(any(FlowFile.class), any(String.class), any(String.class));
         doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
         doAnswer(invocation -> {
-            String peerUrl = (String)invocation.getArguments()[1];
-            String detail = (String)invocation.getArguments()[3];
+            final String peerUrl = (String)invocation.getArguments()[1];
+            final String detail = (String)invocation.getArguments()[3];
             assertEquals("http://peer-host:8080/", peerUrl);
             assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
             return null;
         }).when(provenanceReporter)
                 .receive(any(FlowFile.class), any(String.class), any(String.class), any(String.class), any(Long.class));
 
-        Set<Relationship> relations = new HashSet<>();
+        final Set<Relationship> relations = new HashSet<>();
         final Relationship relationship = new Relationship.Builder().build();
         relations.add(relationship);
         doReturn(relations).when(context).getAvailableRelationships();
 
         // Execute test using mock
-        int flowFileReceived = serverProtocol.receiveFlowFiles(peer, context, processSession, negotiatedCoded);
+        final int flowFileReceived = serverProtocol.receiveFlowFiles(peer, context, processSession, negotiatedCoded);
         assertEquals(1, flowFileReceived);
 
         assertTrue(remoteSiteListener.isTransactionActive(transactionId));
@@ -549,7 +549,7 @@ public class TestHttpFlowFileServerProtocol {
         ((HttpInput)commsSession.getInput()).setInputStream(httpInputStream);
 
         doAnswer(invocation -> {
-            InputStream is = (InputStream) invocation.getArguments()[0];
+            final InputStream is = (InputStream) invocation.getArguments()[0];
             for (int b; (b = is.read()) >= 0;) {
                 // consume stream.
             }
@@ -562,15 +562,15 @@ public class TestHttpFlowFileServerProtocol {
                 .when(processSession).putAttribute(any(FlowFile.class), any(String.class), any(String.class));
         doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
         doAnswer(invocation -> {
-            String peerUrl = (String)invocation.getArguments()[1];
-            String detail = (String)invocation.getArguments()[3];
+            final String peerUrl = (String)invocation.getArguments()[1];
+            final String detail = (String)invocation.getArguments()[3];
             assertEquals("http://peer-host:8080/", peerUrl);
             assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
             return null;
         }).when(provenanceReporter)
                 .receive(any(FlowFile.class), any(String.class), any(String.class), any(String.class), any(Long.class));
 
-        Set<Relationship> relations = new HashSet<>();
+        final Set<Relationship> relations = new HashSet<>();
         doReturn(relations).when(context).getAvailableRelationships();
 
         // Execute test using mock

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 31087c9..892718e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -16,7 +16,28 @@
  */
 package org.apache.nifi.web;
 
-import com.google.common.collect.Sets;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+
 import org.apache.nifi.action.Action;
 import org.apache.nifi.action.Component;
 import org.apache.nifi.action.FlowChangeAction;
@@ -194,26 +215,7 @@ import org.apache.nifi.web.util.SnippetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
+import com.google.common.collect.Sets;
 
 /**
  * Implementation of NiFiServiceFacade that performs revision checking.
@@ -2157,15 +2159,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         controllerDTO.setDisabledCount(counts.getDisabledCount());
 
         // determine the site to site configuration
-        if (isClustered()) {
-            controllerDTO.setRemoteSiteListeningPort(controllerFacade.getClusterManagerRemoteSiteListeningPort());
-            controllerDTO.setRemoteSiteHttpListeningPort(controllerFacade.getClusterManagerRemoteSiteListeningHttpPort());
-            controllerDTO.setSiteToSiteSecure(controllerFacade.isClusterManagerRemoteSiteCommsSecure());
-        } else {
-            controllerDTO.setRemoteSiteListeningPort(controllerFacade.getRemoteSiteListeningPort());
-            controllerDTO.setRemoteSiteHttpListeningPort(controllerFacade.getRemoteSiteListeningHttpPort());
-            controllerDTO.setSiteToSiteSecure(controllerFacade.isRemoteSiteCommsSecure());
-        }
+        controllerDTO.setRemoteSiteListeningPort(controllerFacade.getRemoteSiteListeningPort());
+        controllerDTO.setRemoteSiteHttpListeningPort(controllerFacade.getRemoteSiteListeningHttpPort());
+        controllerDTO.setSiteToSiteSecure(controllerFacade.isRemoteSiteCommsSecure());
 
         return controllerDTO;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c81dc195/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
index aad8b4a..e77d769 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
@@ -49,7 +49,7 @@ import org.apache.nifi.remote.io.http.HttpServerCommunicationsSession;
 import org.apache.nifi.remote.protocol.HandshakeProperty;
 import org.apache.nifi.remote.protocol.ResponseCode;
 import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocol;
-import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocolImpl;
+import org.apache.nifi.remote.protocol.http.StandardHttpFlowFileServerProtocol;
 import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.apache.nifi.web.api.entity.TransactionResultEntity;
 import org.slf4j.Logger;
@@ -305,16 +305,18 @@ public class DataTransferResource extends ApplicationResource {
     }
 
     HttpFlowFileServerProtocol getHttpFlowFileServerProtocol(VersionNegotiator versionNegotiator) {
-        return new HttpFlowFileServerProtocolImpl(versionNegotiator);
+        return new StandardHttpFlowFileServerProtocol(versionNegotiator);
     }
 
     private Peer constructPeer(HttpServletRequest req, InputStream inputStream, OutputStream outputStream, String portId, String transactionId) {
-        String clientHostName = req.getRemoteHost();
-        int clientPort = req.getRemotePort();
+        final String clientHostName = req.getRemoteHost();
+        final int clientPort = req.getRemotePort();
 
-        PeerDescription peerDescription = new PeerDescription(clientHostName, clientPort, req.isSecure());
+        final PeerDescription peerDescription = new PeerDescription(clientHostName, clientPort, req.isSecure());
 
-        HttpServerCommunicationsSession commSession = new HttpServerCommunicationsSession(inputStream, outputStream, transactionId);
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
+        final String userDn = user == null ? null : user.getIdentity();
+        final HttpServerCommunicationsSession commSession = new HttpServerCommunicationsSession(inputStream, outputStream, transactionId, userDn);
 
         boolean useCompression = false;
         final String useCompressionStr = req.getHeader(HANDSHAKE_PROPERTY_USE_COMPRESSION);
@@ -330,20 +332,28 @@ public class DataTransferResource extends ApplicationResource {
         commSession.putHandshakeParam(HandshakeProperty.PORT_IDENTIFIER, portId);
         commSession.putHandshakeParam(HandshakeProperty.GZIP, String.valueOf(useCompression));
 
-        if (!isEmpty(requestExpiration)) commSession.putHandshakeParam(REQUEST_EXPIRATION_MILLIS, requestExpiration);
-        if (!isEmpty(batchCount)) commSession.putHandshakeParam(BATCH_COUNT, batchCount);
-        if (!isEmpty(batchSize)) commSession.putHandshakeParam(BATCH_SIZE, batchSize);
-        if (!isEmpty(batchDuration)) commSession.putHandshakeParam(BATCH_DURATION, batchDuration);
+        if (!isEmpty(requestExpiration)) {
+            commSession.putHandshakeParam(REQUEST_EXPIRATION_MILLIS, requestExpiration);
+        }
+        if (!isEmpty(batchCount)) {
+            commSession.putHandshakeParam(BATCH_COUNT, batchCount);
+        }
+        if (!isEmpty(batchSize)) {
+            commSession.putHandshakeParam(BATCH_SIZE, batchSize);
+        }
+        if (!isEmpty(batchDuration)) {
+            commSession.putHandshakeParam(BATCH_DURATION, batchDuration);
+        }
 
         if(peerDescription.isSecure()){
-            NiFiUser nifiUser = NiFiUserUtils.getNiFiUser();
+            final NiFiUser nifiUser = NiFiUserUtils.getNiFiUser();
             logger.debug("initiating peer, nifiUser={}", nifiUser);
             commSession.setUserDn(nifiUser.getIdentity());
         }
 
         // TODO: Followed how SocketRemoteSiteListener define peerUrl and clusterUrl, but it can be more meaningful values, especially for clusterUrl.
-        String peerUrl = "nifi://" + clientHostName + ":" + clientPort;
-        String clusterUrl = "nifi://localhost:" + req.getLocalPort();
+        final String peerUrl = "nifi://" + clientHostName + ":" + clientPort;
+        final String clusterUrl = "nifi://localhost:" + req.getLocalPort();
         return new Peer(peerDescription, commSession, peerUrl, clusterUrl);
     }
 
@@ -771,7 +781,7 @@ public class DataTransferResource extends ApplicationResource {
         try {
             // Do handshake
             initiateServerProtocol(peer, transportProtocolVersion);
-            transactionManager.extendsTransaction(transactionId);
+            transactionManager.extendTransaction(transactionId);
 
             final TransactionResultEntity entity = new TransactionResultEntity();
             entity.setResponseCode(ResponseCode.CONTINUE_TRANSACTION.getCode());