You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/02/26 04:54:05 UTC

[02/51] [abbrv] incubator-nifi git commit: NIFI-282: Bug fixes; documentation improvements; removed code marked as 'FOR TESTING'

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 2c1b085..db0aeb7 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -18,21 +18,9 @@ package org.apache.nifi.remote;
 
 import static java.util.Objects.requireNonNull;
 
-import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -49,11 +37,8 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
 
 import javax.net.ssl.SSLContext;
-import javax.security.cert.CertificateExpiredException;
-import javax.security.cert.CertificateNotYetValidException;
 import javax.ws.rs.core.Response;
 
 import org.apache.nifi.connectable.ConnectableType;
@@ -64,7 +49,6 @@ import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.exception.CommunicationsException;
-import org.apache.nifi.controller.util.RemoteProcessGroupUtils;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.events.BulletinFactory;
 import org.apache.nifi.events.EventReporter;
@@ -72,16 +56,8 @@ import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.ProcessGroupCounts;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.remote.exception.BadRequestException;
-import org.apache.nifi.remote.exception.HandshakeException;
-import org.apache.nifi.remote.exception.PortNotRunningException;
-import org.apache.nifi.remote.exception.UnknownPortException;
-import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
-import org.apache.nifi.remote.protocol.CommunicationsSession;
-import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
+import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool;
+import org.apache.nifi.remote.util.RemoteNiFiUtils;
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.FormatUtils;
@@ -108,7 +84,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
 
     public static final String CONTROLLER_URI_PATH = "/controller";
     public static final String ROOT_GROUP_STATUS_URI_PATH = "/controller/process-groups/root/status";
-    public static final long LISTENING_PORT_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
 
     // status codes
     public static final int OK_STATUS_CODE = Status.OK.getStatusCode();
@@ -150,15 +125,12 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
 
     private ProcessGroupCounts counts = new ProcessGroupCounts(0, 0, 0, 0, 0, 0, 0, 0);
     private Long refreshContentsTimestamp = null;
-    private Integer listeningPort;
-    private long listeningPortRetrievalTime = 0L;
     private Boolean destinationSecure;
+    private Integer listeningPort;
 
     private volatile String authorizationIssue;
 
-    private volatile PeerStatusCache peerStatusCache;
-    private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
-
+    private final EndpointConnectionStatePool endpointConnectionPool;
     private final ScheduledExecutorService backgroundThreadExecutor;
 
     public StandardRemoteProcessGroup(final String id, final String targetUri, final ProcessGroup processGroup,
@@ -200,72 +172,13 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
             }
         };
 
-        final Runnable socketCleanup = new Runnable() {
-            @Override
-            public void run() {
-                final Set<StandardRemoteGroupPort> ports = new HashSet<>();
-                readLock.lock();
-                try {
-                    ports.addAll(inputPorts.values());
-                    ports.addAll(outputPorts.values());
-                } finally {
-                    readLock.unlock();
-                }
-
-                for (final StandardRemoteGroupPort port : ports) {
-                    port.cleanupSockets();
-                }
-            }
-        };
-
-        try {
-            final File peersFile = getPeerPersistenceFile();
-            this.peerStatusCache = new PeerStatusCache(recoverPersistedPeerStatuses(peersFile), peersFile.lastModified());
-        } catch (final IOException e) {
-            logger.error("{} Failed to recover persisted Peer Statuses due to {}", this, e);
-        }
-
-        final Runnable refreshPeers = new Runnable() {
-            @Override
-            public void run() {
-                final PeerStatusCache existingCache = peerStatusCache;
-                if (existingCache != null && (existingCache.getTimestamp() + PEER_CACHE_MILLIS > System.currentTimeMillis())) {
-                    return;
-                }
-
-                Set<RemoteGroupPort> ports = getInputPorts();
-                if (ports.isEmpty()) {
-                    ports = getOutputPorts();
-                }
-                
-                if (ports.isEmpty()){
-                    return;
-                }
-
-                // it doesn't really matter which port we use. Since we are just getting the Peer Status,
-                // if the server indicates that the port cannot receive data for whatever reason, we will
-                // simply ignore the error.
-                final RemoteGroupPort port = ports.iterator().next();
-
-                try {
-                    final Set<PeerStatus> statuses = fetchRemotePeerStatuses(port);
-                    peerStatusCache = new PeerStatusCache(statuses);
-                    logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", StandardRemoteProcessGroup.this, statuses.size());
-                } catch (Exception e) {
-                    logger.warn("{} Unable to refresh Remote Group's peers due to {}", StandardRemoteProcessGroup.this, e);
-                    if (logger.isDebugEnabled()) {
-                        logger.warn("", e);
-                    }
-                }
-            }
-        };
-
+        endpointConnectionPool = new EndpointConnectionStatePool(getTargetUri().toString(), getCommunicationsTimeout(TimeUnit.MILLISECONDS), 
+        		sslContext, eventReporter, getPeerPersistenceFile());
+        
         final Runnable checkAuthorizations = new InitializationTask();
 
         backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id + ": " + targetUri);
         backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 0L, 30L, TimeUnit.SECONDS);
-        backgroundThreadExecutor.scheduleWithFixedDelay(refreshPeers, 0, 5, TimeUnit.SECONDS);
-        backgroundThreadExecutor.scheduleWithFixedDelay(socketCleanup, 10L, 10L, TimeUnit.SECONDS);
     }
 
     @Override
@@ -287,6 +200,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
     @Override
     public void shutdown() {
         backgroundThreadExecutor.shutdown();
+        endpointConnectionPool.shutdown();
     }
     
     @Override
@@ -858,7 +772,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
             return;
         }
 
-        final RemoteProcessGroupUtils utils = new RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null);
+        final RemoteNiFiUtils utils = new RemoteNiFiUtils(isWebApiSecure() ? sslContext : null);
         final String uriVal = apiUri.toString() + CONTROLLER_URI_PATH;
         URI uri;
         try {
@@ -998,39 +912,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
         return descriptor;
     }
 
-    /**
-     * @return the port that the remote instance is listening on for
-     * site-to-site communication, or <code>null</code> if the remote instance
-     * is not configured to allow site-to-site communications.
-     *
-     * @throws IOException if unable to communicate with the remote instance
-     */
-    @Override
-    public Integer getListeningPort() throws IOException {
-        Integer listeningPort;
-        readLock.lock();
-        try {
-            listeningPort = this.listeningPort;
-            if (listeningPort != null && this.listeningPortRetrievalTime > System.currentTimeMillis() - LISTENING_PORT_REFRESH_MILLIS) {
-                return listeningPort;
-            }
-        } finally {
-            readLock.unlock();
-        }
-
-        final RemoteProcessGroupUtils utils = new RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null);
-        listeningPort = utils.getRemoteListeningPort(apiUri.toString(), getCommunicationsTimeout(TimeUnit.MILLISECONDS));
-        writeLock.lock();
-        try {
-            this.listeningPort = listeningPort;
-            this.listeningPortRetrievalTime = System.currentTimeMillis();
-        } finally {
-            writeLock.unlock();
-        }
-
-        return listeningPort;
-    }
-
     @Override
     public boolean isTransmitting() {
         return transmitting.get();
@@ -1255,52 +1136,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
         }
     }
 
-    @Override
-    public CommunicationsSession establishSiteToSiteConnection() throws IOException {
-        final URI uri = apiUri;
-        final String destinationUri = uri.toString();
-        CommunicationsSession commsSession = null;
-        try {
-            if (isSecure()) {
-                if (sslContext == null) {
-                    throw new IOException("Unable to communicate with " + getTargetUri() + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications");
-                }
-
-                final Integer listeningPort = getListeningPort();
-                if (listeningPort == null) {
-                    throw new IOException("Remote instance is not configured to allow incoming Site-to-Site connections");
-                }
-
-                final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, uri.getHost(), listeningPort, true);
-                socketChannel.connect();
-                commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri);
-
-                try {
-                    commsSession.setUserDn(socketChannel.getDn());
-                } catch (final CertificateNotYetValidException | CertificateExpiredException ex) {
-                    throw new IOException(ex);
-                }
-            } else {
-                final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(uri.getHost(), getListeningPort()));
-
-                commsSession = new SocketChannelCommunicationsSession(socketChannel, destinationUri);
-            }
-
-            commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
-
-            commsSession.setUri("nifi://" + uri.getHost() + ":" + uri.getPort());
-        } catch (final IOException e) {
-            if (commsSession != null) {
-                try {
-                    commsSession.close();
-                } catch (final IOException ignore) {
-                }
-            }
-
-            throw e;
-        }
-        return commsSession;
-    }
 
     @Override
     public EventReporter getEventReporter() {
@@ -1312,7 +1147,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
         @Override
         public void run() {
             try {
-                final RemoteProcessGroupUtils utils = new RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null);
+                final RemoteNiFiUtils utils = new RemoteNiFiUtils(isWebApiSecure() ? sslContext : null);
                 final ClientResponse response = utils.get(new URI(apiUri + CONTROLLER_URI_PATH), getCommunicationsTimeout(TimeUnit.MILLISECONDS));
                 
                 final int statusCode = response.getStatus();
@@ -1385,6 +1220,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
     public String getYieldDuration() {
         return yieldDuration;
     }
+    
+    @Override
+    public EndpointConnectionStatePool getConnectionPool() {
+        return endpointConnectionPool;
+    }
 
     @Override
     public void verifyCanDelete() {
@@ -1487,135 +1327,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
         }
     }
 
-    @Override
-    public Set<PeerStatus> getPeerStatuses() {
-        final PeerStatusCache cache = this.peerStatusCache;
-        if (cache == null || cache.getStatuses() == null || cache.getStatuses().isEmpty()) {
-            return null;
-        }
-
-        if (cache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis()) {
-            final Set<PeerStatus> equalizedSet = new HashSet<>(cache.getStatuses().size());
-            for (final PeerStatus status : cache.getStatuses()) {
-                final PeerStatus equalizedStatus = new PeerStatus(status.getHostname(), status.getPort(), status.isSecure(), 1);
-                equalizedSet.add(equalizedStatus);
-            }
-
-            return equalizedSet;
-        }
-
-        return cache.getStatuses();
-    }
-
-    private Set<PeerStatus> fetchRemotePeerStatuses(final RemoteGroupPort port) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException, BadRequestException {
-        final CommunicationsSession commsSession = establishSiteToSiteConnection();
-        final Peer peer = new Peer(commsSession, "nifi://" + getTargetUri().getHost() + ":" + getListeningPort());
-        final SocketClientProtocol clientProtocol = new SocketClientProtocol();
-        clientProtocol.setPort(port);
-        final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
-        final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
-        try {
-            RemoteResourceFactory.initiateResourceNegotiation(clientProtocol, dis, dos);
-        } catch (final HandshakeException e) {
-            throw new BadRequestException(e.toString());
-        }
-
-        clientProtocol.handshake(peer);
-        final Set<PeerStatus> peerStatuses = clientProtocol.getPeerStatuses(peer);
-        persistPeerStatuses(peerStatuses);
-
-        try {
-            clientProtocol.shutdown(peer);
-        } catch (final IOException e) {
-            final String message = String.format("%s Failed to shutdown protocol when updating list of peers due to %s", this, e.toString());
-            logger.warn(message);
-            if (logger.isDebugEnabled()) {
-                logger.warn("", e);
-            }
-            getEventReporter().reportEvent(Severity.WARNING, "Site to Site", message);
-        }
-
-        try {
-            peer.close();
-        } catch (final IOException e) {
-            final String message = String.format("%s Failed to close resources when updating list of peers due to %s", this, e.toString());
-            logger.warn(message);
-            if (logger.isDebugEnabled()) {
-                logger.warn("", e);
-            }
-            getEventReporter().reportEvent(Severity.WARNING, "Site to Site", message);
-        }
-
-        return peerStatuses;
-    }
-
     private File getPeerPersistenceFile() {
         final File stateDir = NiFiProperties.getInstance().getPersistentStateDirectory();
         return new File(stateDir, getIdentifier() + ".peers");
     }
 
-    private void persistPeerStatuses(final Set<PeerStatus> statuses) {
-        final File peersFile = getPeerPersistenceFile();
-        try (final OutputStream fos = new FileOutputStream(peersFile);
-                final OutputStream out = new BufferedOutputStream(fos)) {
-
-            for (final PeerStatus status : statuses) {
-                final String line = status.getHostname() + ":" + status.getPort() + ":" + status.isSecure() + "\n";
-                out.write(line.getBytes(StandardCharsets.UTF_8));
-            }
-
-        } catch (final IOException e) {
-            logger.error("Failed to persist list of Peers due to {}; if restarted and peer's NCM is down, may be unable to transfer data until communications with NCM are restored", e.toString(), e);
-        }
-    }
-
-    private Set<PeerStatus> recoverPersistedPeerStatuses(final File file) throws IOException {
-        if (!file.exists()) {
-            return null;
-        }
-
-        final Set<PeerStatus> statuses = new HashSet<>();
-        try (final InputStream fis = new FileInputStream(file);
-                final BufferedReader reader = new BufferedReader(new InputStreamReader(fis))) {
-
-            String line;
-            while ((line = reader.readLine()) != null) {
-                final String[] splits = line.split(Pattern.quote(":"));
-                if (splits.length != 3) {
-                    continue;
-                }
-
-                final String hostname = splits[0];
-                final int port = Integer.parseInt(splits[1]);
-                final boolean secure = Boolean.parseBoolean(splits[2]);
-
-                statuses.add(new PeerStatus(hostname, port, secure, 1));
-            }
-        }
-
-        return statuses;
-    }
-
-    private static class PeerStatusCache {
-
-        private final Set<PeerStatus> statuses;
-        private final long timestamp;
-
-        public PeerStatusCache(final Set<PeerStatus> statuses) {
-            this(statuses, System.currentTimeMillis());
-        }
-
-        public PeerStatusCache(final Set<PeerStatus> statuses, final long timestamp) {
-            this.statuses = statuses;
-            this.timestamp = timestamp;
-        }
-
-        public Set<PeerStatus> getStatuses() {
-            return statuses;
-        }
-
-        public long getTimestamp() {
-            return timestamp;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/.gitignore
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/.gitignore b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/.gitignore
index ea8c4bf..d9d66d8 100755
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/.gitignore
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/.gitignore
@@ -1 +1,2 @@
 /target
+/bin/

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/controller/util/RemoteProcessGroupUtils.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/controller/util/RemoteProcessGroupUtils.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/controller/util/RemoteProcessGroupUtils.class
new file mode 100644
index 0000000..a6951d4
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/controller/util/RemoteProcessGroupUtils.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.class
new file mode 100644
index 0000000..2e868ea
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.class
new file mode 100644
index 0000000..9c6e821
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/RemoteResourceManager.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/RemoteResourceManager.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/RemoteResourceManager.class
new file mode 100644
index 0000000..bb3fc77
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/RemoteResourceManager.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/RemoteSiteListener.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/RemoteSiteListener.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/RemoteSiteListener.class
new file mode 100644
index 0000000..a56b5ba
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/RemoteSiteListener.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.class
new file mode 100644
index 0000000..9780f75
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort$EndpointConnectionState.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort$EndpointConnectionState.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort$EndpointConnectionState.class
new file mode 100644
index 0000000..f184d64
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort$EndpointConnectionState.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.class
new file mode 100644
index 0000000..0740bb4
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/StandardRootGroupPort$FlowFileRequest.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/StandardRootGroupPort$FlowFileRequest.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/StandardRootGroupPort$FlowFileRequest.class
new file mode 100644
index 0000000..d517458
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/StandardRootGroupPort$FlowFileRequest.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/StandardRootGroupPort$ProcessingResult.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/StandardRootGroupPort$ProcessingResult.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/StandardRootGroupPort$ProcessingResult.class
new file mode 100644
index 0000000..1cf5ceb
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/StandardRootGroupPort$ProcessingResult.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/StandardRootGroupPort$StandardPortAuthorizationResult.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/StandardRootGroupPort$StandardPortAuthorizationResult.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/StandardRootGroupPort$StandardPortAuthorizationResult.class
new file mode 100644
index 0000000..3ad7542
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/StandardRootGroupPort$StandardPortAuthorizationResult.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.class
new file mode 100644
index 0000000..4db4735
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.class
new file mode 100644
index 0000000..e49ffe8
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.class
new file mode 100644
index 0000000..39dd49a
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.class
new file mode 100644
index 0000000..b415421
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.class
new file mode 100644
index 0000000..551097e
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.class
new file mode 100644
index 0000000..6913767
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.class
new file mode 100644
index 0000000..f6e9f20
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.class
new file mode 100644
index 0000000..41fe366
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.class
new file mode 100644
index 0000000..9b9cdc0
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.class
new file mode 100644
index 0000000..2ef1c39
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.class
new file mode 100644
index 0000000..fad8245
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/protocol/socket/Response.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/protocol/socket/Response.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/protocol/socket/Response.class
new file mode 100644
index 0000000..27ec8d3
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/protocol/socket/Response.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.class
new file mode 100644
index 0000000..4673aec
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.class
new file mode 100644
index 0000000..ac7e1b9
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.class
new file mode 100644
index 0000000..933ef4d
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.class
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.class b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.class
new file mode 100644
index 0000000..0e77276
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/bin/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.class differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/pom.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/pom.xml
index f989f66..08bf590 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/pom.xml
@@ -60,6 +60,10 @@
             <artifactId>nifi-utils</artifactId>
         </dependency>
         <dependency>
+        	<groupId>org.apache.nifi</groupId>
+        	<artifactId>nifi-site-to-site-client</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
deleted file mode 100644
index 4babb92..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote;
-
-import org.apache.nifi.remote.protocol.CommunicationsSession;
-
-public abstract class AbstractCommunicationsSession implements CommunicationsSession {
-    private String userDn;
-    
-    private volatile String uri;
-    
-    public AbstractCommunicationsSession(final String uri) {
-        this.uri = uri;
-    }
-    
-    @Override
-    public String toString() {
-        return uri;
-    }
-
-    @Override
-    public void setUri(final String uri) {
-        this.uri = uri;
-    }
-
-    @Override
-    public String getUri() {
-        return uri;
-    }
-
-    @Override
-    public String getUserDn() {
-        return userDn;
-    }
-    
-    @Override
-    public void setUserDn(final String dn) {
-        this.userDn = dn;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
index 49d3c3c..2b27de2 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
@@ -26,48 +26,8 @@ import org.apache.nifi.remote.exception.HandshakeException;
 import org.apache.nifi.remote.protocol.ClientProtocol;
 import org.apache.nifi.remote.protocol.ServerProtocol;
 
-public class RemoteResourceFactory {
+public class RemoteResourceFactory extends RemoteResourceInitiator {
 
-	public static final int RESOURCE_OK = 20;
-	public static final int DIFFERENT_RESOURCE_VERSION = 21;
-	public static final int ABORT = 255;
-
-	
-	public static VersionedRemoteResource initiateResourceNegotiation(final VersionedRemoteResource resource, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
-        // Write the classname of the RemoteStreamCodec, followed by its version
-    	dos.writeUTF(resource.getResourceName());
-    	final VersionNegotiator negotiator = resource.getVersionNegotiator();
-    	dos.writeInt(negotiator.getVersion());
-    	dos.flush();
-        
-        // wait for response from server.
-        final int statusCode = dis.read();
-        switch (statusCode) {
-            case RESOURCE_OK:	// server accepted our proposal of codec name/version
-                return resource;
-            case DIFFERENT_RESOURCE_VERSION:	// server accepted our proposal of codec name but not the version
-                // Get server's preferred version
-            	final int newVersion = dis.readInt();
-                
-                // Determine our new preferred version that is no greater than the server's preferred version.
-                final Integer newPreference = negotiator.getPreferredVersion(newVersion);
-                // If we could not agree with server on a version, fail now.
-                if ( newPreference == null ) {
-                    throw new HandshakeException("Could not agree on version for " + resource);
-                }
-                
-                negotiator.setVersion(newPreference);
-                
-                // Attempt negotiation of resource based on our new preferred version.
-                return initiateResourceNegotiation(resource, dis, dos);
-            case ABORT:
-            	throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF());
-            default:
-                return null;	// Unable to negotiate codec
-        }
-	}
-
-	
 	@SuppressWarnings("unchecked")
     public static <T extends FlowFileCodec> T receiveCodecNegotiation(final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
         final String codecName = dis.readUTF();
@@ -96,6 +56,14 @@ public class RemoteResourceFactory {
         }
 	}
 	
+	public static void rejectCodecNegotiation(final DataInputStream dis, final DataOutputStream dos, final String explanation) throws IOException {
+		dis.readUTF();	// read codec name
+		dis.readInt();	// read codec version
+		
+		dos.write(ABORT);
+		dos.writeUTF(explanation);
+		dos.flush();
+	}
 	
 	@SuppressWarnings("unchecked")
     public static <T extends ClientProtocol> T receiveClientProtocolNegotiation(final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index ec169ad..3295956 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -35,8 +35,8 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import javax.net.ssl.SSLContext;
 
-import org.apache.nifi.cluster.NodeInformant;
 import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.remote.cluster.NodeInformant;
 import org.apache.nifi.remote.exception.HandshakeException;
 import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
 import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
@@ -122,6 +122,9 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
                     }
                     LOG.trace("Got connection");
                     
+                    if ( stopped.get() ) {
+                        return;
+                    }
                     final Socket socket = acceptedSocket;
                     final SocketChannel socketChannel = socket.getChannel();
                     final Thread thread = new Thread(new Runnable() {
@@ -198,7 +201,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
                             	protocol.setRootProcessGroup(rootGroup.get());
                           	    protocol.setNodeInformant(nodeInformant);
                             	
-                            	peer = new Peer(commsSession, peerUri);
+                            	peer = new Peer(commsSession, peerUri, "nifi://localhost:" + getPort());
                             	LOG.debug("Handshaking....");
                             	protocol.handshake(peer);
                             	

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index b0d88d4..a51cdba 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -16,90 +16,59 @@
  */
 package org.apache.nifi.remote;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import javax.net.ssl.SSLContext;
-import javax.security.cert.CertificateExpiredException;
-import javax.security.cert.CertificateNotYetValidException;
 
-import org.apache.nifi.cluster.ClusterNodeInformation;
-import org.apache.nifi.cluster.NodeInformation;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.controller.AbstractPort;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.remote.client.socket.EndpointConnectionState;
+import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool;
 import org.apache.nifi.remote.codec.FlowFileCodec;
-import org.apache.nifi.remote.exception.BadRequestException;
 import org.apache.nifi.remote.exception.HandshakeException;
 import org.apache.nifi.remote.exception.PortNotRunningException;
 import org.apache.nifi.remote.exception.ProtocolException;
 import org.apache.nifi.remote.exception.TransmissionDisabledException;
 import org.apache.nifi.remote.exception.UnknownPortException;
-import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
 import org.apache.nifi.remote.protocol.ClientProtocol;
 import org.apache.nifi.remote.protocol.CommunicationsSession;
 import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.scheduling.SchedulingStrategy;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.sun.jersey.api.client.ClientHandlerException;
-
-public class StandardRemoteGroupPort extends AbstractPort implements RemoteGroupPort {
+public class StandardRemoteGroupPort extends RemoteGroupPort {
     public static final String USER_AGENT = "NiFi-Site-to-Site";
     public static final String CONTENT_TYPE = "application/octet-stream";
     
     public static final int GZIP_COMPRESSION_LEVEL = 1;
-    public static final long PEER_REFRESH_PERIOD = 60000L;
     
     private static final String CATEGORY = "Site to Site";
     
     private static final Logger logger = LoggerFactory.getLogger(StandardRemoteGroupPort.class);
     private final RemoteProcessGroup remoteGroup;
-    private final SSLContext sslContext;
     private final AtomicBoolean useCompression = new AtomicBoolean(false);
     private final AtomicBoolean targetExists = new AtomicBoolean(true);
     private final AtomicBoolean targetRunning = new AtomicBoolean(true);
-    private final AtomicLong peerIndex = new AtomicLong(0L);
-    
-    private volatile List<PeerStatus> peerStatuses;
-    private volatile long peerRefreshTime = 0L;
-    private final ReentrantLock peerRefreshLock = new ReentrantLock();
+    private final TransferDirection transferDirection;
     
-    private final ConcurrentMap<String, BlockingQueue<EndpointConnectionState>> endpointConnectionMap = new ConcurrentHashMap<>();
-    private final ConcurrentMap<PeerStatus, Long> peerTimeoutExpirations = new ConcurrentHashMap<>();
+    private final EndpointConnectionStatePool connectionStatePool;
     
     private final Set<CommunicationsSession> activeCommsChannels = new HashSet<>();
     private final Lock interruptLock = new ReentrantLock();
@@ -113,8 +82,10 @@ public class StandardRemoteGroupPort extends AbstractPort implements RemoteGroup
         super(id, name, processGroup, type, scheduler);
         
         this.remoteGroup = remoteGroup;
-        this.sslContext = sslContext;
+        this.transferDirection = direction;
         setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos");
+        
+        connectionStatePool = remoteGroup.getConnectionPool();
     }
     
     @Override
@@ -133,25 +104,10 @@ public class StandardRemoteGroupPort extends AbstractPort implements RemoteGroup
     
     @Override
     public void shutdown() {
-        super.shutdown();
-        
-        peerTimeoutExpirations.clear();
+    	super.shutdown();
         interruptLock.lock();
         try {
             this.shutdown = true;
-            
-            for ( final CommunicationsSession commsSession : activeCommsChannels ) {
-                commsSession.interrupt();
-            }
-            
-            for ( final BlockingQueue<EndpointConnectionState> queue : endpointConnectionMap.values() ) {
-                EndpointConnectionState state;
-                while ( (state = queue.poll()) != null)  {
-                    cleanup(state.getSocketClientProtocol(), state.getPeer());
-                }
-            }
-            
-            endpointConnectionMap.clear();
         } finally {
             interruptLock.unlock();
         }
@@ -170,35 +126,6 @@ public class StandardRemoteGroupPort extends AbstractPort implements RemoteGroup
     }
     
     
-    void cleanupSockets() {
-        final List<EndpointConnectionState> states = new ArrayList<>();
-        
-        for ( final BlockingQueue<EndpointConnectionState> queue : endpointConnectionMap.values() ) {
-            states.clear();
-            
-            EndpointConnectionState state;
-            while ((state = queue.poll()) != null) {
-                // If the socket has not been used in 10 seconds, shut it down.
-                final long lastUsed = state.getLastTimeUsed();
-                if ( lastUsed < System.currentTimeMillis() - 10000L ) {
-                    try {
-                        state.getSocketClientProtocol().shutdown(state.getPeer());
-                    } catch (final Exception e) {
-                        logger.debug("Failed to shut down {} using {} due to {}", 
-                            new Object[] {state.getSocketClientProtocol(), state.getPeer(), e} );
-                    }
-                    
-                    cleanup(state.getSocketClientProtocol(), state.getPeer());
-                } else {
-                    states.add(state);
-                }
-            }
-            
-            queue.addAll(states);
-        }
-    }
-    
-    
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) {
         if ( !remoteGroup.isTransmitting() ) {
@@ -212,137 +139,46 @@ public class StandardRemoteGroupPort extends AbstractPort implements RemoteGroup
         }
         
         String url = getRemoteProcessGroup().getTargetUri().toString();
-        Peer peer = null;
-        final PeerStatus peerStatus = getNextPeerStatus();
-        if ( peerStatus == null ) {
-            logger.debug("{} Unable to determine the next peer to communicate with; all peers must be penalized, so yielding context", this);
+        
+        final EndpointConnectionState connectionState;
+        try {
+        	connectionState = connectionStatePool.getEndpointConnectionState(this, transferDirection);
+        } catch (final PortNotRunningException e) {
             context.yield();
+            this.targetRunning.set(false);
+            final String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port is not in a valid state", this, url);
+            logger.error(message);
+            remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
             return;
-        }
-        
-        url = "nifi://" + peerStatus.getHostname() + ":" + peerStatus.getPort();
-        
-        //
-        // Attempt to get a connection state that already exists for this URL.
-        //
-        BlockingQueue<EndpointConnectionState> connectionStateQueue = endpointConnectionMap.get(url);
-        if ( connectionStateQueue == null ) {
-            connectionStateQueue = new LinkedBlockingQueue<>();
-            BlockingQueue<EndpointConnectionState> existingQueue = endpointConnectionMap.putIfAbsent(url, connectionStateQueue);
-            if ( existingQueue != null ) {
-                connectionStateQueue = existingQueue;
+        } catch (final UnknownPortException e) {
+            context.yield();
+            this.targetExists.set(false);
+            final String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port no longer exists", this, url);
+            logger.error(message);
+            remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
+            return;
+        } catch (final HandshakeException | IOException e) {
+            final String message = String.format("%s failed to communicate with %s due to %s", this, url, e.toString());
+            logger.error(message);
+            if ( logger.isDebugEnabled() ) {
+                logger.error("", e);
             }
+            remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
+            session.rollback();
+            return;
         }
         
-        FlowFileCodec codec = null;
-        CommunicationsSession commsSession = null;
-        SocketClientProtocol protocol = null;
-        EndpointConnectionState connectionState;
+        if ( connectionState == null ) {
+            logger.debug("{} Unable to determine the next peer to communicate with; all peers must be penalized, so yielding context", this);
+            context.yield();
+            return;
+        }
         
-        do {
-            connectionState = connectionStateQueue.poll();
-            logger.debug("{} Connection State for {} = {}", this, url, connectionState);
-            
-            // if we can't get an existing ConnectionState, create one
-            if ( connectionState == null ) {
-                protocol = new SocketClientProtocol();
-                protocol.setPort(this);
-    
-                try {
-                    commsSession = establishSiteToSiteConnection(peerStatus);
-                    final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
-                    final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
-                    try {
-                        RemoteResourceFactory.initiateResourceNegotiation(protocol, dis, dos);
-                    } catch (final HandshakeException e) {
-                        try {
-                            commsSession.close();
-                        } catch (final IOException ioe) {
-                            final String message = String.format("%s unable to close communications session %s due to %s; resources may not be appropriately cleaned up",
-                                this, commsSession, ioe.toString());
-                            logger.error(message);
-                            remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
-                        }
-                    }
-                } catch (final IOException e) {
-                    final String message = String.format("%s failed to communicate with %s due to %s", this, peer == null ? url : peer, e.toString());
-                    logger.error(message);
-                    if ( logger.isDebugEnabled() ) {
-                        logger.error("", e);
-                    }
-                    remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
-                    session.rollback();
-                    return;
-                }
-                
-                
-                peer = new Peer(commsSession, url);
-                
-                // perform handshake
-                try {
-                    protocol.handshake(peer);
-                    
-                    // handle error cases
-                    if ( protocol.isDestinationFull() ) {
-                        logger.warn("{} {} indicates that port's destination is full; penalizing peer", this, peer);
-                        penalize(peer);
-                        cleanup(protocol, peer);
-                        return;
-                    } else if ( protocol.isPortInvalid() ) {
-                        penalize(peer);
-                        context.yield();
-                        cleanup(protocol, peer);
-                        this.targetRunning.set(false);
-                        final String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port is not in a valid state", this, peer);
-                        logger.error(message);
-                        remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
-                        return;
-                    } else if ( protocol.isPortUnknown() ) {
-                        penalize(peer);
-                        context.yield();
-                        cleanup(protocol, peer);
-                        this.targetExists.set(false);
-                        final String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port no longer exists", this, peer);
-                        logger.error(message);
-                        remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
-                        return;
-                    }
-                    
-                    // negotiate the FlowFileCodec to use
-                    codec = protocol.negotiateCodec(peer);
-                } catch (final Exception e) {
-                    penalize(peer);
-                    cleanup(protocol, peer);
-                    
-                    final String message = String.format("%s failed to communicate with %s due to %s", this, peer == null ? url : peer, e.toString());
-                    logger.error(message);
-                    if ( logger.isDebugEnabled() ) {
-                        logger.error("", e);
-                    }
-                    remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
-                    session.rollback();
-                    return;                    
-                }
-                
-                connectionState = new EndpointConnectionState(peer, protocol, codec);
-            } else {
-                final long lastTimeUsed = connectionState.getLastTimeUsed();
-                final long millisSinceLastUse = System.currentTimeMillis() - lastTimeUsed;
-                final long timeoutMillis = remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS);
-                
-                if ( timeoutMillis > 0L && millisSinceLastUse >= timeoutMillis ) {
-                    cleanup(connectionState.getSocketClientProtocol(), connectionState.getPeer());
-                    connectionState = null;
-                } else {
-                    codec = connectionState.getCodec();
-                    peer = connectionState.getPeer();
-                    commsSession = peer.getCommunicationsSession();
-                    protocol = connectionState.getSocketClientProtocol();
-                }
-            }
-        } while ( connectionState == null || codec == null || commsSession == null || protocol == null );
+        FlowFileCodec codec = connectionState.getCodec();
+        SocketClientProtocol protocol = connectionState.getSocketClientProtocol();
+        final Peer peer = connectionState.getPeer();
+        url = peer.getUrl();
         
-            
         try {
             interruptLock.lock();
             try {
@@ -361,11 +197,6 @@ public class StandardRemoteGroupPort extends AbstractPort implements RemoteGroup
                 receiveFlowFiles(peer, protocol, context, session, codec);
             }
 
-            if ( peer.isPenalized() ) {
-                logger.debug("{} {} was penalized", this, peer);
-                penalize(peer);
-            }
-            
             interruptLock.lock();
             try {
                 if ( shutdown ) {
@@ -380,12 +211,12 @@ public class StandardRemoteGroupPort extends AbstractPort implements RemoteGroup
             session.commit();
             
             connectionState.setLastTimeUsed();
-            connectionStateQueue.add(connectionState);
+            connectionStatePool.offer(connectionState);
         } catch (final TransmissionDisabledException e) {
             cleanup(protocol, peer);
             session.rollback();
         } catch (final Exception e) {
-            penalize(peer);
+            connectionStatePool.penalize(peer, getYieldPeriod(TimeUnit.MILLISECONDS));
 
             final String message = String.format("%s failed to communicate with %s (%s) due to %s", this, peer == null ? url : peer, protocol, e.toString());
             logger.error(message);
@@ -401,34 +232,6 @@ public class StandardRemoteGroupPort extends AbstractPort implements RemoteGroup
     }
 
     
-    /**
-     * Updates internal state map to penalize a PeerStatus that points to the specified peer
-     * @param peer
-     */
-    private void penalize(final Peer peer) {
-        String host;
-        int port;
-        try {
-            final URI uri = new URI(peer.getUrl());
-            host = uri.getHost();
-            port = uri.getPort();
-        } catch (final URISyntaxException e) {
-            host = peer.getHost();
-            port = -1;
-        }
-        
-        final PeerStatus status = new PeerStatus(host, port, true, 1);
-        Long expiration = peerTimeoutExpirations.get(status);
-        if ( expiration == null ) {
-            expiration = Long.valueOf(0L);
-        }
-        
-        final long penalizationMillis = getYieldPeriod(TimeUnit.MILLISECONDS);
-        final long newExpiration = Math.max(expiration, System.currentTimeMillis() + penalizationMillis);
-        peerTimeoutExpirations.put(status, Long.valueOf(newExpiration));
-    }
-    
-    
     private void cleanup(final SocketClientProtocol protocol, final Peer peer) {
         if ( protocol != null && peer != null ) {
             try {
@@ -457,108 +260,6 @@ public class StandardRemoteGroupPort extends AbstractPort implements RemoteGroup
         return remoteGroup.getYieldDuration();
     }
     
-    public CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException {
-        final String destinationUri = "nifi://" + peerStatus.getHostname() + ":" + peerStatus.getPort();
-
-        CommunicationsSession commsSession = null;
-        try {
-        if ( peerStatus.isSecure() ) {
-            if ( sslContext == null ) {
-                throw new IOException("Unable to communicate with " + peerStatus.getHostname() + ":" + peerStatus.getPort() + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications");
-            }
-            
-            final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, peerStatus.getHostname(), peerStatus.getPort(), true);
-                socketChannel.connect();
-    
-            commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri);
-                
-                try {
-                    commsSession.setUserDn(socketChannel.getDn());
-                } catch (final CertificateNotYetValidException | CertificateExpiredException ex) {
-                    throw new IOException(ex);
-                }
-        } else {
-            final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(peerStatus.getHostname(), peerStatus.getPort()));
-            commsSession = new SocketChannelCommunicationsSession(socketChannel, destinationUri);
-        }
-
-        commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
-
-        commsSession.setUri(destinationUri);
-        } catch (final IOException ioe) {
-            if ( commsSession != null ) {
-                commsSession.close();
-            }
-            
-            throw ioe;
-        }
-        
-        return commsSession;
-    }
-    
-    private PeerStatus getNextPeerStatus() {
-        List<PeerStatus> peerList = peerStatuses;
-        if ( (peerList == null || peerList.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD) && peerRefreshLock.tryLock() ) {
-            try {
-                try {
-                    peerList = createPeerStatusList();
-                } catch (final IOException | BadRequestException | HandshakeException | UnknownPortException | PortNotRunningException | ClientHandlerException e) {
-                    final String message = String.format("%s Failed to update list of peers due to %s", this, e.toString());
-                    logger.warn(message);
-                    if ( logger.isDebugEnabled() ) {
-                        logger.warn("", e);
-                    }
-                    remoteGroup.getEventReporter().reportEvent(Severity.WARNING, CATEGORY, message);
-                }
-                
-                this.peerStatuses = peerList;
-                peerRefreshTime = System.currentTimeMillis();
-            } finally {
-                peerRefreshLock.unlock();
-            }
-        }
-
-        if ( peerList == null || peerList.isEmpty() ) {
-            return null;
-        }
-
-        PeerStatus peerStatus;
-        for (int i=0; i < peerList.size(); i++) {
-            final long idx = peerIndex.getAndIncrement();
-            final int listIndex = (int) (idx % peerList.size());
-            peerStatus = peerList.get(listIndex);
-            
-            if ( isPenalized(peerStatus) ) {
-                logger.debug("{} {} is penalized; will not communicate with this peer", this, peerStatus);
-            } else {
-                return peerStatus;
-            }
-        }
-        
-        logger.debug("{} All peers appear to be penalized; returning null", this);
-        return null;
-    }
-    
-    private boolean isPenalized(final PeerStatus peerStatus) {
-        final Long expirationEnd = peerTimeoutExpirations.get(peerStatus);
-        return (expirationEnd == null ? false : expirationEnd > System.currentTimeMillis() );
-    }
-    
-    private List<PeerStatus> createPeerStatusList() throws IOException, BadRequestException, HandshakeException, UnknownPortException, PortNotRunningException {
-        final Set<PeerStatus> statuses = remoteGroup.getPeerStatuses();
-        if ( statuses == null ) {
-            return new ArrayList<>();
-        }
-        
-        final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
-        final List<NodeInformation> nodeInfos = new ArrayList<>();
-        for ( final PeerStatus peerStatus : statuses ) {
-            final NodeInformation nodeInfo = new NodeInformation(peerStatus.getHostname(), peerStatus.getPort(), 0, peerStatus.isSecure(), peerStatus.getFlowFileCount());
-            nodeInfos.add(nodeInfo);
-        }
-        clusterNodeInfo.setNodeInformation(nodeInfos);
-        return formulateDestinationList(clusterNodeInfo);
-    }
     
     private void transferFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
         protocol.transferFlowFiles(peer, context, session, codec);
@@ -568,70 +269,6 @@ public class StandardRemoteGroupPort extends AbstractPort implements RemoteGroup
         protocol.receiveFlowFiles(peer, context, session, codec);
     }
 
-    private List<PeerStatus> formulateDestinationList(final ClusterNodeInformation clusterNodeInfo) throws IOException {
-        return formulateDestinationList(clusterNodeInfo, getConnectableType());
-    }
-    
-    static List<PeerStatus> formulateDestinationList(final ClusterNodeInformation clusterNodeInfo, final ConnectableType connectableType) {
-        final Collection<NodeInformation> nodeInfoSet = clusterNodeInfo.getNodeInformation();
-        final int numDestinations = Math.max(128, nodeInfoSet.size());
-        final Map<NodeInformation, Integer> entryCountMap = new HashMap<>();
-
-        long totalFlowFileCount = 0L;
-        for (final NodeInformation nodeInfo : nodeInfoSet) {
-            totalFlowFileCount += nodeInfo.getTotalFlowFiles();
-        }
-
-        int totalEntries = 0;
-        for (final NodeInformation nodeInfo : nodeInfoSet) {
-            final int flowFileCount = nodeInfo.getTotalFlowFiles();
-            // don't allow any node to get more than 80% of the data
-            final double percentageOfFlowFiles = Math.min(0.8D, ((double) flowFileCount / (double) totalFlowFileCount));
-            final double relativeWeighting = (connectableType == ConnectableType.REMOTE_INPUT_PORT) ? (1 - percentageOfFlowFiles) : percentageOfFlowFiles;
-            final int entries = Math.max(1, (int) (numDestinations * relativeWeighting));
-            
-            entryCountMap.put(nodeInfo, Math.max(1, entries));
-            totalEntries += entries;
-        }
-        
-        final List<PeerStatus> destinations = new ArrayList<>(totalEntries);
-        for (int i=0; i < totalEntries; i++) {
-            destinations.add(null);
-        }
-        for ( final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet() ) {
-            final NodeInformation nodeInfo = entry.getKey();
-            final int numEntries = entry.getValue();
-            
-            int skipIndex = numEntries;
-            for (int i=0; i < numEntries; i++) {
-                int n = (skipIndex * i);
-                while (true) {
-                    final int index = n % destinations.size();
-                    PeerStatus status = destinations.get(index);
-                    if ( status == null ) {
-                        status = new PeerStatus(nodeInfo.getHostname(), nodeInfo.getSiteToSitePort(), nodeInfo.isSiteToSiteSecure(), nodeInfo.getTotalFlowFiles());
-                        destinations.set(index, status);
-                        break;
-                    } else {
-                        n++;
-                    }
-                }
-            }
-        }
-
-        final StringBuilder distributionDescription = new StringBuilder();
-        distributionDescription.append("New Weighted Distribution of Nodes:");
-        for ( final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet() ) {
-            final double percentage = entry.getValue() * 100D / (double) destinations.size();
-            distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of FlowFiles");
-        }
-        logger.info(distributionDescription.toString());
-
-        // Jumble the list of destinations.
-        return destinations;
-    }
-    
-    
     @Override
     public boolean getTargetExists() {
         return targetExists.get();
@@ -717,40 +354,6 @@ public class StandardRemoteGroupPort extends AbstractPort implements RemoteGroup
     }
     
     
-    private static class EndpointConnectionState {
-        private final Peer peer;
-        private final SocketClientProtocol socketClientProtocol;
-        private final FlowFileCodec codec;
-        private volatile long lastUsed;
-        
-        private EndpointConnectionState(final Peer peer, final SocketClientProtocol socketClientProtocol, final FlowFileCodec codec) {
-            this.peer = peer;
-            this.socketClientProtocol = socketClientProtocol;
-            this.codec = codec;
-        }
-        
-        public FlowFileCodec getCodec() {
-            return codec;
-        }
-        
-        public SocketClientProtocol getSocketClientProtocol() {
-            return socketClientProtocol;
-        }
-        
-        public Peer getPeer() {
-            return peer;
-        }
-        
-        public void setLastTimeUsed() {
-            lastUsed = System.currentTimeMillis();
-        }
-        
-        public long getLastTimeUsed() {
-            return lastUsed;
-        }
-    }
-
-    
     @Override
     public SchedulingStrategy getSchedulingStrategy() {
         return SchedulingStrategy.TIMER_DRIVEN;