You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/21 01:08:46 UTC

incubator-nifi git commit: NIFI-282: Refactoring to make client from site-to-site components

Repository: incubator-nifi
Updated Branches:
  refs/heads/site-to-site-client a6293e340 -> c174d3a60


NIFI-282: Refactoring to make client from site-to-site components


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/c174d3a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/c174d3a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/c174d3a6

Branch: refs/heads/site-to-site-client
Commit: c174d3a600358ebed8b8064247785606af6c6134
Parents: a6293e3
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Jan 20 19:07:18 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Jan 20 19:07:18 2015 -0500

----------------------------------------------------------------------
 nifi/commons/site-to-site-client/pom.xml        |  13 +
 .../apache/nifi/remote/client/DataPacket.java   |  28 --
 .../nifi/remote/client/SiteToSiteClient.java    |   5 +-
 .../apache/nifi/remote/client/Transaction.java  |  21 ++
 .../socket/EndpointConnectionStatePool.java     | 309 +++++++++++++------
 .../nifi/remote/client/socket/SocketClient.java | 151 ++++++++-
 .../nifi/remote/protocol/ClientProtocol.java    |  15 +
 .../apache/nifi/remote/protocol/DataPacket.java |  29 ++
 .../protocol/socket/SocketClientProtocol.java   |  73 ++++-
 .../socket/SocketClientTransaction.java         |  66 ++++
 .../nifi/remote/util/RemoteNiFiUtils.java       | 216 +++++++++++++
 .../apache/nifi/groups/RemoteProcessGroup.java  |  30 --
 .../nifi/remote/StandardRemoteProcessGroup.java |  89 +-----
 .../util/RemoteProcessGroupUtils.java           | 216 -------------
 .../nifi/remote/RemoteResourceFactory.java      |   8 +
 .../nifi/remote/StandardRemoteGroupPort.java    |  28 +-
 .../socket/SocketFlowFileServerProtocol.java    |   9 +-
 .../apache/nifi/remote/RemoteDestination.java   |  10 -
 18 files changed, 822 insertions(+), 494 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/pom.xml b/nifi/commons/site-to-site-client/pom.xml
index 7719d55..d65f440 100644
--- a/nifi/commons/site-to-site-client/pom.xml
+++ b/nifi/commons/site-to-site-client/pom.xml
@@ -21,6 +21,19 @@
   		<groupId>org.apache.nifi</groupId>
   		<artifactId>nifi-utils</artifactId>
   	</dependency>
+	<dependency>
+		<groupId>com.sun.jersey</groupId>
+		<artifactId>jersey-client</artifactId>
+	</dependency>
+	<dependency>
+		<groupId>org.apache.nifi</groupId>
+		<artifactId>client-dto</artifactId>
+		<version>0.0.1-incubating-SNAPSHOT</version>
+	</dependency>
+  	<dependency>
+  		<groupId>org.apache.nifi</groupId>
+  		<artifactId>nifi-web-utils</artifactId>
+  	</dependency>
   	
     <dependency>
       <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/DataPacket.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/DataPacket.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/DataPacket.java
deleted file mode 100644
index ec77f2c..0000000
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/DataPacket.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.client;
-
-import java.io.InputStream;
-import java.util.Map;
-
-public interface DataPacket {
-
-	Map<String, String> getAttributes();
-	
-	InputStream getData();
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index 47a09be..34cb56a 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -16,9 +16,12 @@
  */
 package org.apache.nifi.remote.client;
 
+import java.io.Closeable;
 import java.io.IOException;
 
-public interface SiteToSiteClient {
+import org.apache.nifi.remote.protocol.DataPacket;
+
+public interface SiteToSiteClient extends Closeable {
 
 	void send(DataPacket dataPacket) throws IOException;
 	

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java
new file mode 100644
index 0000000..bae6e51
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+public interface Transaction {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
index d20fb58..0718bb1 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
@@ -41,10 +41,16 @@ import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Pattern;
 
 import javax.net.ssl.SSLContext;
@@ -72,21 +78,28 @@ import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSessio
 import org.apache.nifi.remote.protocol.CommunicationsSession;
 import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
 import org.apache.nifi.remote.util.PeerStatusCache;
+import org.apache.nifi.remote.util.RemoteNiFiUtils;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.web.api.dto.ControllerDTO;
+import org.apache.nifi.web.api.dto.PortDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class EndpointConnectionStatePool {
     public static final long PEER_REFRESH_PERIOD = 60000L;
     public static final String CATEGORY = "Site-to-Site";
+    public static final long REMOTE_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
+
     private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
 
 	private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionStatePool.class);
 	
-	private final ConcurrentMap<String, BlockingQueue<EndpointConnectionState>> endpointConnectionMap = new ConcurrentHashMap<>();
+	private final BlockingQueue<EndpointConnectionState> connectionStateQueue = new LinkedBlockingQueue<>();
     private final ConcurrentMap<PeerStatus, Long> peerTimeoutExpirations = new ConcurrentHashMap<>();
-
+    private final URI clusterUrl;
+    private final String apiUri;
+    
     private final AtomicLong peerIndex = new AtomicLong(0L);
     
     private final ReentrantLock peerRefreshLock = new ReentrantLock();
@@ -98,15 +111,41 @@ public class EndpointConnectionStatePool {
     private final File peersFile;
     private final EventReporter eventReporter;
     private final SSLContext sslContext;
+    private final ScheduledExecutorService taskExecutor;
+    
+    private final ReadWriteLock listeningPortRWLock = new ReentrantReadWriteLock();
+    private final Lock remoteInfoReadLock = listeningPortRWLock.readLock();
+    private final Lock remoteInfoWriteLock = listeningPortRWLock.writeLock();
+    private Integer siteToSitePort;
+    private Boolean siteToSiteSecure;
+    private long remoteRefreshTime;
+    private final Map<String, String> inputPortMap = new HashMap<>();	// map input port name to identifier
+    private final Map<String, String> outputPortMap = new HashMap<>();	// map output port name to identifier
+    
+    private volatile int commsTimeout;
 
-    public EndpointConnectionStatePool(final EventReporter eventReporter, final File persistenceFile) {
-    	this(null, eventReporter, persistenceFile);
+    public EndpointConnectionStatePool(final String clusterUrl, final int commsTimeoutMillis, final EventReporter eventReporter, final File persistenceFile) {
+    	this(clusterUrl, commsTimeoutMillis, null, eventReporter, persistenceFile);
     }
     
-    public EndpointConnectionStatePool(final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile) {
+    public EndpointConnectionStatePool(final String clusterUrl, final int commsTimeoutMillis, final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile) {
+    	try {
+    		this.clusterUrl = new URI(clusterUrl);
+    	} catch (final URISyntaxException e) {
+    		throw new IllegalArgumentException("Invalid Cluster URL: " + clusterUrl);
+    	}
+    	
+    	// Trim the trailing /
+        String uriPath = this.clusterUrl.getPath();
+        if (uriPath.endsWith("/")) {
+            uriPath = uriPath.substring(0, uriPath.length() - 1);
+        }
+        apiUri = this.clusterUrl.getScheme() + "://" + this.clusterUrl.getHost() + ":" + this.clusterUrl.getPort() + uriPath + "-api";
+        
     	this.sslContext = sslContext;
     	this.peersFile = persistenceFile;
     	this.eventReporter = eventReporter;
+    	this.commsTimeout = commsTimeoutMillis;
     	
     	Set<PeerStatus> recoveredStatuses;
     	if ( persistenceFile != null && persistenceFile.exists() ) {
@@ -119,21 +158,39 @@ public class EndpointConnectionStatePool {
     	} else {
     		peerStatusCache = null;
     	}
+
+    	// Initialize a scheduled executor and run some maintenance tasks in the background to kill off old, unused
+    	// connections and keep our list of peers up-to-date.
+    	taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+    		private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
+    		
+			@Override
+			public Thread newThread(final Runnable r) {
+				final Thread thread = defaultFactory.newThread(r);
+				thread.setName("NiFi Site-to-Site Connection Pool Maintenance");
+				return thread;
+			}
+    	});
+
+    	taskExecutor.scheduleWithFixedDelay(new Runnable() {
+			@Override
+			public void run() {
+				refreshPeers();
+			}
+    	}, 0, 5, TimeUnit.SECONDS);
+
+    	taskExecutor.scheduleWithFixedDelay(new Runnable() {
+			@Override
+			public void run() {
+				cleanupExpiredSockets();
+			}
+    	}, 5, 5, TimeUnit.SECONDS);
     }
     
-    public EndpointConnectionState getEndpointConnectionState(final String clusterUrl, final RemoteDestination remoteDestination, final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
+    public EndpointConnectionState getEndpointConnectionState(final RemoteDestination remoteDestination, final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
     	//
         // Attempt to get a connection state that already exists for this URL.
         //
-        BlockingQueue<EndpointConnectionState> connectionStateQueue = endpointConnectionMap.get(clusterUrl);
-        if ( connectionStateQueue == null ) {
-            connectionStateQueue = new LinkedBlockingQueue<>();
-            BlockingQueue<EndpointConnectionState> existingQueue = endpointConnectionMap.putIfAbsent(clusterUrl, connectionStateQueue);
-            if ( existingQueue != null ) {
-                connectionStateQueue = existingQueue;
-            }
-        }
-        
         FlowFileCodec codec = null;
         CommunicationsSession commsSession = null;
         SocketClientProtocol protocol = null;
@@ -172,7 +229,7 @@ public class EndpointConnectionStatePool {
                 
                 
                 final String peerUrl = "nifi://" + peerStatus.getHostname() + ":" + peerStatus.getPort();
-                peer = new Peer(commsSession, peerUrl, clusterUrl);
+                peer = new Peer(commsSession, peerUrl, clusterUrl.toString());
                 
                 // perform handshake
                 try {
@@ -214,9 +271,8 @@ public class EndpointConnectionStatePool {
             } else {
                 final long lastTimeUsed = connectionState.getLastTimeUsed();
                 final long millisSinceLastUse = System.currentTimeMillis() - lastTimeUsed;
-                final long timeoutMillis = remoteDestination.getCommunicationsTimeout(TimeUnit.MILLISECONDS);
                 
-                if ( timeoutMillis > 0L && millisSinceLastUse >= timeoutMillis ) {
+                if ( commsTimeout > 0L && millisSinceLastUse >= commsTimeout ) {
                     cleanup(connectionState.getSocketClientProtocol(), connectionState.getPeer());
                     connectionState = null;
                 } else {
@@ -243,12 +299,7 @@ public class EndpointConnectionStatePool {
     		return false;
     	}
     	
-    	final BlockingQueue<EndpointConnectionState> queue = endpointConnectionMap.get(url);
-    	if ( queue == null ) {
-    		return false;
-    	}
-    	
-    	return queue.offer(endpointConnectionState);
+    	return connectionStateQueue.offer(endpointConnectionState);
     }
     
     /**
@@ -365,7 +416,7 @@ public class EndpointConnectionStatePool {
     }
     
     
-    public Set<PeerStatus> getPeerStatuses() {
+    private Set<PeerStatus> getPeerStatuses() {
         final PeerStatusCache cache = this.peerStatusCache;
         if (cache == null || cache.getStatuses() == null || cache.getStatuses().isEmpty()) {
             return null;
@@ -384,12 +435,12 @@ public class EndpointConnectionStatePool {
         return cache.getStatuses();
     }
 
-    private Set<PeerStatus> fetchRemotePeerStatuses(final URI destinationUri, final boolean secure) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException, BadRequestException {
-    	final String hostname = destinationUri.getHost();
-        final int port = destinationUri.getPort();
+    private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException, HandshakeException, UnknownPortException, PortNotRunningException, BadRequestException {
+    	final String hostname = clusterUrl.getHost();
+        final int port = getSiteToSitePort();
     	
-    	final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port, secure);
-        final Peer peer = new Peer(commsSession, "nifi://" + hostname + ":" + port, destinationUri.toString());
+    	final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port);
+        final Peer peer = new Peer(commsSession, "nifi://" + hostname + ":" + port, clusterUrl.toString());
         final SocketClientProtocol clientProtocol = new SocketClientProtocol();
         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
@@ -399,8 +450,8 @@ public class EndpointConnectionStatePool {
             throw new BadRequestException(e.toString());
         }
 
-        // TODO: Make the 30000 millis configurable
-        clientProtocol.handshake(peer, null, 30000);
+        clientProtocol.setTimeout(commsTimeout);
+        clientProtocol.handshake(peer, null);
         final Set<PeerStatus> peerStatuses = clientProtocol.getPeerStatuses(peer);
         persistPeerStatuses(peerStatuses);
 
@@ -474,38 +525,41 @@ public class EndpointConnectionStatePool {
     }
     
     
-    public CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException {
-    	return establishSiteToSiteConnection(peerStatus.getHostname(), peerStatus.getPort(), peerStatus.isSecure());
+    private CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException {
+    	return establishSiteToSiteConnection(peerStatus.getHostname(), peerStatus.getPort());
     }
     
-    public CommunicationsSession establishSiteToSiteConnection(final String hostname, final int port, final boolean secure) throws IOException {
+    private CommunicationsSession establishSiteToSiteConnection(final String hostname, final int port) throws IOException {
+    	if ( siteToSiteSecure == null ) {
+    		throw new IOException("Remote NiFi instance " + clusterUrl + " is not currently configured to accept site-to-site connections");
+    	}
+    	
         final String destinationUri = "nifi://" + hostname + ":" + port;
 
         CommunicationsSession commsSession = null;
         try {
-        if ( secure ) {
-            if ( sslContext == null ) {
-                throw new IOException("Unable to communicate with " + hostname + ":" + port + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications");
-            }
-            
-            final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, hostname, port, true);
-            socketChannel.connect();
-    
-            commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri);
-                
-                try {
-                    commsSession.setUserDn(socketChannel.getDn());
-                } catch (final CertificateNotYetValidException | CertificateExpiredException ex) {
-                    throw new IOException(ex);
-                }
-        } else {
-            final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port));
-            commsSession = new SocketChannelCommunicationsSession(socketChannel, destinationUri);
-        }
-
-        commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
-
-        commsSession.setUri(destinationUri);
+	        if ( siteToSiteSecure ) {
+	            if ( sslContext == null ) {
+	                throw new IOException("Unable to communicate with " + hostname + ":" + port + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications");
+	            }
+	            
+	            final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, hostname, port, true);
+	            socketChannel.connect();
+	    
+	            commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri);
+	                
+	                try {
+	                    commsSession.setUserDn(socketChannel.getDn());
+	                } catch (final CertificateNotYetValidException | CertificateExpiredException ex) {
+	                    throw new IOException(ex);
+	                }
+	        } else {
+	            final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port));
+	            commsSession = new SocketChannelCommunicationsSession(socketChannel, destinationUri);
+	        }
+	
+	        commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
+	        commsSession.setUri(destinationUri);
         } catch (final IOException ioe) {
             if ( commsSession != null ) {
                 commsSession.close();
@@ -578,59 +632,52 @@ public class EndpointConnectionStatePool {
     }
     
     
-    public void cleanupExpiredSockets() {
+    private void cleanupExpiredSockets() {
         final List<EndpointConnectionState> states = new ArrayList<>();
         
-        for ( final BlockingQueue<EndpointConnectionState> queue : endpointConnectionMap.values() ) {
-            states.clear();
-            
-            EndpointConnectionState state;
-            while ((state = queue.poll()) != null) {
-                // If the socket has not been used in 10 seconds, shut it down.
-                final long lastUsed = state.getLastTimeUsed();
-                if ( lastUsed < System.currentTimeMillis() - 10000L ) {
-                    try {
-                        state.getSocketClientProtocol().shutdown(state.getPeer());
-                    } catch (final Exception e) {
-                        logger.debug("Failed to shut down {} using {} due to {}", 
-                            new Object[] {state.getSocketClientProtocol(), state.getPeer(), e} );
-                    }
-                    
-                    cleanup(state.getSocketClientProtocol(), state.getPeer());
-                } else {
-                    states.add(state);
+        EndpointConnectionState state;
+        while ((state = connectionStateQueue.poll()) != null) {
+            // If the socket has not been used in 10 seconds, shut it down.
+            final long lastUsed = state.getLastTimeUsed();
+            if ( lastUsed < System.currentTimeMillis() - 10000L ) {
+                try {
+                    state.getSocketClientProtocol().shutdown(state.getPeer());
+                } catch (final Exception e) {
+                    logger.debug("Failed to shut down {} using {} due to {}", 
+                        new Object[] {state.getSocketClientProtocol(), state.getPeer(), e} );
                 }
+                
+                cleanup(state.getSocketClientProtocol(), state.getPeer());
+            } else {
+                states.add(state);
             }
-            
-            queue.addAll(states);
         }
+        
+        connectionStateQueue.addAll(states);
     }
     
     public void shutdown() {
+    	taskExecutor.shutdown();
     	peerTimeoutExpirations.clear();
             
         for ( final CommunicationsSession commsSession : activeCommsChannels ) {
             commsSession.interrupt();
         }
         
-        for ( final BlockingQueue<EndpointConnectionState> queue : endpointConnectionMap.values() ) {
-            EndpointConnectionState state;
-            while ( (state = queue.poll()) != null)  {
-                cleanup(state.getSocketClientProtocol(), state.getPeer());
-            }
+        EndpointConnectionState state;
+        while ( (state = connectionStateQueue.poll()) != null)  {
+            cleanup(state.getSocketClientProtocol(), state.getPeer());
         }
-        
-        endpointConnectionMap.clear();
     }
     
-    public void refreshPeers(final URI targetUri, final boolean secure) {
+    private void refreshPeers() {
         final PeerStatusCache existingCache = peerStatusCache;
         if (existingCache != null && (existingCache.getTimestamp() + PEER_CACHE_MILLIS > System.currentTimeMillis())) {
             return;
         }
 
         try {
-            final Set<PeerStatus> statuses = fetchRemotePeerStatuses(targetUri, secure);
+            final Set<PeerStatus> statuses = fetchRemotePeerStatuses();
             peerStatusCache = new PeerStatusCache(statuses);
             logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, statuses.size());
         } catch (Exception e) {
@@ -639,6 +686,92 @@ public class EndpointConnectionStatePool {
                 logger.warn("", e);
             }
         }
+    }
+    
+    
+    public String getInputPortIdentifier(final String portName) throws IOException {
+        return getPortIdentifier(portName, inputPortMap);
+    }
+    
+    public String getOutputPortIdentifier(final String portName) throws IOException {
+    	return getPortIdentifier(portName, outputPortMap);
+    }
+    
+    
+    private String getPortIdentifier(final String portName, final Map<String, String> portMap) throws IOException {
+    	String identifier;
+    	remoteInfoReadLock.lock();
+        try {
+        	identifier = portMap.get(portName);
+        } finally {
+        	remoteInfoReadLock.unlock();
+        }
+        
+        if ( identifier != null ) {
+        	return identifier;
+        }
+        
+        refreshRemoteInfo();
+
+    	remoteInfoReadLock.lock();
+        try {
+        	return portMap.get(portName);
+        } finally {
+        	remoteInfoReadLock.unlock();
+        }
+    }
+    
+    
+    private ControllerDTO refreshRemoteInfo() throws IOException {
+    	final boolean webInterfaceSecure = clusterUrl.toString().startsWith("https");
+        final RemoteNiFiUtils utils = new RemoteNiFiUtils(webInterfaceSecure ? sslContext : null);
+		final ControllerDTO controller = utils.getController(URI.create(apiUri + "/controller"), commsTimeout);
+        
+        remoteInfoWriteLock.lock();
+        try {
+            this.siteToSitePort = controller.getRemoteSiteListeningPort();
+            this.siteToSiteSecure = controller.isSiteToSiteSecure();
+            
+            inputPortMap.clear();
+            for (final PortDTO inputPort : controller.getInputPorts()) {
+            	inputPortMap.put(inputPort.getName(), inputPort.getId());
+            }
+            
+            outputPortMap.clear();
+            for ( final PortDTO outputPort : controller.getOutputPorts()) {
+            	outputPortMap.put(outputPort.getName(), outputPort.getId());
+            }
+            
+            this.remoteRefreshTime = System.currentTimeMillis();
+        } finally {
+        	remoteInfoWriteLock.unlock();
+        }
+        
+        return controller;
+    }
+    
+    /**
+     * @return the port that the remote instance is listening on for
+     * site-to-site communication, or <code>null</code> if the remote instance
+     * is not configured to allow site-to-site communications.
+     *
+     * @throws IOException if unable to communicate with the remote instance
+     */
+    private Integer getSiteToSitePort() throws IOException {
+        Integer listeningPort;
+        remoteInfoReadLock.lock();
+        try {
+            listeningPort = this.siteToSitePort;
+            if (listeningPort != null && this.remoteRefreshTime > System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) {
+                return listeningPort;
+            }
+        } finally {
+        	remoteInfoReadLock.unlock();
+        }
+
+        final ControllerDTO controller = refreshRemoteInfo();
+        listeningPort = controller.getRemoteSiteListeningPort();
 
+        return listeningPort;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index 48e9cc5..b81b425 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -16,16 +16,87 @@
  */
 package org.apache.nifi.remote.client.socket;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.nifi.remote.client.DataPacket;
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.RemoteDestination;
+import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.PortNotRunningException;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.exception.UnknownPortException;
+import org.apache.nifi.remote.protocol.DataPacket;
 
 public class SocketClient implements SiteToSiteClient {
-
+	private final EndpointConnectionStatePool pool;
+	private final boolean compress;
+	private final String portName;
+	private final long penalizationNanos;
+	private volatile String portIdentifier;
+	
+	private SocketClient(final Builder builder) {
+		pool = new EndpointConnectionStatePool(builder.url, (int) TimeUnit.NANOSECONDS.toMillis(builder.timeoutNanos), 
+				builder.sslContext, builder.eventReporter, builder.peerPersistenceFile);
+		
+		this.compress = builder.useCompression;
+		this.portIdentifier = builder.portIdentifier;
+		this.portName = builder.portName;
+		this.penalizationNanos = builder.penalizationNanos;
+	}
+	
+	
+	private String getPortIdentifier(final TransferDirection direction) throws IOException {
+		final String id = this.portIdentifier;
+		if ( id != null ) {
+			return id;
+		}
+		
+		if ( direction == TransferDirection.SEND ) {
+			return pool.getInputPortIdentifier(this.portName);
+		} else {
+			return pool.getOutputPortIdentifier(this.portName);
+		}
+	}
+	
+	
 	@Override
 	public void send(final DataPacket dataPacket) throws IOException {
-		// TODO Auto-generated method stub
+		final String portId = getPortIdentifier(TransferDirection.SEND);
+		
+		if ( portId == null ) {
+			throw new IOException("Could not find Port with name " + portName + " for remote NiFi instance");
+		}
+		
+		final RemoteDestination remoteDestination = new RemoteDestination() {
+			@Override
+			public String getIdentifier() {
+				return portId;
+			}
+
+			@Override
+			public long getYieldPeriod(final TimeUnit timeUnit) {
+				return timeUnit.convert(penalizationNanos, TimeUnit.NANOSECONDS);
+			}
+
+			@Override
+			public boolean isUseCompression() {
+				return compress;
+			}
+		};
+		
+		final EndpointConnectionState connectionState;
+		try {
+			connectionState = pool.getEndpointConnectionState(remoteDestination, TransferDirection.SEND);
+		} catch (final ProtocolException | HandshakeException | PortNotRunningException | UnknownPortException e) {
+			throw new IOException(e);
+		}
+		
+		
 	}
 
 	@Override
@@ -33,5 +104,79 @@ public class SocketClient implements SiteToSiteClient {
 		// TODO Auto-generated method stub
 		return null;
 	}
+	
+	@Override
+	public void close() throws IOException {
+		pool.shutdown();
+	}
 
+	
+	public static class Builder {
+		private String url;
+		private long timeoutNanos = TimeUnit.SECONDS.toNanos(30);
+		private long penalizationNanos = TimeUnit.SECONDS.toNanos(3);
+		private SSLContext sslContext;
+		private EventReporter eventReporter;
+		private File peerPersistenceFile;
+		private boolean useCompression;
+		private String portName;
+		private String portIdentifier;
+		
+		public Builder url(final String url) {
+			this.url = url;
+			return this;
+		}
+		
+		public Builder timeout(final long timeout, final TimeUnit unit) {
+			this.timeoutNanos = unit.toNanos(timeout);
+			return this;
+		}
+		
+		public Builder nodePenalizationPeriod(final long period, final TimeUnit unit) {
+			this.penalizationNanos = unit.toNanos(period);
+			return this;
+		}
+		
+		public Builder sslContext(final SSLContext sslContext) {
+			this.sslContext = sslContext;
+			return this;
+		}
+		
+		public Builder eventReporter(final EventReporter eventReporter) {
+			this.eventReporter = eventReporter;
+			return this;
+		}
+		
+		public Builder peerPersistenceFile(final File peerPersistenceFile) {
+			this.peerPersistenceFile = peerPersistenceFile;
+			return this;
+		}
+		
+		public Builder useCompression(final boolean compress) {
+			this.useCompression = compress;
+			return this;
+		}
+		
+		public Builder portName(final String portName) {
+			this.portName = portName;
+			return this;
+		}
+		
+		public Builder portIdentifier(final String portIdentifier) {
+			this.portIdentifier = portIdentifier;
+			return this;
+		}
+		
+		public SocketClient build() {
+			if ( url == null ) {
+				throw new IllegalStateException("Must specify URL to build Site-to-Site client");
+			}
+			
+			if ( portName == null && portIdentifier == null ) {
+				throw new IllegalStateException("Must specify either Port Name or Port Identifier to builder Site-to-Site client");
+			}
+			
+			return new SocketClient(this);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
index 32274eb..d817425 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
@@ -23,6 +23,7 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.VersionedRemoteResource;
 import org.apache.nifi.remote.codec.FlowFileCodec;
 import org.apache.nifi.remote.exception.HandshakeException;
@@ -46,6 +47,20 @@ public interface ClientProtocol extends VersionedRemoteResource {
 
     boolean isReadyForFileTransfer();
 
+    
+    
+    
+    void startTransaction(Peer peer, TransferDirection direction) throws IOException;
+    
+    void completeTransaction();
+    
+    void rollbackTransaction();
+    
+    void transferData(Peer peer, DataPacket dataPacket, FlowFileCodec codec) throws IOException, ProtocolException;
+    
+    DataPacket receiveData(Peer peer, FlowFileCodec codec) throws IOException, ProtocolException;
+    
+    
     /**
      * returns <code>true</code> if remote instance indicates that the port is
      * invalid

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
new file mode 100644
index 0000000..f4fa4d0
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.InputStream;
+import java.util.Map;
+
+public interface DataPacket {
+
+	Map<String, String> getAttributes();
+	
+	InputStream getData();
+	
+	long getSize();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index 560385c..6b0c94b 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -41,7 +41,9 @@ import org.apache.nifi.remote.PeerStatus;
 import org.apache.nifi.remote.RemoteDestination;
 import org.apache.nifi.remote.RemoteResourceInitiator;
 import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.client.Transaction;
 import org.apache.nifi.remote.codec.FlowFileCodec;
 import org.apache.nifi.remote.codec.StandardFlowFileCodec;
 import org.apache.nifi.remote.exception.HandshakeException;
@@ -50,6 +52,7 @@ import org.apache.nifi.remote.io.CompressionInputStream;
 import org.apache.nifi.remote.io.CompressionOutputStream;
 import org.apache.nifi.remote.protocol.ClientProtocol;
 import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.remote.protocol.RequestType;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.StopWatch;
@@ -60,7 +63,7 @@ public class SocketClientProtocol implements ClientProtocol {
     private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(4, 3, 2, 1);
 
     private RemoteDestination destination;
-    private boolean useCompression;
+    private boolean useCompression = false;
     
     private String commsIdentifier;
     private boolean handshakeComplete = false;
@@ -70,6 +73,7 @@ public class SocketClientProtocol implements ClientProtocol {
     private Response handshakeResponse = null;
     private boolean readyForFileTransfer = false;
     private String transitUriPrefix = null;
+    private int timeoutMillis = 30000;
     
     private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
     
@@ -81,13 +85,16 @@ public class SocketClientProtocol implements ClientProtocol {
         this.useCompression = destination.isUseCompression();
     }
     
+    public void setTimeout(final int timeoutMillis) {
+    	this.timeoutMillis = timeoutMillis;
+    }
     
     @Override
     public void handshake(final Peer peer) throws IOException, HandshakeException {
-    	handshake(peer, destination.getIdentifier(), (int) destination.getCommunicationsTimeout(TimeUnit.MILLISECONDS));
+    	handshake(peer, destination.getIdentifier());
     }
     
-    public void handshake(final Peer peer, final String destinationId, final int timeoutMillis) throws IOException, HandshakeException {
+    public void handshake(final Peer peer, final String destinationId) throws IOException, HandshakeException {
         if ( handshakeComplete ) {
             throw new IllegalStateException("Handshake has already been completed");
         }
@@ -228,6 +235,65 @@ public class SocketClientProtocol implements ClientProtocol {
         return codec;
     }
 
+
+    // TODO: move up to top with member variables
+    private SocketClientTransaction transaction;
+    
+    @Override
+    public void startTransaction(final Peer peer, final TransferDirection direction) throws IOException {
+        if ( !handshakeComplete ) {
+            throw new IllegalStateException("Handshake has not been performed");
+        }
+        if ( !readyForFileTransfer ) {
+            throw new IllegalStateException("Cannot start transaction; handshake resolution was " + handshakeResponse);
+        }
+        
+        transaction = new SocketClientTransaction(peer, direction, useCompression);
+
+        final DataOutputStream dos = transaction.getDataOutputStream();
+        if ( direction == TransferDirection.RECEIVE ) {
+            // Indicate that we would like to have some data
+            RequestType.RECEIVE_FLOWFILES.writeRequestType(dos);
+            dos.flush();
+        } else {
+            // Indicate that we would like to send some data
+            RequestType.SEND_FLOWFILES.writeRequestType(dos);
+            dos.flush();
+        }
+    }
+    
+    @Override
+    public DataPacket receiveData(final FlowFileCodec codec) throws IOException, ProtocolException {
+    	if ( transaction == null ) {
+    		throw new IllegalStateException("Cannot receive data because no transaction has been started");
+    	}
+    	
+    	final Peer peer = transaction.getPeer();
+        logger.debug("{} Receiving FlowFiles from {}", this, peer);
+        final CommunicationsSession commsSession = peer.getCommunicationsSession();
+        final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+        final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+        String userDn = commsSession.getUserDn();
+        if ( userDn == null ) {
+            userDn = "none";
+        }
+        
+        // Determine if Peer will send us data or has no data to send us
+        final Response dataAvailableCode = Response.read(dis);
+        switch (dataAvailableCode.getCode()) {
+            case MORE_DATA:
+                logger.debug("{} {} Indicates that data is available", this, peer);
+                break;
+            case NO_MORE_DATA:
+                logger.debug("{} No data available from {}", peer);
+                return null;
+            default:
+                throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+        }
+        
+        
+    }
+    
     
     @Override
     public void receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
@@ -258,6 +324,7 @@ public class SocketClientProtocol implements ClientProtocol {
                 logger.debug("{} {} Indicates that data is available", this, peer);
                 break;
             case NO_MORE_DATA:
+            	context.yield();
                 logger.debug("{} No data available from {}", peer);
                 return;
             default:

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
new file mode 100644
index 0000000..0c4ce05
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.Transaction;
+import org.apache.nifi.remote.io.CompressionInputStream;
+
+public class SocketClientTransaction implements Transaction {
+	private final long startTime = System.nanoTime();
+	private long bytesReceived = 0L;
+	private CRC32 crc = new CRC32();
+	
+	private final Peer peer;
+	private final TransferDirection direction;
+	
+	private final DataInputStream dis;
+	private final DataOutputStream dos;
+	private final CheckedInputStream checkedInputStream;
+	
+	SocketClientTransaction(final Peer peer, final TransferDirection direction, final boolean useCompression) throws IOException {
+		this.peer = peer;
+		this.direction = direction;
+		
+		this.dis = new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream());
+		this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream());
+		
+		final InputStream dataInputStream = useCompression ? new CompressionInputStream(dis) : dis;
+        checkedInputStream = new CheckedInputStream(dataInputStream, crc);
+	}
+	
+	CheckedInputStream getCheckedInputStream() {
+		return checkedInputStream;
+	}
+	
+	DataOutputStream getDataOutputStream() {
+		return dos;
+	}
+	
+	Peer getPeer() {
+		return peer;
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java
new file mode 100644
index 0000000..b2dbdcd
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.nifi.web.api.dto.ControllerDTO;
+import org.apache.nifi.web.api.entity.ControllerEntity;
+import org.apache.nifi.web.util.WebUtils;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.ClientResponse.Status;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+
+/**
+ *
+ */
+public class RemoteNiFiUtils {
+
+    public static final String CONTROLLER_URI_PATH = "/controller";
+
+    private static final int CONNECT_TIMEOUT = 10000;
+    private static final int READ_TIMEOUT = 10000;
+    
+    private final Client client;
+    
+    public RemoteNiFiUtils(final SSLContext sslContext) {
+        this.client = getClient(sslContext);
+    }
+    
+
+    /**
+     * Gets the content at the specified URI.
+     *
+     * @param uri
+     * @param timeoutMillis
+     * @return
+     * @throws ClientHandlerException
+     * @throws UniformInterfaceException
+     */
+    public ClientResponse get(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException {
+        return get(uri, timeoutMillis, null);
+    }
+    
+    /**
+     * Gets the content at the specified URI using the given query parameters.
+     *
+     * @param uri
+     * @param timeoutMillis
+     * @param queryParams 
+     * @return
+     * @throws ClientHandlerException
+     * @throws UniformInterfaceException
+     */
+    public ClientResponse get(final URI uri, final int timeoutMillis, final Map<String, String> queryParams) throws ClientHandlerException, UniformInterfaceException {
+        // perform the request
+        WebResource webResource = client.resource(uri);
+        if ( queryParams != null ) {
+            for ( final Map.Entry<String, String> queryEntry : queryParams.entrySet() ) {
+                webResource = webResource.queryParam(queryEntry.getKey(), queryEntry.getValue());
+            }
+        }
+
+        webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis);
+        webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis);
+
+        return webResource.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+    }
+
+    /**
+     * Performs a HEAD request to the specified URI.
+     *
+     * @param uri
+     * @param timeoutMillis
+     * @return
+     * @throws ClientHandlerException
+     * @throws UniformInterfaceException
+     */
+    public ClientResponse head(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException {
+        // perform the request
+        WebResource webResource = client.resource(uri);
+        webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis);
+        webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis);
+        return webResource.head();
+    }
+
+    /**
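+     * Creates a client, using the given SSL context when one is provided, and applies the default read and connect timeouts.
+     * 
+     * @param sslContext the SSL context to create the client with, or <code>null</code> to create the client without one
+     * @return the newly created client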
+     */
+    private Client getClient(final SSLContext sslContext) {
+        final Client client;
+        if (sslContext == null) {
+            client = WebUtils.createClient(null);
+        } else {
+            client = WebUtils.createClient(null, sslContext);
+        }
+
+        client.setReadTimeout(READ_TIMEOUT);
+        client.setConnectTimeout(CONNECT_TIMEOUT);
+
+        return client;
+    }
+    
+    
+    /**
+     * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance
+     * is not configured to use Site-to-Site transfers.
+     * 
+     * @param uri the base URI of the remote instance. This should include the path only to the nifi-api level, as well as the protocol, host, and port.
+     * @param timeoutMillis
+     * @return
+     * @throws IOException
+     */
+    public Integer getRemoteListeningPort(final String uri, final int timeoutMillis) throws IOException {
+    	try {
+			final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
+			return getRemoteListeningPort(uriObject, timeoutMillis);
+		} catch (URISyntaxException e) {
+			throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
+		}
+    }
+    
+    public String getRemoteRootGroupId(final String uri, final int timeoutMillis) throws IOException {
+        try {
+            final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
+            return getRemoteRootGroupId(uriObject, timeoutMillis);
+        } catch (URISyntaxException e) {
+            throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
+        }
+    }
+    
+    public String getRemoteInstanceId(final String uri, final int timeoutMillis) throws IOException {
+        try {
+            final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
+            return getController(uriObject, timeoutMillis).getInstanceId();
+        } catch (URISyntaxException e) {
+            throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
+        }
+    }
+    
+    /**
+     * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance
+     * is not configured to use Site-to-Site transfers.
+     * 
+     * @param uri the full URI to fetch, including the path.
+     * @return
+     * @throws IOException
+     */
+    private Integer getRemoteListeningPort(final URI uri, final int timeoutMillis) throws IOException {
+    	return getController(uri, timeoutMillis).getRemoteSiteListeningPort();
+    }
+    
+    private String getRemoteRootGroupId(final URI uri, final int timeoutMillis) throws IOException {
+        return getController(uri, timeoutMillis).getId();
+    }
+    
+    public ControllerDTO getController(final URI uri, final int timeoutMillis) throws IOException {
+        final ClientResponse response = get(uri, timeoutMillis);
+        
+        if (Status.OK.getStatusCode() == response.getStatusInfo().getStatusCode()) {
+            final ControllerEntity entity = response.getEntity(ControllerEntity.class);
+            return entity.getController();
+        } else {
+            final String responseMessage = response.getEntity(String.class);
+            throw new IOException("Got HTTP response Code " + response.getStatusInfo().getStatusCode() + ": " + response.getStatusInfo().getReasonPhrase() + " with explanation: " + responseMessage);
+        }
+    }
+    
+    /**
+     * Issues a registration request on behalf of the current user.
+     * 
+     * @param baseApiUri 
+     * @return  
+     */
+    public ClientResponse issueRegistrationRequest(String baseApiUri) {
+        final URI uri = URI.create(String.format("%s/%s", baseApiUri, "/controller/users"));
+
+        // set up the form parameters that are sent as the request entity
+        MultivaluedMapImpl entity = new MultivaluedMapImpl();
+        entity.add("justification", "A Remote instance of NiFi has attempted to create a reference to this NiFi. This action must be approved first.");
+        
+        // create the web resource
+        WebResource webResource = client.resource(uri);
+        
+        // make the form-encoded POST request, accepting a JSON response
+        return webResource.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_FORM_URLENCODED).entity(entity).post(ClientResponse.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
index 9f2dac8..ac41cba 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.groups;
 
-import java.io.IOException;
 import java.net.URI;
 import java.util.Date;
 import java.util.Set;
@@ -109,15 +108,6 @@ public interface RemoteProcessGroup {
     String getCommunicationsTimeout();
 
     /**
-     * @return the port that the remote instance is listening on for
-     * site-to-site communication, or <code>null</code> if the remote instance
-     * is not configured to allow site-to-site communications.
-     *
-     * @throws IOException if unable to communicate with the remote instance
-     */
-    Integer getListeningPort() throws IOException;
-
-    /**
      * Indicates whether or not the RemoteProcessGroup is currently scheduled to
      * transmit data
      *
@@ -229,24 +219,4 @@ public interface RemoteProcessGroup {
     void verifyCanStopTransmitting();
 
     void verifyCanUpdate();
-
-    /**
-     * Returns a set of PeerStatus objects that describe the different peers
-     * that we can communicate with for this RemoteProcessGroup.
-     *
-     * If the destination is a cluster, this set will contain PeerStatuses for
-     * each of the nodes in the cluster.
-     *
-     * If the destination is a standalone instance, this set will contain just a
-     * PeerStatus for the destination.
-     *
-     * Once the PeerStatuses have been obtained, they may be cached by this
-     * RemoteProcessGroup for some amount of time.
-     *
-     * If unable to obtain the PeerStatuses or no peer status has yet been
-     * obtained, will return null.
-     *
-     * @return
-     */
-    Set<PeerStatus> getPeerStatuses();
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 857add9..db0aeb7 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -19,7 +19,6 @@ package org.apache.nifi.remote;
 import static java.util.Objects.requireNonNull;
 
 import java.io.File;
-import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -50,7 +49,6 @@ import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.exception.CommunicationsException;
-import org.apache.nifi.controller.util.RemoteProcessGroupUtils;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.events.BulletinFactory;
 import org.apache.nifi.events.EventReporter;
@@ -59,6 +57,7 @@ import org.apache.nifi.groups.ProcessGroupCounts;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
 import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool;
+import org.apache.nifi.remote.util.RemoteNiFiUtils;
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.FormatUtils;
@@ -85,7 +84,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
 
     public static final String CONTROLLER_URI_PATH = "/controller";
     public static final String ROOT_GROUP_STATUS_URI_PATH = "/controller/process-groups/root/status";
-    public static final long LISTENING_PORT_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
 
     // status codes
     public static final int OK_STATUS_CODE = Status.OK.getStatusCode();
@@ -127,9 +125,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
 
     private ProcessGroupCounts counts = new ProcessGroupCounts(0, 0, 0, 0, 0, 0, 0, 0);
     private Long refreshContentsTimestamp = null;
-    private Integer listeningPort;
-    private long listeningPortRetrievalTime = 0L;
     private Boolean destinationSecure;
+    private Integer listeningPort;
 
     private volatile String authorizationIssue;
 
@@ -175,48 +172,13 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
             }
         };
 
-        endpointConnectionPool = new EndpointConnectionStatePool(sslContext, eventReporter, getPeerPersistenceFile());
+        endpointConnectionPool = new EndpointConnectionStatePool(getTargetUri().toString(), getCommunicationsTimeout(TimeUnit.MILLISECONDS), 
+        		sslContext, eventReporter, getPeerPersistenceFile());
         
-        final Runnable socketCleanup = new Runnable() {
-            @Override
-            public void run() {
-                final Set<StandardRemoteGroupPort> ports = new HashSet<>();
-                readLock.lock();
-                try {
-                    ports.addAll(inputPorts.values());
-                    ports.addAll(outputPorts.values());
-                } finally {
-                    readLock.unlock();
-                }
-
-                endpointConnectionPool.cleanupExpiredSockets();
-            }
-        };
-
-        final Runnable refreshPeers = new Runnable() {
-            @Override
-            public void run() {
-            	final boolean secure;
-            	try {
-            		secure = isSecure();
-				} catch (CommunicationsException e) {
-					logger.warn("{} Unable to determine if remote instance {} is configured for secure site-to-site due to {}; will not refresh list of peers", new Object[] {this, getTargetUri(), e.toString()});
-					if ( logger.isDebugEnabled() ) {
-						logger.warn("", e);
-					}
-					return;
-				}
-            	
-            	endpointConnectionPool.refreshPeers(getTargetUri(), secure);
-            }
-        };
-
         final Runnable checkAuthorizations = new InitializationTask();
 
         backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id + ": " + targetUri);
         backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 0L, 30L, TimeUnit.SECONDS);
-        backgroundThreadExecutor.scheduleWithFixedDelay(refreshPeers, 0, 5, TimeUnit.SECONDS);
-        backgroundThreadExecutor.scheduleWithFixedDelay(socketCleanup, 10L, 10L, TimeUnit.SECONDS);
     }
 
     @Override
@@ -810,7 +772,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
             return;
         }
 
-        final RemoteProcessGroupUtils utils = new RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null);
+        final RemoteNiFiUtils utils = new RemoteNiFiUtils(isWebApiSecure() ? sslContext : null);
         final String uriVal = apiUri.toString() + CONTROLLER_URI_PATH;
         URI uri;
         try {
@@ -950,39 +912,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
         return descriptor;
     }
 
-    /**
-     * @return the port that the remote instance is listening on for
-     * site-to-site communication, or <code>null</code> if the remote instance
-     * is not configured to allow site-to-site communications.
-     *
-     * @throws IOException if unable to communicate with the remote instance
-     */
-    @Override
-    public Integer getListeningPort() throws IOException {
-        Integer listeningPort;
-        readLock.lock();
-        try {
-            listeningPort = this.listeningPort;
-            if (listeningPort != null && this.listeningPortRetrievalTime > System.currentTimeMillis() - LISTENING_PORT_REFRESH_MILLIS) {
-                return listeningPort;
-            }
-        } finally {
-            readLock.unlock();
-        }
-
-        final RemoteProcessGroupUtils utils = new RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null);
-        listeningPort = utils.getRemoteListeningPort(apiUri.toString(), getCommunicationsTimeout(TimeUnit.MILLISECONDS));
-        writeLock.lock();
-        try {
-            this.listeningPort = listeningPort;
-            this.listeningPortRetrievalTime = System.currentTimeMillis();
-        } finally {
-            writeLock.unlock();
-        }
-
-        return listeningPort;
-    }
-
     @Override
     public boolean isTransmitting() {
         return transmitting.get();
@@ -1218,7 +1147,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
         @Override
         public void run() {
             try {
-                final RemoteProcessGroupUtils utils = new RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null);
+                final RemoteNiFiUtils utils = new RemoteNiFiUtils(isWebApiSecure() ? sslContext : null);
                 final ClientResponse response = utils.get(new URI(apiUri + CONTROLLER_URI_PATH), getCommunicationsTimeout(TimeUnit.MILLISECONDS));
                 
                 final int statusCode = response.getStatus();
@@ -1398,12 +1327,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
         }
     }
 
-    @Override
-    public Set<PeerStatus> getPeerStatuses() {
-    	return endpointConnectionPool.getPeerStatuses();
-    }
-
-
     private File getPeerPersistenceFile() {
         final File stateDir = NiFiProperties.getInstance().getPersistentStateDirectory();
         return new File(stateDir, getIdentifier() + ".peers");

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/controller/util/RemoteProcessGroupUtils.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/controller/util/RemoteProcessGroupUtils.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/controller/util/RemoteProcessGroupUtils.java
deleted file mode 100644
index 10208f8..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/controller/util/RemoteProcessGroupUtils.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.util;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Map;
-
-import javax.net.ssl.SSLContext;
-import javax.ws.rs.core.MediaType;
-
-import org.apache.nifi.web.api.dto.ControllerDTO;
-import org.apache.nifi.web.api.entity.ControllerEntity;
-import org.apache.nifi.web.util.WebUtils;
-
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientHandlerException;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.ClientResponse.Status;
-import com.sun.jersey.api.client.config.ClientConfig;
-import com.sun.jersey.api.client.UniformInterfaceException;
-import com.sun.jersey.api.client.WebResource;
-import com.sun.jersey.core.util.MultivaluedMapImpl;
-
-/**
- *
- */
-public class RemoteProcessGroupUtils {
-
-    public static final String CONTROLLER_URI_PATH = "/controller";
-
-    private static final int CONNECT_TIMEOUT = 10000;
-    private static final int READ_TIMEOUT = 10000;
-    
-    private final Client client;
-    
-    public RemoteProcessGroupUtils(final SSLContext sslContext) {
-        this.client = getClient(sslContext);
-    }
-    
-
-    /**
-     * Gets the content at the specified URI.
-     *
-     * @param uri
-     * @param timeoutMillis
-     * @return
-     * @throws ClientHandlerException
-     * @throws UniformInterfaceException
-     */
-    public ClientResponse get(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException {
-        return get(uri, timeoutMillis, null);
-    }
-    
-    /**
-     * Gets the content at the specified URI using the given query parameters.
-     *
-     * @param uri
-     * @param timeoutMillis
-     * @param queryParams 
-     * @return
-     * @throws ClientHandlerException
-     * @throws UniformInterfaceException
-     */
-    public ClientResponse get(final URI uri, final int timeoutMillis, final Map<String, String> queryParams) throws ClientHandlerException, UniformInterfaceException {
-        // perform the request
-        WebResource webResource = client.resource(uri);
-        if ( queryParams != null ) {
-            for ( final Map.Entry<String, String> queryEntry : queryParams.entrySet() ) {
-                webResource = webResource.queryParam(queryEntry.getKey(), queryEntry.getValue());
-            }
-        }
-
-        webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis);
-        webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis);
-
-        return webResource.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
-    }
-
-    /**
-     * Performs a HEAD request to the specified URI.
-     *
-     * @param uri
-     * @param timeoutMillis
-     * @return
-     * @throws ClientHandlerException
-     * @throws UniformInterfaceException
-     */
-    public ClientResponse head(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException {
-        // perform the request
-        WebResource webResource = client.resource(uri);
-        webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis);
-        webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis);
-        return webResource.head();
-    }
-
-    /**
-     * Gets a client based on the specified URI.
-     * 
-     * @param uri
-     * @return 
-     */
-    private Client getClient(final SSLContext sslContext) {
-        final Client client;
-        if (sslContext == null) {
-            client = WebUtils.createClient(null);
-        } else {
-            client = WebUtils.createClient(null, sslContext);
-        }
-
-        client.setReadTimeout(READ_TIMEOUT);
-        client.setConnectTimeout(CONNECT_TIMEOUT);
-
-        return client;
-    }
-    
-    
-    /**
-     * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance
-     * is not configured to use Site-to-Site transfers.
-     * 
-     * @param uri the base URI of the remote instance. This should include the path only to the nifi-api level, as well as the protocol, host, and port.
-     * @param timeoutMillis
-     * @return
-     * @throws IOException
-     */
-    public Integer getRemoteListeningPort(final String uri, final int timeoutMillis) throws IOException {
-    	try {
-			final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
-			return getRemoteListeningPort(uriObject, timeoutMillis);
-		} catch (URISyntaxException e) {
-			throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
-		}
-    }
-    
-    public String getRemoteRootGroupId(final String uri, final int timeoutMillis) throws IOException {
-        try {
-            final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
-            return getRemoteRootGroupId(uriObject, timeoutMillis);
-        } catch (URISyntaxException e) {
-            throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
-        }
-    }
-    
-    public String getRemoteInstanceId(final String uri, final int timeoutMillis) throws IOException {
-        try {
-            final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
-            return getController(uriObject, timeoutMillis).getInstanceId();
-        } catch (URISyntaxException e) {
-            throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
-        }
-    }
-    
-    /**
-     * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance
-     * is not configured to use Site-to-Site transfers.
-     * 
-     * @param uri the full URI to fetch, including the path.
-     * @return
-     * @throws IOException
-     */
-    private Integer getRemoteListeningPort(final URI uri, final int timeoutMillis) throws IOException {
-    	return getController(uri, timeoutMillis).getRemoteSiteListeningPort();
-    }
-    
-    private String getRemoteRootGroupId(final URI uri, final int timeoutMillis) throws IOException {
-        return getController(uri, timeoutMillis).getId();
-    }
-    
-    private ControllerDTO getController(final URI uri, final int timeoutMillis) throws IOException {
-        final ClientResponse response = get(uri, timeoutMillis);
-        
-        if (Status.OK.getStatusCode() == response.getStatusInfo().getStatusCode()) {
-            final ControllerEntity entity = response.getEntity(ControllerEntity.class);
-            return entity.getController();
-        } else {
-            final String responseMessage = response.getEntity(String.class);
-            throw new IOException("Got HTTP response Code " + response.getStatusInfo().getStatusCode() + ": " + response.getStatusInfo().getReasonPhrase() + " with explanation: " + responseMessage);
-        }
-    }
-    
-    /**
-     * Issues a registration request on behalf of the current user.
-     * 
-     * @param baseApiUri 
-     * @return  
-     */
-    public ClientResponse issueRegistrationRequest(String baseApiUri) {
-        final URI uri = URI.create(String.format("%s/%s", baseApiUri, "/controller/users"));
-
-        // set up the query params
-        MultivaluedMapImpl entity = new MultivaluedMapImpl();
-        entity.add("justification", "A Remote instance of NiFi has attempted to create a reference to this NiFi. This action must be approved first.");
-        
-        // create the web resource
-        WebResource webResource = client.resource(uri);
-        
-        // get the client utils and make the request
-        return webResource.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_FORM_URLENCODED).entity(entity).post(ClientResponse.class);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
index 922d4e7..2b27de2 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
@@ -56,6 +56,14 @@ public class RemoteResourceFactory extends RemoteResourceInitiator {
         }
 	}
 	
+	public static void rejectCodecNegotiation(final DataInputStream dis, final DataOutputStream dos, final String explanation) throws IOException {
+		dis.readUTF();	// read codec name
+		dis.readInt();	// read codec version
+		
+		dos.write(ABORT);
+		dos.writeUTF(explanation);
+		dos.flush();
+	}
 	
 	@SuppressWarnings("unchecked")
     public static <T extends ClientProtocol> T receiveClientProtocolNegotiation(final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 77ac1a9..82d8206 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -17,7 +17,6 @@
 package org.apache.nifi.remote;
 
 import java.io.IOException;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -33,7 +32,6 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.ProcessScheduler;
-import org.apache.nifi.controller.exception.CommunicationsException;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.processor.ProcessContext;
@@ -144,7 +142,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
         
         final EndpointConnectionState connectionState;
         try {
-        	connectionState = connectionStatePool.getEndpointConnectionState(url, this, transferDirection);
+        	connectionState = connectionStatePool.getEndpointConnectionState(this, transferDirection);
         } catch (final PortNotRunningException e) {
             context.yield();
             this.targetRunning.set(false);
@@ -366,28 +364,4 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
     public boolean isSideEffectFree() {
         return false;
     }
-
-	@Override
-	public String getDescription() {
-		return toString();
-	}
-
-	@Override
-	public long getCommunicationsTimeout(final TimeUnit timeUnit) {
-		return getRemoteProcessGroup().getCommunicationsTimeout(timeUnit);
-	}
-
-	@Override
-	public URI getTargetUri() {
-		return remoteGroup.getTargetUri();
-	}
-	
-	@Override
-	public boolean isSecure() {
-		try {
-			return remoteGroup.isSecure();
-		} catch (final CommunicationsException ce) {
-			return false;
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index 647b45c..887429c 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@ -204,11 +204,6 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
             ResponseCode.MISSING_PROPERTY.writeResponse(dos, HandshakeProperty.GZIP.name());
             throw new HandshakeException("Missing Property " + HandshakeProperty.GZIP.name());
         }
-        if ( port == null ) {
-            logger.debug("Responding with ResponseCode MISSING_PROPERTY because Port Identifier property is missing");
-            ResponseCode.MISSING_PROPERTY.writeResponse(dos, HandshakeProperty.PORT_IDENTIFIER.name());
-            throw new HandshakeException("Missing Property " + HandshakeProperty.PORT_IDENTIFIER.name());
-        }
         
         // send "OK" response
         if ( !responseWritten ) {
@@ -243,6 +238,10 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
         
+        if ( port == null ) {
+        	RemoteResourceFactory.rejectCodecNegotiation(dis, dos, "Cannot transfer FlowFiles because no port was specified");
+        }
+        
         // Negotiate the FlowFileCodec to use.
         try {
             negotiatedFlowFileCodec = RemoteResourceFactory.receiveCodecNegotiation(dis, dos);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
index 94de86b..8c972f7 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
@@ -16,21 +16,11 @@
  */
 package org.apache.nifi.remote;
 
-import java.net.URI;
 import java.util.concurrent.TimeUnit;
 
 public interface RemoteDestination {
-
-	String getDescription();
-	
 	String getIdentifier();
 	
-	URI getTargetUri();
-	
-	boolean isSecure();
-	
-	long getCommunicationsTimeout(TimeUnit timeUnit);
-	
 	long getYieldPeriod(TimeUnit timeUnit);
 	
 	boolean isUseCompression();