Posted to commits@nifi.apache.org by al...@apache.org on 2020/06/05 20:41:25 UTC

[nifi] branch master updated: NIFI-7467 Refactored S2S peer selection logic. Removed list structure for peer selection as it was unnecessary and often wasteful (most clusters are 3 - 7 nodes, the list was always 128 elements). Changed integer percentages to double to allow for better normalization. Removed 80% cap on remote peers as it was due to legacy requirements. Added unit tests for non-deterministic distribution calculations. Added unit tests for edge cases due to rounding errors, single valid remotes, unbalanced [...]

This is an automated email from the ASF dual-hosted git repository.

alopresto pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 845b66a  NIFI-7467 Refactored S2S peer selection logic. Removed list structure for peer selection as it was unnecessary and often wasteful (most clusters are 3 - 7 nodes, the list was always 128 elements). Changed integer percentages to double to allow for better normalization. Removed 80% cap on remote peers as it was due to legacy requirements. Added unit tests for non-deterministic distribution calculations. Added unit tests for edge cases due to rounding errors, single valid  [...]
845b66a is described below

commit 845b66ab9204cc4e8f2418ee6fd154191c3c3941
Author: Andy LoPresto <al...@apache.org>
AuthorDate: Mon May 18 22:33:05 2020 -0700

    NIFI-7467 Refactored S2S peer selection logic.
    Removed list structure for peer selection as it was unnecessary and often wasteful (most clusters are 3 - 7 nodes, the list was always 128 elements).
    Changed integer percentages to double to allow for better normalization.
    Removed 80% cap on remote peers as it was due to legacy requirements.
    Added unit tests for non-deterministic distribution calculations.
    Added unit tests for edge cases due to rounding errors, single valid remotes, unbalanced clusters, and peer queue consecutive selection tracking.
    Migrated all legacy PeerSelector unit tests to new API.
    Removed unused System time manipulation as tests no longer need it.
    Added class-level Javadoc to PeerSelector.
    Removed S2S details request replication, as the responses were not being merged, which led to incorrect ports being returned and breaking S2S peer retrieval.
    Fixed copy/paste error where input ports were being listed as output ports during remote flow refresh.
    Fixed comments and added unbalanced cluster test scenarios.
    Removed unnecessary marker interface.
    Removed commented code.
    Changed weighting & penalization behavior.
    Changed dependency scope to test.
    
    This closes #4289.
    
    Signed-off-by: Mark Payne <ma...@hotmail.com>
---
 nifi-commons/nifi-site-to-site-client/pom.xml      |    9 +
 .../org/apache/nifi/remote/PeerDescription.java    |    8 +
 .../java/org/apache/nifi/remote/PeerStatus.java    |   20 +
 .../apache/nifi/remote/client/PeerSelector.java    |  659 ++++++++----
 .../nifi/remote/client/PeerStatusProvider.java     |    6 +-
 .../apache/nifi/remote/client/http/HttpClient.java |   55 +-
 .../client/socket/EndpointConnectionPool.java      |   67 +-
 .../remote/protocol/CommunicationsSession.java     |    3 +-
 .../apache/nifi/remote/util/PeerStatusCache.java   |   18 +-
 .../nifi/remote/util/SiteToSiteRestApiClient.java  |  109 +-
 .../nifi/remote/client/PeerSelectorTest.groovy     | 1133 ++++++++++++++++++++
 .../nifi/remote/client/TestPeerSelector.java       |  383 -------
 .../src/test/resources/logback-test.xml            |   11 +-
 .../nifi/remote/StandardRemoteProcessGroup.java    |   65 +-
 .../src/test/resources/logback-test.xml            |    2 +-
 .../apache/nifi/web/api/SiteToSiteResource.java    |   42 +-
 16 files changed, 1786 insertions(+), 804 deletions(-)
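
The weighting change summarized in the commit message (double-valued percentages, no 80% cap) is implemented in PeerSelector#calculateNormalizedWeight in the diff below. As a rough illustration only, the following standalone sketch reproduces that normalization with hypothetical local names and without the BigDecimal rounding step; it is not part of the commit:

    public class WeightSketch {
        // SEND favors remote peers with fewer flowfiles; RECEIVE favors peers with more.
        static double normalizedWeight(boolean sending, long totalFlowFiles, int peerCount, int peerFlowFiles) {
            if (peerCount == 1) {
                return 100.0;                                  // a single remote gets all traffic
            }
            if (totalFlowFiles == 0) {
                return 100.0 / peerCount;                      // no data anywhere: even split
            }
            double pct = (double) peerFlowFiles / totalFlowFiles;
            if (sending) {
                pct = (1 - pct) / (peerCount - 1);             // invert and renormalize for SEND
            }
            return pct * 100;                                  // 0..100, no 80% cap any more
        }

        public static void main(String[] args) {
            // Worked example from the buildWeightedPeerMap Javadoc: 100 flowfiles split 20/30/50.
            for (int count : new int[]{20, 30, 50}) {
                System.out.printf("SEND weight for peer with %d flowfiles: %.1f%%%n",
                        count, normalizedWeight(true, 100, 3, count));   // prints 40.0, 35.0, 25.0
            }
        }
    }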

diff --git a/nifi-commons/nifi-site-to-site-client/pom.xml b/nifi-commons/nifi-site-to-site-client/pom.xml
index dc77921..b2b597f 100644
--- a/nifi-commons/nifi-site-to-site-client/pom.xml
+++ b/nifi-commons/nifi-site-to-site-client/pom.xml
@@ -77,6 +77,15 @@
             <version>4.1.4</version>
         </dependency>
         <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>com.esotericsoftware.kryo</groupId>
             <artifactId>kryo</artifactId>
             <version>2.24.0</version>
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java
index f34c31d..3b20502 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.remote;
 
+import org.apache.nifi.web.api.dto.remote.PeerDTO;
+
 public class PeerDescription {
 
     private final String hostname;
@@ -28,6 +30,12 @@ public class PeerDescription {
         this.secure = secure;
     }
 
+    public PeerDescription(final PeerDTO peerDTO) {
+        this.hostname = peerDTO.getHostname();
+        this.port = peerDTO.getPort();
+        this.secure = peerDTO.isSecure();
+    }
+
     public String getHostname() {
         return hostname;
     }
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
index 3113076..1c2f1e5 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
@@ -16,6 +16,15 @@
  */
 package org.apache.nifi.remote;
 
+import org.apache.nifi.web.api.dto.remote.PeerDTO;
+
+/**
+ * This class represents the state of a specific peer, both its identifying information (contained
+ * in the {@link PeerDescription}: hostname, port, and security) and its current status (number of
+ * flowfiles and whether it can query other peers for status). Equality is only based on the
+ * identifying information, so when iterating over multiple PeerStatus objects, more recent statuses
+ * will replace previously acquired statuses for a specific peer.
+ */
 public class PeerStatus {
 
     private final PeerDescription description;
@@ -28,6 +37,17 @@ public class PeerStatus {
         this.queryForPeers = queryForPeers;
     }
 
+    /**
+     * Copy constructor from a {@link PeerDTO}. {@link #isQueryForPeers()} is hard-coded to {@code true}.
+     *
+     * @param peerDTO the peer DTO object with hostname, port, security, and flowfile count
+     */
+    public PeerStatus(final PeerDTO peerDTO) {
+        this.description = new PeerDescription(peerDTO);
+        this.numFlowFiles = peerDTO.getFlowFileCount();
+        this.queryForPeers = true;
+    }
+
     public PeerDescription getPeerDescription() {
         return description;
     }
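
The new PeerDTO-based copy constructors above (on PeerDescription and PeerStatus) let the HTTP client map REST API responses directly to status objects (see the HttpClient diff further down). A minimal usage sketch, assuming the standard bean setters on org.apache.nifi.web.api.dto.remote.PeerDTO; the class name and values here are illustrative only:

    import org.apache.nifi.remote.PeerStatus;
    import org.apache.nifi.web.api.dto.remote.PeerDTO;

    public class PeerStatusFromDtoExample {
        public static void main(String[] args) {
            PeerDTO dto = new PeerDTO();                       // hypothetical peer returned by the REST API
            dto.setHostname("nifi-node-1.example.com");
            dto.setPort(8443);
            dto.setSecure(true);
            dto.setFlowFileCount(42);

            // The copy constructor hard-codes isQueryForPeers() to true (see Javadoc above).
            PeerStatus status = new PeerStatus(dto);
            System.out.println(status.getPeerDescription().getHostname()
                    + " holds " + status.getFlowFileCount() + " flowfiles");
        }
    }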
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
index 0a61077..c5af1cf 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
@@ -16,353 +16,552 @@
  */
 package org.apache.nifi.remote.client;
 
-import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.remote.Peer;
-import org.apache.nifi.remote.PeerDescription;
-import org.apache.nifi.remote.PeerStatus;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
-import org.apache.nifi.remote.util.PeerStatusCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.nifi.remote.util.EventReportUtil.error;
+import static org.apache.nifi.remote.util.EventReportUtil.warn;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
+import javax.validation.constraints.NotNull;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerDescription;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import org.apache.nifi.remote.util.PeerStatusCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import static org.apache.nifi.remote.util.EventReportUtil.error;
-import static org.apache.nifi.remote.util.EventReportUtil.warn;
-
+/**
+ * Service which maintains state about peers (NiFi nodes in a remote instance, whether a cluster or
+ * standalone). There is an internal cache which stores identifying information about each
+ * node and the current workload of each in number of flowfiles being processed. Individual
+ * nodes can be penalized for an amount of time (see {@link #penalize(Peer, long)}) to avoid
+ * sending/receiving data from them. Attempts are made to balance communications ("busier"
+ * nodes will {@code TransferDirection.SEND} more and {@code TransferDirection.RECEIVE} fewer
+ * flowfiles from this instance).
+ */
 public class PeerSelector {
-
     private static final Logger logger = LoggerFactory.getLogger(PeerSelector.class);
-    private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
 
-    private static final long PEER_REFRESH_PERIOD = 60000L;
+    // The timeout for the peer status cache
+    private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
 
-    private final ReentrantLock peerRefreshLock = new ReentrantLock();
-    private volatile List<PeerStatus> peerStatuses;
-    private volatile Set<PeerStatus> lastFetchedQueryablePeers;
-    private volatile long peerRefreshTime = 0L;
-    private final AtomicLong peerIndex = new AtomicLong(0L);
-    private volatile PeerStatusCache peerStatusCache;
+    // The service which saves the peer state to persistent storage
     private final PeerPersistence peerPersistence;
 
-    private EventReporter eventReporter;
-
+    // The service which retrieves peer state
     private final PeerStatusProvider peerStatusProvider;
-    private final ConcurrentMap<PeerDescription, Long> peerTimeoutExpirations = new ConcurrentHashMap<>();
 
-    static class SystemTime {
-        long currentTimeMillis() {
-            return System.currentTimeMillis();
-        }
-    }
-    private SystemTime systemTime = new SystemTime();
+    // Maps the peer description to a millisecond penalty expiration
+    private final ConcurrentMap<PeerDescription, Long> peerPenaltyExpirations = new ConcurrentHashMap<>();
+
+    // The most recently fetched peer statuses
+    private volatile PeerStatusCache peerStatusCache;
+
+    private EventReporter eventReporter;
 
     /**
-     * Replace the SystemTime instance.
-     * This method is purely used by unit testing, to emulate peer refresh period.
+     * Returns a peer selector with the provided collaborators.
+     *
+     * @param peerStatusProvider the service which retrieves peer state
+     * @param peerPersistence    the service which persists peer state
      */
-    void setSystemTime(final SystemTime systemTime) {
-        logger.info("Replacing systemTime instance to {}.", systemTime);
-        this.systemTime = systemTime;
-    }
-
     public PeerSelector(final PeerStatusProvider peerStatusProvider, final PeerPersistence peerPersistence) {
         this.peerStatusProvider = peerStatusProvider;
         this.peerPersistence = peerPersistence;
 
+        // On instantiation, retrieve the peer status cache
+        restoreInitialPeerStatusCache();
+    }
+
+    /**
+     * Populates the peer status cache from the peer persistence provider (e.g. the file system or
+     * persisted cluster state). If this fails, it will log a warning and continue, as it is not
+     * required for startup. If the cached protocol differs from the currently configured protocol,
+     * the cache will be cleared.
+     */
+    private void restoreInitialPeerStatusCache() {
         try {
             PeerStatusCache restoredPeerStatusCache = null;
             if (peerPersistence != null) {
                 restoredPeerStatusCache = peerPersistence.restore();
+
+                // If there is an existing cache, ensure that the protocol matches the current protocol
                 if (restoredPeerStatusCache != null) {
                     final SiteToSiteTransportProtocol currentProtocol = peerStatusProvider.getTransportProtocol();
                     final SiteToSiteTransportProtocol cachedProtocol = restoredPeerStatusCache.getTransportProtocol();
+
+                    // If the protocols have changed, clear the cache
                     if (!currentProtocol.equals(cachedProtocol)) {
-                        logger.info("Discard stored peer statuses in {} because transport protocol has changed from {} to {}",
-                            peerPersistence.getClass().getSimpleName(), cachedProtocol, currentProtocol);
+                        logger.warn("Discard stored peer statuses in {} because transport protocol has changed from {} to {}",
+                                peerPersistence.getClass().getSimpleName(), cachedProtocol, currentProtocol);
                         restoredPeerStatusCache = null;
                     }
                 }
             }
             this.peerStatusCache = restoredPeerStatusCache;
-
         } catch (final IOException ioe) {
             logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file",
-                peerPersistence.getClass().getSimpleName(), ioe);
+                    peerPersistence.getClass().getSimpleName(), ioe);
         }
     }
 
-    private void persistPeerStatuses() {
-        try {
-            peerPersistence.save(peerStatusCache);
-        } catch (final IOException e) {
-            error(logger, eventReporter, "Failed to persist list of Peers due to {}; if restarted" +
-                " and the nodes specified at the RPG are down," +
-                " may be unable to transfer data until communications with those nodes are restored", e.toString());
-            logger.error("", e);
+    /**
+     * Returns the normalized weight for this ratio of peer flowfiles to total flowfiles and the given direction. The number will be
+     * a Double between 0 and 100 indicating the percent of all flowfiles the peer
+     * should send/receive. The transfer direction is <em>from the perspective of this node to the peer</em>
+     * (i.e. how many flowfiles should <em>this node send</em> to the peer, or how many flowfiles
+     * should <em>this node receive</em> from the peer).
+     *
+     * @param direction          the transfer direction ({@code SEND} weights the destinations higher if they have fewer flowfiles, {@code RECEIVE} weights them higher if they have more)
+     * @param totalFlowFileCount the total flowfile count in the remote instance (standalone or cluster)
+     * @param flowFileCount      the flowfile count for the given peer
+     * @param peerCount          the number of peers in the remote instance
+     * @return the normalized weight of this peer
+     */
+    private static double calculateNormalizedWeight(TransferDirection direction, long totalFlowFileCount, int flowFileCount, int peerCount) {
+        // If there is only a single remote, send/receive all data to/from it
+        if (peerCount == 1) {
+            return 100;
         }
-    }
-
-    List<PeerStatus> formulateDestinationList(final Set<PeerStatus> statuses, final TransferDirection direction) {
-
-        final int numDestinations = Math.max(128, statuses.size());
-        final Map<PeerStatus, Integer> entryCountMap = new HashMap<>();
 
-        long totalFlowFileCount = 0L;
-        for (final PeerStatus nodeInfo : statuses) {
-            totalFlowFileCount += nodeInfo.getFlowFileCount();
+        double cappedPercent;
+        // If no flowfiles exist in the remote instance, evenly weight each node with 1/N
+        if (totalFlowFileCount == 0) {
+            cappedPercent = 1.0 / peerCount;
+        } else {
+            final double percentageOfFlowFiles = ((double) flowFileCount / totalFlowFileCount);
+            cappedPercent = percentageOfFlowFiles;
+
+            // If sending to the remote, allocate more flowfiles to the less-stressed peers
+            if (direction == TransferDirection.SEND) {
+                cappedPercent = (1 - percentageOfFlowFiles) / (peerCount - 1);
+            }
         }
+        return new BigDecimal(cappedPercent * 100).setScale(2, RoundingMode.FLOOR).doubleValue();
+    }
 
-        int totalEntries = 0;
-        for (final PeerStatus nodeInfo : statuses) {
-            final int flowFileCount = nodeInfo.getFlowFileCount();
-            // don't allow any node to get more than 80% of the data
-            final double percentageOfFlowFiles = Math.min(0.8D, ((double) flowFileCount / (double) totalFlowFileCount));
-            final double relativeWeighting = (direction == TransferDirection.SEND) ? (1 - percentageOfFlowFiles) : percentageOfFlowFiles;
-            final int entries = Math.max(1, (int) (numDestinations * relativeWeighting));
-
-            entryCountMap.put(nodeInfo, Math.max(1, entries));
-            totalEntries += entries;
+    /**
+     * Returns an ordered map of peers sorted in descending order by value (relative weight).
+     *
+     * @param unsortedMap the unordered map of peers to weights
+     * @return the sorted (desc) map (by value)
+     */
+    private static LinkedHashMap<PeerStatus, Double> sortMapByWeight(Map<PeerStatus, Double> unsortedMap) {
+        List<Map.Entry<PeerStatus, Double>> list = new ArrayList<>(unsortedMap.entrySet());
+        list.sort(Map.Entry.comparingByValue());
+
+        LinkedHashMap<PeerStatus, Double> result = new LinkedHashMap<>();
+        for (int i = list.size() - 1; i >= 0; i--) {
+            Map.Entry<PeerStatus, Double> entry = list.get(i);
+            result.put(entry.getKey(), entry.getValue());
         }
 
-        final List<PeerStatus> destinations = new ArrayList<>(totalEntries);
-        for (int i = 0; i < totalEntries; i++) {
-            destinations.add(null);
-        }
-        for (final Map.Entry<PeerStatus, Integer> entry : entryCountMap.entrySet()) {
-            final PeerStatus nodeInfo = entry.getKey();
-            final int numEntries = entry.getValue();
-
-            int skipIndex = numEntries;
-            for (int i = 0; i < numEntries; i++) {
-                int n = (skipIndex * i);
-                while (true) {
-                    final int index = n % destinations.size();
-                    PeerStatus status = destinations.get(index);
-                    if (status == null) {
-                        status = new PeerStatus(nodeInfo.getPeerDescription(), nodeInfo.getFlowFileCount(), nodeInfo.isQueryForPeers());
-                        destinations.set(index, status);
-                        break;
-                    } else {
-                        n++;
-                    }
-                }
+        return result;
+    }
+
+    /**
+     * Prints the distribution of the peers to the logger.
+     *
+     * @param sortedPeerWorkloads the peers and relative weights
+     */
+    private static void printDistributionStatistics(Map<PeerStatus, Double> sortedPeerWorkloads, TransferDirection direction) {
+        if (logger.isDebugEnabled() && sortedPeerWorkloads != null) {
+            DecimalFormat df = new DecimalFormat("##.##");
+            df.setRoundingMode(RoundingMode.FLOOR);
+            final StringBuilder distributionDescription = new StringBuilder();
+            distributionDescription.append("New weighted distribution of nodes:");
+            for (final Map.Entry<PeerStatus, Double> entry : sortedPeerWorkloads.entrySet()) {
+                final double percentage = entry.getValue();
+                distributionDescription.append("\n").append(entry.getKey())
+                        .append(" will").append(direction == TransferDirection.RECEIVE ? " send " : " receive ")
+                        .append(df.format(percentage)).append("% of data");
             }
+            logger.debug(distributionDescription.toString());
         }
+    }
 
-        // Shuffle destinations to provide better distribution.
-        // Without this, same host will be used continuously, especially when remote peers have the same number of queued files.
-        // Use Random(0) to provide consistent result for unit testing. Randomness is not important to shuffle destinations.
-        Collections.shuffle(destinations, new Random(0));
+    /**
+     * Returns the total of all values in the map. This method is frequently used to calculate the total number of
+     * flowfiles in the instance from the respective peer flowfile counts or the total percentage from the relative weights.
+     *
+     * @param peerWeightMap the map of peers to flowfile counts or relative weights
+     * @return the total of the map values
+     */
+    private static double sumMapValues(Map<PeerStatus, Double> peerWeightMap) {
+        return peerWeightMap.values().stream().mapToDouble(Double::doubleValue).sum();
+    }
 
-        final StringBuilder distributionDescription = new StringBuilder();
-        distributionDescription.append("New Weighted Distribution of Nodes:");
-        for (final Map.Entry<PeerStatus, Integer> entry : entryCountMap.entrySet()) {
-            final double percentage = entry.getValue() * 100D / destinations.size();
-            distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of data");
-        }
-        logger.info(distributionDescription.toString());
+    /**
+     * Resets all penalization states for the peers.
+     */
+    public void clear() {
+        peerPenaltyExpirations.clear();
+    }
 
-        // Jumble the list of destinations.
-        return destinations;
+    /**
+     * Return status of a peer that will be used for the next communication.
+     * The peers with lower workloads will be selected with higher probability.
+     *
+     * @param direction the amount of workload is calculated based on transaction direction,
+     *                  for SEND, a peer with fewer flow files is preferred,
+     *                  for RECEIVE, a peer with more flow files is preferred
+     * @return a selected peer, if there is no available peer or all peers are penalized, then return null
+     */
+    public PeerStatus getNextPeerStatus(final TransferDirection direction) {
+        Set<PeerStatus> peerStatuses = getPeerStatuses();
+        Map<PeerStatus, Double> orderedPeerStatuses = buildWeightedPeerMap(peerStatuses, direction);
+
+        return getAvailablePeerStatus(orderedPeerStatuses);
+    }
+
+    /**
+     * Returns {@code true} if this peer is currently penalized and should not send/receive flowfiles.
+     *
+     * @param peerStatus the peer status identifying the peer
+     * @return true if this peer is penalized
+     */
+    public boolean isPenalized(final PeerStatus peerStatus) {
+        final Long expirationEnd = peerPenaltyExpirations.get(peerStatus.getPeerDescription());
+        return (expirationEnd != null && expirationEnd > System.currentTimeMillis());
     }
 
     /**
      * Updates internal state map to penalize a PeerStatus that points to the
-     * specified peer
+     * specified peer.
      *
-     * @param peer the peer
-     * @param penalizationMillis period of time to penalize a given peer
+     * @param peer               the peer
+     * @param penalizationMillis period of time to penalize a given peer (relative time, not absolute)
      */
     public void penalize(final Peer peer, final long penalizationMillis) {
         penalize(peer.getDescription(), penalizationMillis);
     }
 
+    /**
+     * Updates internal state map to penalize a PeerStatus that points to the
+     * specified peer.
+     *
+     * @param peerDescription    the peer description (identifies the peer)
+     * @param penalizationMillis period of time to penalize a given peer (relative time, not absolute)
+     */
     public void penalize(final PeerDescription peerDescription, final long penalizationMillis) {
-        Long expiration = peerTimeoutExpirations.get(peerDescription);
+        Long expiration = peerPenaltyExpirations.get(peerDescription);
         if (expiration == null) {
-            expiration = Long.valueOf(0L);
+            expiration = 0L;
         }
 
-        final long newExpiration = Math.max(expiration, systemTime.currentTimeMillis() + penalizationMillis);
-        peerTimeoutExpirations.put(peerDescription, Long.valueOf(newExpiration));
+        final long newExpiration = Math.max(expiration, System.currentTimeMillis() + penalizationMillis);
+        peerPenaltyExpirations.put(peerDescription, newExpiration);
     }
 
-    public boolean isPenalized(final PeerStatus peerStatus) {
-        final Long expirationEnd = peerTimeoutExpirations.get(peerStatus.getPeerDescription());
-        return (expirationEnd != null && expirationEnd > systemTime.currentTimeMillis());
+    /**
+     * Allows for external callers to trigger a refresh of the internal peer status cache. Performs the refresh if the cache has expired. If the cache is still valid, skips the refresh.
+     */
+    public void refresh() {
+        long cacheAgeMs = getCacheAge();
+        logger.debug("External refresh triggered. Last refresh was {} ms ago", cacheAgeMs);
+        if (isPeerRefreshNeeded()) {
+            logger.debug("Refreshing peer status cache");
+            refreshPeerStatusCache();
+        } else {
+            logger.debug("Cache is still valid; skipping refresh");
+        }
     }
 
-    public void clear() {
-        peerTimeoutExpirations.clear();
+    /**
+     * Sets the event reporter instance.
+     *
+     * @param eventReporter the event reporter
+     */
+    public void setEventReporter(EventReporter eventReporter) {
+        this.eventReporter = eventReporter;
     }
 
-    private boolean isPeerRefreshNeeded(final List<PeerStatus> peerList) {
-        return (peerList == null || peerList.isEmpty() || systemTime.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD);
+    /**
+     * Returns a map of peers prepared for flowfile transfer in the specified direction. Each peer is a key and the value is a
+     * weighted percentage of the total flowfiles in the remote instance. For example, in a cluster where the total number of flowfiles
+     * is 100, distributed across three nodes with 20 in A, 30 in B, and 50 in C, the resulting map for
+     * {@code SEND} will be {@code [A:40.0, B:35.0, C:25.0]} (e.g. for A: (1 - 0.2) / (3 - 1) = 0.4, i.e. 40.0).
+     *
+     * @param statuses  the set of all peers
+     * @param direction the direction of transfer ({@code SEND} weights the destinations higher if they have fewer flowfiles, {@code RECEIVE} weights them higher if they have more)
+     * @return the ordered map of each peer to its relative weight
+     */
+    LinkedHashMap<PeerStatus, Double> buildWeightedPeerMap(final Set<PeerStatus> statuses, final TransferDirection direction) {
+        // Get all the destinations with their relative weights
+        final Map<PeerStatus, Double> peerWorkloads = createDestinationMap(statuses, direction);
+
+        if (!peerWorkloads.isEmpty()) {
+            // This map is sorted, but not by key, so it cannot use SortedMap
+            LinkedHashMap<PeerStatus, Double> sortedPeerWorkloads = sortMapByWeight(peerWorkloads);
+
+            // Print the expected distribution of the peers
+            printDistributionStatistics(sortedPeerWorkloads, direction);
+
+            return sortedPeerWorkloads;
+        } else {
+            logger.debug("No peers available");
+            return new LinkedHashMap<>();
+        }
     }
 
     /**
-     * Return status of a peer that will be used for the next communication.
-     * The peer with less workload will be selected with higher probability.
-     * @param direction the amount of workload is calculated based on transaction direction,
-     *                  for SEND, a peer with less flow files is preferred,
-     *                  for RECEIVE, a peer with more flow files is preferred
-     * @return a selected peer, if there is no available peer or all peers are penalized, then return null
+     * Returns a map indexed by a peer to the normalized weight (number of flowfiles currently being
+     * processed by the peer as a percentage of the total). This is used to allocate flowfiles to
+     * the various peers as destinations.
+     *
+     * @param peerStatuses the set of peers, along with their current workload (number of flowfiles)
+     * @param direction    whether sending flowfiles to these peers or receiving them
+     * @return the map of weighted peers
      */
-    public PeerStatus getNextPeerStatus(final TransferDirection direction) {
-        List<PeerStatus> peerList = peerStatuses;
-        if (isPeerRefreshNeeded(peerList)) {
-            peerRefreshLock.lock();
+    @NotNull
+    private Map<PeerStatus, Double> createDestinationMap(Set<PeerStatus> peerStatuses, TransferDirection direction) {
+        final Map<PeerStatus, Double> peerWorkloads = new HashMap<>();
+
+        // Calculate the total number of flowfiles in the peers
+        long totalFlowFileCount = peerStatuses.stream().mapToLong(PeerStatus::getFlowFileCount).sum();
+        logger.debug("Building weighted map of peers with total remote NiFi flowfile count: {}", totalFlowFileCount);
+
+        // For each node, calculate the relative weight and store it in the map
+        for (final PeerStatus nodeInfo : peerStatuses) {
+            final int flowFileCount = nodeInfo.getFlowFileCount();
+            final double normalizedWeight = calculateNormalizedWeight(direction, totalFlowFileCount, flowFileCount, peerStatuses.size());
+            peerWorkloads.put(nodeInfo, normalizedWeight);
+        }
+
+        return peerWorkloads;
+    }
+
+    /**
+     * Returns a set of {@link PeerStatus} objects representing all remote peers for the provided
+     * {@link PeerDescription}s. If a queried peer returns updated state on a peer which has already
+     * been captured, the new state is used.
+     * <p>
+     * Example:
+     * <p>
+     * 3 node cluster with nodes A, B, C
+     * <p>
+     * Node A knows about Node B and Node C, B about A and C, etc.
+     *
+     * <pre>
+     *     Action                           |   Statuses
+     *     query(A) -> B.status, C.status   |   Bs1, Cs1
+     *     query(B) -> A.status, C.status   |   As1, Bs1, Cs2
+     *     query(C) -> A.status, B.status   |   As2, Bs2, Cs2
+     * </pre>
+     *
+     * @param peersToRequestClusterInfoFrom the set of peers to query
+     * @return the complete set of statuses for each collection of peers
+     * @throws IOException if there is a problem fetching peer statuses
+     */
+    private Set<PeerStatus> fetchRemotePeerStatuses(Set<PeerDescription> peersToRequestClusterInfoFrom) throws IOException {
+        logger.debug("Fetching remote peer statuses from: {}", peersToRequestClusterInfoFrom);
+        Exception lastFailure = null;
+
+        final Set<PeerStatus> allPeerStatuses = new HashSet<>();
+
+        // Iterate through all peers, getting (sometimes multiple) status(es) from each
+        for (final PeerDescription peerDescription : peersToRequestClusterInfoFrom) {
             try {
-                // now that we have the lock, check again that we need to refresh (because another thread
-                // could have been refreshing while we were waiting for the lock).
-                peerList = peerStatuses;
-                if (isPeerRefreshNeeded(peerList)) {
-                    try {
-                        peerList = createPeerStatusList(direction);
-                    } catch (final Exception e) {
-                        final String message = String.format("%s Failed to update list of peers due to %s", this, e.toString());
-                        warn(logger, eventReporter, message);
-                        if (logger.isDebugEnabled()) {
-                            logger.warn("", e);
-                        }
-                    }
+                // Retrieve the peer status(es) from each peer description
+                final Set<PeerStatus> statusesForPeerDescription = peerStatusProvider.fetchRemotePeerStatuses(peerDescription);
 
-                    this.peerStatuses = peerList;
-                    peerRefreshTime = systemTime.currentTimeMillis();
-                }
-            } finally {
-                peerRefreshLock.unlock();
+                // Filter to remove any peers which are not queryable
+                final Set<PeerStatus> filteredStatuses = statusesForPeerDescription.stream()
+                        .filter(PeerStatus::isQueryForPeers)
+                        .collect(Collectors.toSet());
+
+                allPeerStatuses.addAll(filteredStatuses);
+            } catch (final Exception e) {
+                logger.warn("Could not communicate with {}:{} to determine which node(s) exist in the remote NiFi instance, due to {}",
+                        peerDescription.getHostname(), peerDescription.getPort(), e.toString());
+                lastFailure = e;
             }
         }
 
-        if (peerList == null || peerList.isEmpty()) {
+        // If no peers were fetched and an exception was the cause, throw an exception
+        if (allPeerStatuses.isEmpty() && lastFailure != null) {
+            throw new IOException("Unable to retrieve nodes from remote instance", lastFailure);
+        }
+
+        return allPeerStatuses;
+    }
+
+    /**
+     * Returns the {@link PeerStatus} identifying the next peer to send/receive data. This uses random
+     * selection of peers, weighted by the relative desirability (i.e. for {@code SEND}, peers with fewer
+     * flowfiles are more likely to be selected, and for {@code RECEIVE}, peers with more flowfiles are
+     * more likely).
+     *
+     * @param orderedPeerStatuses the map of peers to relative weights, sorted in descending order by weight
+     * @return the peer to send/receive data
+     */
+    private PeerStatus getAvailablePeerStatus(Map<PeerStatus, Double> orderedPeerStatuses) {
+        if (orderedPeerStatuses == null || orderedPeerStatuses.isEmpty()) {
+            logger.warn("Available peers collection is empty; no peer available");
             return null;
         }
 
-        PeerStatus peerStatus;
-        for (int i = 0; i < peerList.size(); i++) {
-            final long idx = peerIndex.getAndIncrement();
-            final int listIndex = (int) (idx % peerList.size());
-            peerStatus = peerList.get(listIndex);
+        // Only distribute to unpenalized peers
+        Map<PeerStatus, Double> unpenalizedPeers = orderedPeerStatuses.entrySet().stream()
+                .filter(e -> !isPenalized(e.getKey()))
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
-            if (isPenalized(peerStatus)) {
-                logger.debug("{} {} is penalized; will not communicate with this peer", this, peerStatus);
-            } else {
-                return peerStatus;
+        final double totalWeights = sumMapValues(unpenalizedPeers);
+        logger.debug("Determining next available peer ({} peers with total weight {})", unpenalizedPeers.keySet().size(), totalWeights);
+
+        final double random = Math.random() * Math.min(100, totalWeights);
+        logger.debug("Generated random value {}", random);
+
+        double threshold = 0.0;
+        for (Map.Entry<PeerStatus, Double> e : unpenalizedPeers.entrySet()) {
+            logger.debug("Initial threshold was {}; added peer value {}; total {}", threshold, e.getValue(), threshold + e.getValue());
+            threshold += e.getValue();
+            if (random <= threshold) {
+                return e.getKey();
             }
         }
 
-        logger.debug("{} All peers appear to be penalized; returning null", this);
+        logger.debug("Did not select a peer; r {}, t {}, w {}", random, threshold, orderedPeerStatuses.values());
+        logger.debug("All peers appear to be penalized; returning null");
         return null;
     }
 
-    private List<PeerStatus> createPeerStatusList(final TransferDirection direction) throws IOException {
-        Set<PeerStatus> statuses = getPeerStatuses();
-        if (statuses == null) {
-            refreshPeers();
-            statuses = getPeerStatuses();
-            if (statuses == null) {
-                logger.debug("{} found no peers to connect to", this);
-                return Collections.emptyList();
-            }
+    /**
+     * Returns the cache age in milliseconds. If the cache is null or not set, returns {@code -1}.
+     *
+     * @return the cache age in millis
+     */
+    private long getCacheAge() {
+        if (peerStatusCache == null) {
+            return -1;
         }
-        return formulateDestinationList(statuses, direction);
+        return System.currentTimeMillis() - peerStatusCache.getTimestamp();
     }
 
-    private Set<PeerStatus> getPeerStatuses() {
-        final PeerStatusCache cache = this.peerStatusCache;
-        if (cache == null || cache.getStatuses() == null || cache.getStatuses().isEmpty()) {
-            return null;
-        }
-
-        if (cache.getTimestamp() + PEER_CACHE_MILLIS < systemTime.currentTimeMillis()) {
-            final Set<PeerStatus> equalizedSet = new HashSet<>(cache.getStatuses().size());
-            for (final PeerStatus status : cache.getStatuses()) {
-                final PeerStatus equalizedStatus = new PeerStatus(status.getPeerDescription(), 1, status.isQueryForPeers());
-                equalizedSet.add(equalizedStatus);
-            }
+    /**
+     * Returns the set of queryable peers ({@link PeerStatus#isQueryForPeers()}) most recently fetched.
+     *
+     * @return the set of queryable peers (empty set if the cache is {@code null})
+     */
+    @NotNull
+    private Set<PeerStatus> getLastFetchedQueryablePeers() {
+        return peerStatusCache != null ? peerStatusCache.getStatuses() : Collections.emptySet();
+    }
 
-            return equalizedSet;
+    /**
+     * Returns the set of peer statuses. If the cache is {@code null} or empty, refreshes the cache first and then returns the new peer status set.
+     *
+     * @return the most recent peer statuses (empty set if the cache is {@code null})
+     */
+    @NotNull
+    private Set<PeerStatus> getPeerStatuses() {
+        if (isPeerRefreshNeeded()) {
+            refreshPeerStatusCache();
         }
 
-        return cache.getStatuses();
+        return getLastFetchedQueryablePeers();
     }
 
-    public void refreshPeers() {
-        final PeerStatusCache existingCache = peerStatusCache;
-        if (existingCache != null && (existingCache.getTimestamp() + PEER_CACHE_MILLIS > systemTime.currentTimeMillis())) {
-            return;
-        }
+    /**
+     * Returns the set of {@link PeerDescription} objects uniquely identifying each NiFi node which should be queried for {@link PeerStatus}.
+     *
+     * @return the set of recently retrieved peers and the bootstrap peer
+     * @throws IOException if there is a problem retrieving the list of peers to query
+     */
+    private Set<PeerDescription> getPeersToQuery() throws IOException {
+        final Set<PeerDescription> peersToRequestClusterInfoFrom = new HashSet<>();
 
-        try {
-            final Set<PeerStatus> statuses = fetchRemotePeerStatuses();
-            peerStatusCache = new PeerStatusCache(statuses, System.currentTimeMillis(), peerStatusProvider.getTransportProtocol());
-            persistPeerStatuses();
-            logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, statuses.size());
-        } catch (Exception e) {
-            warn(logger, eventReporter, "{} Unable to refresh Remote Group's peers due to {}", this, e.getMessage());
-            if (logger.isDebugEnabled()) {
-                logger.debug("", e);
+        // Use the peers fetched last time
+        final Set<PeerStatus> lastFetched = getLastFetchedQueryablePeers();
+        if (lastFetched != null && !lastFetched.isEmpty()) {
+            for (PeerStatus peerStatus : lastFetched) {
+                peersToRequestClusterInfoFrom.add(peerStatus.getPeerDescription());
             }
         }
+
+        // Always add the configured node info to the list of peers
+        peersToRequestClusterInfoFrom.add(peerStatusProvider.getBootstrapPeerDescription());
+
+        return peersToRequestClusterInfoFrom;
     }
 
-    public void setEventReporter(EventReporter eventReporter) {
-        this.eventReporter = eventReporter;
+    /**
+     * Returns {@code true} if this cache has expired.
+     *
+     * @param cache the peer status cache
+     * @return true if the cache is expired
+     */
+    private boolean isCacheExpired(PeerStatusCache cache) {
+        return cache == null || cache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis();
     }
 
-    private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
-        final Set<PeerDescription> peersToRequestClusterInfoFrom = new HashSet<>();
+    /**
+     * Returns {@code true} if the internal collection of peers is empty or the refresh time has passed.
+     *
+     * @return true if the peer statuses should be refreshed
+     */
+    private boolean isPeerRefreshNeeded() {
+        return (peerStatusCache == null || peerStatusCache.isEmpty() || isCacheExpired(peerStatusCache));
+    }
 
-        // Look at all of the peers that we fetched last time.
-        final Set<PeerStatus> lastFetched = lastFetchedQueryablePeers;
-        if (lastFetched != null && !lastFetched.isEmpty()) {
-            lastFetched.stream().map(peer -> peer.getPeerDescription())
-                    .forEach(desc -> peersToRequestClusterInfoFrom.add(desc));
-        }
+    /**
+     * Persists the provided cache instance (in memory and via the {@link PeerPersistence} (e.g. in cluster state or a local file)) for future retrieval.
+     *
+     * @param peerStatusCache the cache of current peer statuses to persist
+     */
+    private void persistPeerStatuses(PeerStatusCache peerStatusCache) {
+        try {
+            this.peerStatusCache = peerStatusCache;
 
-        // Always add the configured node info to the list of peers to communicate with
-        peersToRequestClusterInfoFrom.add(peerStatusProvider.getBootstrapPeerDescription());
+            // The #save mechanism persists the cache to stateful or file-based storage
+            peerPersistence.save(peerStatusCache);
+        } catch (final IOException e) {
+            error(logger, eventReporter, "Failed to persist list of peers due to {}; if restarted" +
+                    " and the nodes specified at the remote instance are down," +
+                    " may be unable to transfer data until communications with those nodes are restored", e.toString());
+            logger.error("", e);
+        }
+    }
 
-        logger.debug("Fetching remote peer statuses from: {}", peersToRequestClusterInfoFrom);
-        Exception lastFailure = null;
-        for (final PeerDescription peerDescription : peersToRequestClusterInfoFrom) {
-            try {
-                final Set<PeerStatus> statuses = peerStatusProvider.fetchRemotePeerStatuses(peerDescription);
-                lastFetchedQueryablePeers = statuses.stream()
-                        .filter(p -> p.isQueryForPeers())
-                        .collect(Collectors.toSet());
+    /**
+     * Refreshes the list of S2S peers that flowfiles can be sent to or received from. Uses the stateful
+     * cache to reduce network overhead.
+     */
+    private void refreshPeerStatusCache() {
+        try {
+            // Splitting enumeration and querying into separate methods allows better testing and composition
+            final Set<PeerDescription> peersToQuery = getPeersToQuery();
+            final Set<PeerStatus> statuses = fetchRemotePeerStatuses(peersToQuery);
 
-                return statuses;
-            } catch (final Exception e) {
-                logger.warn("Could not communicate with {}:{} to determine which nodes exist in the remote NiFi cluster, due to {}",
-                        peerDescription.getHostname(), peerDescription.getPort(), e.toString());
-                lastFailure = e;
+            if (statuses.isEmpty()) {
+                logger.info("No peers were retrieved from the remote group {}", peersToQuery.stream().map(p -> p.getHostname() + ":" + p.getPort()).collect(Collectors.joining(",")));
             }
-        }
 
-        final IOException ioe = new IOException("Unable to communicate with remote NiFi cluster in order to determine which nodes exist in the remote cluster");
-        if (lastFailure != null) {
-            ioe.addSuppressed(lastFailure);
+            // Persist the fetched peer statuses
+            PeerStatusCache peerStatusCache = new PeerStatusCache(statuses, System.currentTimeMillis(), peerStatusProvider.getTransportProtocol());
+            persistPeerStatuses(peerStatusCache);
+            logger.info("Successfully refreshed peer status cache; remote group consists of {} peers", statuses.size());
+        } catch (Exception e) {
+            warn(logger, eventReporter, "Unable to refresh remote group peers due to: {}", e.getMessage());
+            if (logger.isDebugEnabled() && e.getCause() != null) {
+                logger.warn("Caused by: ", e);
+            }
         }
-
-        throw ioe;
     }
-
-}
+}
\ No newline at end of file
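
For reference, getAvailablePeerStatus above performs a roulette-wheel selection over the sorted weight map: a random draw in [0, min(100, total weight)) is compared against a running cumulative threshold. A minimal standalone sketch of that selection (penalization filtering omitted, names illustrative):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class WeightedSelectionSketch {
        // Returns the first peer whose cumulative weight reaches a random draw.
        static String select(LinkedHashMap<String, Double> weights) {
            double total = weights.values().stream().mapToDouble(Double::doubleValue).sum();
            double random = Math.random() * Math.min(100, total);
            double threshold = 0.0;
            for (Map.Entry<String, Double> e : weights.entrySet()) {
                threshold += e.getValue();
                if (random <= threshold) {
                    return e.getKey();
                }
            }
            return null;                                       // e.g. every weight was zero
        }

        public static void main(String[] args) {
            LinkedHashMap<String, Double> weights = new LinkedHashMap<>();
            weights.put("A", 40.0);                            // weights from the SEND example above
            weights.put("B", 35.0);
            weights.put("C", 25.0);
            System.out.println("Selected peer: " + select(weights));
        }
    }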
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java
index 64fd161..7f8b119 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java
@@ -16,13 +16,12 @@
  */
 package org.apache.nifi.remote.client;
 
+import java.io.IOException;
+import java.util.Set;
 import org.apache.nifi.remote.PeerDescription;
 import org.apache.nifi.remote.PeerStatus;
 import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 
-import java.io.IOException;
-import java.util.Set;
-
 /**
  * This interface defines methods used from {@link PeerSelector}.
  */
@@ -52,6 +51,7 @@ public interface PeerStatusProvider {
      * Fetch peer statuses from a remote NiFi cluster.
      * Implementation of this method should fetch peer statuses from the node
      * represented by the passed PeerDescription using its transport protocol.
+     *
      * @param peerDescription a bootstrap node or one of query-able nodes lastly fetched successfully
      * @return Remote peer statuses
      * @throws IOException thrown when it fails to fetch peer statuses of the remote cluster from the specified peer
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
index 660f5ea..735dc2c 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
@@ -16,6 +16,17 @@
  */
 package org.apache.nifi.remote.client.http;
 
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.PeerDescription;
@@ -28,7 +39,6 @@ import org.apache.nifi.remote.client.PeerStatusProvider;
 import org.apache.nifi.remote.client.SiteToSiteClientConfig;
 import org.apache.nifi.remote.exception.HandshakeException;
 import org.apache.nifi.remote.exception.PortNotRunningException;
-import org.apache.nifi.remote.exception.ProtocolException;
 import org.apache.nifi.remote.exception.UnknownPortException;
 import org.apache.nifi.remote.io.http.HttpCommunicationsSession;
 import org.apache.nifi.remote.protocol.CommunicationsSession;
@@ -39,18 +49,6 @@ import org.apache.nifi.web.api.dto.remote.PeerDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.net.URI;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
 public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusProvider {
 
     private static final Logger logger = LoggerFactory.getLogger(HttpClient.class);
@@ -80,7 +78,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
         taskExecutor.scheduleWithFixedDelay(new Runnable() {
             @Override
             public void run() {
-                peerSelector.refreshPeers();
+                peerSelector.refresh();
             }
         }, 0, 5, TimeUnit.SECONDS);
 
@@ -99,7 +97,8 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
 
     @Override
     public Set<PeerStatus> fetchRemotePeerStatuses(PeerDescription peerDescription) throws IOException {
-        // Each node should has the same URL structure and network reach-ability with the proxy configuration.
+        // Each node should have the same URL structure and network reachability with the proxy configuration
+        // Construct API client and provide to retrieval method
         try (final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy(), config.getEventReporter())) {
             final String scheme = peerDescription.isSecure() ? "https" : "http";
             apiClient.setBaseUrl(scheme, peerDescription.getHostname(), peerDescription.getPort());
@@ -110,20 +109,26 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
             apiClient.setCacheExpirationMillis(config.getCacheExpiration(TimeUnit.MILLISECONDS));
             apiClient.setLocalAddress(config.getLocalAddress());
 
-            final Collection<PeerDTO> peers = apiClient.getPeers();
-            if(peers == null || peers.size() == 0){
-                throw new IOException("Couldn't get any peer to communicate with. " + apiClient.getBaseUrl() + " returned zero peers.");
-            }
+           return fetchRemotePeerStatuses(apiClient);
+        }
+    }
 
-            // Convert the PeerDTO's to PeerStatus objects. Use 'true' for the query-peer-for-peers flag because Site-to-Site over HTTP
-            // was added in NiFi 1.0.0, which means that peer-to-peer queries are always allowed.
-            return peers.stream().map(p -> new PeerStatus(new PeerDescription(p.getHostname(), p.getPort(), p.isSecure()), p.getFlowFileCount(), true))
-                    .collect(Collectors.toSet());
+    private Set<PeerStatus> fetchRemotePeerStatuses(SiteToSiteRestApiClient apiClient) throws IOException {
+        // Each node should have the same URL structure and network reachability with the proxy configuration
+        final Collection<PeerDTO> peers = apiClient.getPeers();
+        logger.debug("Retrieved {} peers from {}: {}", peers.size(), apiClient.getBaseUrl(), peers);
+        if (peers.size() == 0) {
+            throw new IOException("Could not get any peer to communicate with. " + apiClient.getBaseUrl() + " returned zero peers.");
         }
+
+        // Convert the PeerDTOs to PeerStatus objects
+        // Each PeerStatus will have the queryPeers flag set to true because Site-to-Site over HTTP
+        // was added in NiFi 1.0.0, which means that peer-to-peer queries are always allowed
+        return peers.stream().map(PeerStatus::new).collect(Collectors.toSet());
     }
 
     @Override
-    public Transaction createTransaction(final TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException {
+    public Transaction createTransaction(final TransferDirection direction) throws IOException {
         final int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS);
 
         PeerStatus peerStatus;
@@ -188,7 +193,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
             // We found a valid peer to communicate with.
             final Integer transactionProtocolVersion = apiClient.getTransactionProtocolVersion();
             final HttpClientTransaction transaction = new HttpClientTransaction(transactionProtocolVersion, peer, direction,
-                config.isUseCompression(), portId, penaltyMillis, config.getEventReporter()) {
+                    config.isUseCompression(), portId, penaltyMillis, config.getEventReporter()) {
 
                 @Override
                 protected void close() throws IOException {
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index 0cf1b53..f168efb 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -16,6 +16,32 @@
  */
 package org.apache.nifi.remote.client.socket;
 
+import static org.apache.nifi.remote.util.EventReportUtil.error;
+import static org.apache.nifi.remote.util.EventReportUtil.warn;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URI;
+import java.security.cert.CertificateException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import javax.net.ssl.SSLContext;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.PeerDescription;
@@ -42,33 +68,6 @@ import org.apache.nifi.security.util.CertificateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.URI;
-import java.security.cert.CertificateException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.nifi.remote.util.EventReportUtil.error;
-import static org.apache.nifi.remote.util.EventReportUtil.warn;
-
 public class EndpointConnectionPool implements PeerStatusProvider {
 
     private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionPool.class);
@@ -122,19 +121,9 @@ public class EndpointConnectionPool implements PeerStatusProvider {
             }
         });
 
-        taskExecutor.scheduleWithFixedDelay(new Runnable() {
-            @Override
-            public void run() {
-                peerSelector.refreshPeers();
-            }
-        }, 0, 5, TimeUnit.SECONDS);
+        taskExecutor.scheduleWithFixedDelay(() -> peerSelector.refresh(), 0, 5, TimeUnit.SECONDS);
 
-        taskExecutor.scheduleWithFixedDelay(new Runnable() {
-            @Override
-            public void run() {
-                cleanupExpiredSockets();
-            }
-        }, 5, 5, TimeUnit.SECONDS);
+        taskExecutor.scheduleWithFixedDelay(() -> cleanupExpiredSockets(), 5, 5, TimeUnit.SECONDS);
     }
 
     private String getPortIdentifier(final TransferDirection transferDirection) throws IOException {
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
index 4df12ae..61e9876 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
@@ -19,9 +19,10 @@ package org.apache.nifi.remote.protocol;
 import java.io.Closeable;
 import java.io.IOException;
 
+// TODO: Possibly extract an interface shared between this type and SiteToSiteRestApiClient
 public interface CommunicationsSession extends Closeable {
 
-    public static final byte[] MAGIC_BYTES = {(byte) 'N', (byte) 'i', (byte) 'F', (byte) 'i'};
+    byte[] MAGIC_BYTES = {(byte) 'N', (byte) 'i', (byte) 'F', (byte) 'i'};
 
     CommunicationsInput getInput();
 
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
index acca34e..41ef880 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
@@ -17,7 +17,8 @@
 package org.apache.nifi.remote.util;
 
 import java.util.Set;
-
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.nifi.remote.PeerStatus;
 import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 
@@ -45,4 +46,19 @@ public class PeerStatusCache {
     public SiteToSiteTransportProtocol getTransportProtocol() {
         return transportProtocol;
     }
+
+    public boolean isEmpty() {
+        return statuses == null || statuses.isEmpty();
+    }
+
+    @Override
+    public String toString() {
+        final ToStringBuilder builder = new ToStringBuilder(this);
+        ToStringBuilder.setDefaultStyle(ToStringStyle.SHORT_PREFIX_STYLE);
+        builder.append("Timestamp", timestamp);
+        builder.append("Transport protocol", transportProtocol);
+        builder.append("Peer status count", statuses != null ? statuses.size() : 0);
+        builder.append("Peer statuses", statuses);
+        return builder.toString();
+    }
 }
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
index 249325d..3852177 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
@@ -16,10 +16,63 @@
  */
 package org.apache.nifi.remote.util;
 
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_COUNT;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_DURATION;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_HEADER_NAME;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE;
+
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.Header;
 import org.apache.http.HttpEntity;
@@ -85,60 +138,6 @@ import org.apache.nifi.web.api.entity.TransactionResultEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLPeerUnverifiedException;
-import javax.net.ssl.SSLSession;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.net.InetAddress;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.security.cert.Certificate;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Predicate;
-import java.util.regex.Pattern;
-
-import static org.apache.commons.lang3.StringUtils.isEmpty;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_COUNT;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_DURATION;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_HEADER_NAME;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE;
-
 public class SiteToSiteRestApiClient implements Closeable {
 
     private static final String EVENT_CATEGORY = "Site-to-Site";
@@ -1084,7 +1083,7 @@ public class SiteToSiteRestApiClient implements Closeable {
         String responseMessage = null;
 
         try {
-            responseMessage = new String(bos.toByteArray(), "UTF-8");
+            responseMessage = new String(bos.toByteArray(), StandardCharsets.UTF_8);
             logger.debug("readResponse responseMessage={}", responseMessage);
 
             final ObjectMapper mapper = new ObjectMapper();
diff --git a/nifi-commons/nifi-site-to-site-client/src/test/groovy/org/apache/nifi/remote/client/PeerSelectorTest.groovy b/nifi-commons/nifi-site-to-site-client/src/test/groovy/org/apache/nifi/remote/client/PeerSelectorTest.groovy
new file mode 100644
index 0000000..198dad4
--- /dev/null
+++ b/nifi-commons/nifi-site-to-site-client/src/test/groovy/org/apache/nifi/remote/client/PeerSelectorTest.groovy
@@ -0,0 +1,1133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client
+
+
+import org.apache.nifi.remote.PeerDescription
+import org.apache.nifi.remote.PeerStatus
+import org.apache.nifi.remote.TransferDirection
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol
+import org.apache.nifi.remote.util.PeerStatusCache
+import org.bouncycastle.jce.provider.BouncyCastleProvider
+import org.junit.After
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+import java.security.Security
+import java.util.concurrent.ArrayBlockingQueue
+
+@RunWith(JUnit4.class)
+class PeerSelectorTest extends GroovyTestCase {
+    private static final Logger logger = LoggerFactory.getLogger(PeerSelectorTest.class)
+
+    private static final BOOTSTRAP_PEER_DESCRIPTION = new PeerDescription("localhost", -1, false)
+    private static final List<String> DEFAULT_NODES = ["node1.nifi", "node2.nifi", "node3.nifi"]
+    private static final Set<PeerStatus> DEFAULT_PEER_STATUSES = buildPeerStatuses(DEFAULT_NODES)
+    private static final Set<PeerDescription> DEFAULT_PEER_DESCRIPTIONS = DEFAULT_PEER_STATUSES*.peerDescription
+    private static final Map<PeerDescription, Set<PeerStatus>> DEFAULT_PEER_NODES = buildPeersMap(DEFAULT_PEER_STATUSES)
+
+    // Default collaborators
+    private static mockPSP
+    private static mockPP
+
+
+    @BeforeClass
+    static void setUpOnce() throws Exception {
+        Security.addProvider(new BouncyCastleProvider())
+
+        logger.metaClass.methodMissing = { String name, args ->
+            logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
+        }
+    }
+
+    @Before
+    void setUp() {
+        // Mock collaborators
+        mockPSP = mockPeerStatusProvider()
+        mockPP = mockPeerPersistence()
+    }
+
+    @After
+    void tearDown() {
+
+    }
+
+    private static Set<PeerStatus> buildPeerStatuses(List<String> nodes = DEFAULT_NODES) {
+        Set<PeerDescription> nodePeerDescriptions = nodes.collect { String nodeHostname ->
+            new PeerDescription(nodeHostname, -1, false)
+        }
+
+        Set<PeerStatus> peerStatuses = nodePeerDescriptions.collect { PeerDescription pd ->
+            new PeerStatus(pd, 0, true)
+        }
+        peerStatuses
+    }
+
+    /**
+     * Returns a map representing a cluster in which each hostname holds the provided number of flowfiles.
+     *
+     * @param peersWithFlowfiles a map of hostnames to flowfile counts
+     * @return a map of constructed PeerStatus objects (one per hostname) to their flowfile counts
+     */
+    private static Map<PeerStatus, Integer> buildCluster(Map<String, Integer> peersWithFlowfiles = [:]) {
+        peersWithFlowfiles.collectEntries { String hostname, Integer flowfileCount ->
+            [new PeerStatus(new PeerDescription(hostname, -1, false), flowfileCount, true), flowfileCount]
+        }
+    }
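+
+    // Illustrative example (values chosen arbitrarily): buildCluster(["node1.nifi": 20, "node2.nifi": 80]) returns a
+    // two-entry map whose keys are PeerStatus instances for node1.nifi and node2.nifi (port -1, insecure,
+    // queryForPeers = true) and whose values are the flowfile counts 20 and 80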
+
+    /**
+     * Returns a map in which each key (a peer description) maps to the statuses of all of its sibling peers.
+     *
+     * @param peerStatuses the set of peer statuses
+     * @return a map of peer descriptions to the statuses of their sibling peers
+     */
+    private static Map<PeerDescription, Set<PeerStatus>> buildPeersMap(Set<PeerStatus> peerStatuses) {
+        peerStatuses.collectEntries { PeerStatus ps ->
+            [ps.peerDescription, peerStatuses.findAll { it.peerDescription.hostname != ps.peerDescription.hostname }]
+        }
+    }
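+
+    // Illustrative example: given peer statuses for node1, node2, and node3, buildPeersMap returns
+    // [node1's description: {node2, node3}, node2's description: {node1, node3}, node3's description: {node1, node2}],
+    // i.e. each peer "sees" every peer except itself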
+
+    /**
+     * Returns a map of nodes to expected percentage of flowfiles allocated to/from the node.
+     *
+     * @param nodes the map of nodes to current flowfile count
+     * @param direction the transfer direction
+     * @return the map of nodes to expected allocation
+     */
+    private static Map<String, Double> determineExpectedPercents(Map<String, Integer> nodes, TransferDirection direction = TransferDirection.SEND) {
+        long totalFFC = nodes.values().sum() as long
+        nodes.collectEntries { name, ffc ->
+            [name, PeerSelector.calculateNormalizedWeight(direction, totalFFC, ffc, nodes.size())]
+        }
+    }
+
+    /**
+     * Asserts that the provided frequency results are within {@code TOLERANCE} % of the expected values.
+     *
+     * @param resultsFrequency the map of node to invocations/hits
+     * @param EXPECTED_PERCENTS the map of node to expected percent of hits
+     * @param NUM_TIMES the total number of hits (defaults to the sum of all results)
+     * @param TOLERANCE the tolerance for error (default 0.05 = 5%)
+     */
+    private static void assertDistributionPercentages(Map<String, Integer> resultsFrequency,
+                                                      final Map<String, Double> EXPECTED_PERCENTS,
+                                                      final int NUM_TIMES = resultsFrequency.values().sum() as int,
+                                                      final double TOLERANCE = 0.05) {
+        assert resultsFrequency.keySet() == EXPECTED_PERCENTS.keySet()
+
+        logger.info("  Actual results: ${resultsFrequency.sort()}")
+        logger.info("Expected results: ${EXPECTED_PERCENTS.sort().collect { k, v -> "${k}: ${v}%" }}")
+
+        def max = resultsFrequency.max { a, b -> a.value <=> b.value }
+        def min = resultsFrequency.min { a, b -> a.value <=> b.value }
+        logger.info("Max: ${max.key} (${max.value}) | Min: ${min.key} (${min.value})")
+        def realTolerance = TOLERANCE * NUM_TIMES
+        logger.debug("Tolerance is measured as a percent of total flowfiles (${TOLERANCE * 100}% of ${NUM_TIMES} = ${realTolerance.round(2)})")
+
+        // TODO: Change percentages to be percentage points of total for even comparison
+        EXPECTED_PERCENTS.each { k, v ->
+            def expectedCount = (v / 100) * NUM_TIMES
+            def lowerBound = Math.max(0, (expectedCount - realTolerance).round(2))
+            def upperBound = Math.min(NUM_TIMES, (expectedCount + realTolerance).round(2))
+            def count = resultsFrequency[k]
+            def difference = Math.abs(expectedCount - count) / NUM_TIMES
+            logger.debug("Checking that ${count} is within ±${TOLERANCE * 100}% of ${expectedCount} (${lowerBound}, ${upperBound}) | ${(difference * 100).round(2)}%")
+            assert count >= lowerBound && count <= upperBound
+        }
+    }
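+
+    // Worked example of the tolerance check above (hypothetical numbers): with NUM_TIMES = 10,000, TOLERANCE = 0.05,
+    // and an expected percentage of 25.0, realTolerance = 500 and expectedCount = 2500, so the observed count must
+    // fall within [2000, 3000]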
+
+    /**
+     * Asserts that the last N peer selections do not contain more than N-1 consecutive selections of the same peer, where N is the total peer count. This is a legacy requirement.
+     *
+     * @param recentPeerSelectionQueue the recently selected peers (the PeerQueue should have been initialized with N elements)
+     * @param nextPeer the next peer
+     */
+    private static void assertConsecutiveSelections(PeerQueue recentPeerSelectionQueue, PeerStatus nextPeer) {
+        recentPeerSelectionQueue.append(nextPeer.peerDescription.hostname)
+        int consecutiveElements = recentPeerSelectionQueue.getMaxConsecutiveElements()
+//        String mcce = recentPeerSelectionQueue.getMostCommonConsecutiveElement()
+//        logger.debug("Most consecutive elements in recentPeerSelectionQueue: ${consecutiveElements} - ${mcce} | ${recentPeerSelectionQueue}")
+        assert consecutiveElements <= recentPeerSelectionQueue.totalSize - 1
+    }
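+
+    // Illustrative example: with a 10-slot queue (N = 10), up to 9 consecutive selections of the same peer are
+    // tolerated; the assertion fails only when all 10 of the most recent selections are the same peer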
+
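+    // Computes the median of the frequency values (for an even count, the average of the two middle values)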
+    private static double calculateMean(Map resultsFrequency) {
+        int n = resultsFrequency.size()
+        Object meanIndex = n % 2 == 0 ? (n / 2 - 1)..(n / 2) : (n / 2).intValue()
+        List meanElements = resultsFrequency.values().sort()[meanIndex] as List
+        return meanElements.sum() / meanElements.size()
+    }
+
+    private static PeerStatusProvider mockPeerStatusProvider(PeerDescription bootstrapPeerDescription = BOOTSTRAP_PEER_DESCRIPTION, Map<PeerDescription, Set<PeerStatus>> peersMap = DEFAULT_PEER_NODES) {
+        [getTransportProtocol       : { ->
+            SiteToSiteTransportProtocol.HTTP
+        },
+         getBootstrapPeerDescription: { ->
+             bootstrapPeerDescription
+         },
+         fetchRemotePeerStatuses    : { PeerDescription pd ->
+             peersMap[pd] ?: [] as Set<PeerStatus>
+         }] as PeerStatusProvider
+    }
+
+    private static PeerPersistence mockPeerPersistence(Set<PeerStatus> peerStatuses = DEFAULT_PEER_STATUSES) {
+        [restore: { ->
+            new PeerStatusCache(peerStatuses, System.currentTimeMillis(), SiteToSiteTransportProtocol.HTTP)
+        },
+         save   : { PeerStatusCache psc ->
+             logger.mock("Persisting PeerStatusCache: ${psc}")
+         }] as PeerPersistence
+    }
+
+    private static PeerSelector buildPeerSelectorForCluster(String scenarioName, Map nodes) {
+        // Map the nodes to a cluster
+        def clusterMap = buildCluster(nodes)
+        logger.info("Using cluster map (${scenarioName}): ${clusterMap.collectEntries { k, v -> [k.peerDescription.hostname, v] }}")
+
+        // Build a peer selector with this cluster
+        PeerStatusProvider mockPSP = mockPeerStatusProvider(BOOTSTRAP_PEER_DESCRIPTION, buildPeersMap(clusterMap.keySet()))
+        PeerPersistence mockPP = mockPeerPersistence(clusterMap.keySet())
+
+        new PeerSelector(mockPSP, mockPP)
+    }
+
+    @Test
+    void testGetPeersToQueryShouldBeEmpty() {
+        // Arrange
+
+        // Mock collaborators with empty data
+        mockPSP = mockPeerStatusProvider(BOOTSTRAP_PEER_DESCRIPTION, [:])
+        mockPP = mockPeerPersistence([] as Set)
+
+        PeerSelector ps = new PeerSelector(mockPSP, mockPP)
+
+        // Act
+        def peersToQuery = ps.getPeersToQuery()
+        logger.info("Retrieved ${peersToQuery.size()} peers to query: ${peersToQuery}")
+
+        // Assert
+        assert peersToQuery.size() == 1
+        assert peersToQuery.first() == BOOTSTRAP_PEER_DESCRIPTION
+    }
+
+    @Test
+    void testShouldGetPeersToQuery() {
+        // Arrange
+        Set<PeerStatus> restoredPeerStatuses = buildPeerStatuses()
+
+        // Mock collaborators
+        mockPP = mockPeerPersistence(restoredPeerStatuses)
+
+        PeerSelector ps = new PeerSelector(mockPSP, mockPP)
+
+        // Act
+        def peersToQuery = ps.getPeersToQuery()
+        logger.info("Retrieved ${peersToQuery.size()} peers to query: ${peersToQuery}")
+
+        // Assert
+        assert peersToQuery.size() == restoredPeerStatuses.size() + 1
+        assert peersToQuery.contains(BOOTSTRAP_PEER_DESCRIPTION)
+        assert peersToQuery.containsAll(DEFAULT_PEER_DESCRIPTIONS)
+    }
+
+    /**
+     * Asserts that calling the {@code #getPeersToQuery( )} method repeatedly provides the same result because it does not modify {@code lastFetchedQueryablePeers} directly.
+     *
+     */
+    @Test
+    void testGetPeersToQueryShouldBeIdempotent() {
+        // Arrange
+        final int NUM_TIMES = 3
+
+        PeerSelector ps = new PeerSelector(mockPSP, mockPP)
+
+        // Act
+        def peersToQuery = ps.getPeersToQuery()
+        logger.info("Retrieved ${peersToQuery.size()} peers to query: ${peersToQuery}")
+
+        def repeatedPeersToQuery = []
+        NUM_TIMES.times { int i ->
+            repeatedPeersToQuery << ps.getPeersToQuery()
+        }
+
+        // Assert
+        assert peersToQuery.size() == DEFAULT_PEER_STATUSES.size() + 1
+        assert peersToQuery.contains(BOOTSTRAP_PEER_DESCRIPTION)
+        assert peersToQuery.containsAll(DEFAULT_PEER_DESCRIPTIONS)
+
+        assert repeatedPeersToQuery.every { it == peersToQuery }
+    }
+
+    @Test
+    void testShouldFetchRemotePeerStatuses() {
+        // Arrange
+        PeerSelector ps = new PeerSelector(mockPSP, mockPP)
+
+        // Act
+        Set<PeerStatus> remotePeerStatuses = ps.fetchRemotePeerStatuses(DEFAULT_PEER_DESCRIPTIONS)
+        logger.info("Retrieved ${remotePeerStatuses.size()} peer statuses: ${remotePeerStatuses}")
+
+        // Assert
+        assert remotePeerStatuses.size() == DEFAULT_PEER_STATUSES.size()
+        assert remotePeerStatuses.containsAll(DEFAULT_PEER_STATUSES)
+    }
+
+    /**
+     * Iterates through test scenarios of 100, 1000, and 10_000 total flowfiles and calculates the relative send and receive weights for every percentage.
+     */
+    @Test
+    void testShouldCalculateNormalizedWeight() {
+        // Arrange
+        def results = [:]
+
+        // Act
+        [3, 5, 7].each { int nodeCount ->
+            results["$nodeCount"] = [:]
+            (2..4).each { int e ->
+                int totalFlowfileCount = 10**e
+                results["$nodeCount"]["$totalFlowfileCount"] = [:]
+                def thisScenario = results["$nodeCount"]["$totalFlowfileCount"]
+                logger.info("Running ${nodeCount} node scenario for ${totalFlowfileCount} total flowfiles")
+                (0..100).each { int i ->
+                    int flowfileCount = (i / 100 * totalFlowfileCount).intValue()
+                    thisScenario["$flowfileCount"] = [:]
+
+                    double sendWeight = PeerSelector.calculateNormalizedWeight(TransferDirection.SEND, totalFlowfileCount, flowfileCount, nodeCount)
+                    double receiveWeight = PeerSelector.calculateNormalizedWeight(TransferDirection.RECEIVE, totalFlowfileCount, flowfileCount, nodeCount)
+
+                    thisScenario["$flowfileCount"]["send"] = sendWeight
+                    thisScenario["$flowfileCount"]["receive"] = receiveWeight
+                }
+            }
+        }
+
+        // Assert
+        results.each { nodeCount, t ->
+            t.each { total, r ->
+                total = Integer.valueOf(total)
+                logger.info("Results for ${nodeCount} nodes with ${total} flowfiles: ")
+                logger.info(["Count", "Send", "Receive"].collect { it.padLeft(10, " ") }.join())
+                int step = total / 10 as int
+                (0..total).step(step).each { int n ->
+                    def data = r["$n"]
+                    def line = [n, data.send, data.receive].collect { (it as String).padLeft(10, " ") }.join()
+                    logger.debug(line)
+                }
+
+                // Assert that the send percentage is always between 0% and 80%
+                assert r.every { k, v -> v.send >= 0 && v.send <= 80 }
+
+                // Assert that the receive percentage is always between 0% and 100%
+                assert r.every { k, v -> v.receive >= 0 && v.receive <= 100 }
+            }
+        }
+    }
+
+    /**
+     * Iterates through test scenarios of 100, 1000, and 10_000 total flowfiles and calculates the relative send and receive weights for every percentage when the remote has a single node.
+     */
+    @Test
+    void testShouldCalculateNormalizedWeightForSingleRemote() {
+        // Arrange
+        final int NODE_COUNT = 1
+
+        // Act
+        (2..4).each { int e ->
+            int totalFlowfileCount = 10**e
+            logger.info("Running single node scenario for ${totalFlowfileCount} total flowfiles")
+            (0..100).each { int i ->
+                int flowfileCount = (i / 100 * totalFlowfileCount).intValue()
+                double sendWeight = PeerSelector.calculateNormalizedWeight(TransferDirection.SEND, totalFlowfileCount, flowfileCount, NODE_COUNT)
+                double receiveWeight = PeerSelector.calculateNormalizedWeight(TransferDirection.RECEIVE, totalFlowfileCount, flowfileCount, NODE_COUNT)
+
+                // Assert
+                assert sendWeight == 100
+                assert receiveWeight == 100
+            }
+        }
+    }
+
+    @Test
+    void testShouldBuildWeightedPeerMapForSend() {
+        // Arrange
+        def nodes = ["node1.nifi": 20, "node2.nifi": 30, "node3.nifi": 50]
+        def clusterMap = buildCluster(nodes)
+
+        // Sort the map in ascending order by value (SEND)
+        clusterMap = clusterMap.sort { e1, e2 -> e1.value <=> e2.value }
+        logger.info("Using cluster map: ${clusterMap.collectEntries { k, v -> [k.peerDescription.hostname, v] }}")
+
+        mockPSP = mockPeerStatusProvider(BOOTSTRAP_PEER_DESCRIPTION, buildPeersMap(clusterMap.keySet()))
+        mockPP = mockPeerPersistence(clusterMap.keySet())
+
+        PeerSelector ps = new PeerSelector(mockPSP, mockPP)
+        Set<PeerStatus> peerStatuses = ps.getPeerStatuses()
+
+        // Act
+        LinkedHashMap<PeerStatus, Double> weightedPeerMap = ps.buildWeightedPeerMap(peerStatuses, TransferDirection.SEND)
+        logger.info("Weighted peer map: ${weightedPeerMap}")
+
+        // Assert
+        assert new ArrayList<>(weightedPeerMap.keySet()) == new ArrayList(clusterMap.keySet())
+    }
+
+    @Test
+    void testShouldBuildWeightedPeerMapForReceive() {
+        // Arrange
+        def nodes = ["node1.nifi": 20, "node2.nifi": 30, "node3.nifi": 50]
+        def clusterMap = buildCluster(nodes)
+
+        // Sort the map in descending order by value (RECEIVE)
+        clusterMap = clusterMap.sort { e1, e2 -> e2.value <=> e1.value }
+        logger.info("Using cluster map: ${clusterMap.collectEntries { k, v -> [k.peerDescription.hostname, v] }}")
+
+        mockPSP = mockPeerStatusProvider(BOOTSTRAP_PEER_DESCRIPTION, buildPeersMap(clusterMap.keySet()))
+        mockPP = mockPeerPersistence(clusterMap.keySet())
+
+        PeerSelector ps = new PeerSelector(mockPSP, mockPP)
+        Set<PeerStatus> peerStatuses = ps.getPeerStatuses()
+
+        // Act
+        LinkedHashMap<PeerStatus, Double> weightedPeerMap = ps.buildWeightedPeerMap(peerStatuses, TransferDirection.RECEIVE)
+        logger.info("Weighted peer map: ${weightedPeerMap}")
+
+        // Assert
+        assert new ArrayList<>(weightedPeerMap.keySet()) == new ArrayList(clusterMap.keySet())
+    }
+
+    /**
+     * This test ensures that regardless of the total flowfile count, the resulting map has
+     * normalized weights (i.e. the weights sum to approximately 100).
+     */
+    @Test
+    void testCreateDestinationMapForSendShouldBeNormalized() {
+        // Arrange
+        def scenarios = [
+                "100 ff 100/0/0"     : ["node1.nifi": 100, "node2.nifi": 0, "node3.nifi": 0],
+                "100 ff 50/50/0"     : ["node1.nifi": 50, "node2.nifi": 50, "node3.nifi": 0],
+                "100 ff 100/0"       : ["node1.nifi": 100, "node2.nifi": 0],
+                "1000 ff 200/300/500": ["node1.nifi": 200, "node2.nifi": 300, "node3.nifi": 500],
+                "1000 ff 333/333/334": ["node1.nifi": 333, "node2.nifi": 333, "node3.nifi": 334],
+                "1000 ff 0/250x4"    : ["node1.nifi": 0, "node2.nifi": 250, "node3.nifi": 250, "node4.nifi": 250, "node5.nifi": 250],
+                "1000 ff 142x7"      : ((1..7).collectEntries { int i -> ["node${i}.nifi", 1000.intdiv(7)] }),
+                "200 ff 151/1x49"    : ["node1.nifi": 151] + ((2..50).collectEntries { int i -> ["node${i}.nifi", 1] })
+        ]
+
+        scenarios.each { String name, Map nodes ->
+            PeerSelector ps = buildPeerSelectorForCluster(name, nodes)
+            Set<PeerStatus> peerStatuses = ps.getPeerStatuses()
+
+            // Check both SEND and RECEIVE
+            TransferDirection.values().each { TransferDirection direction ->
+                logger.info("Retrieving peers for ${direction} in scenario ${name}")
+
+                // Act
+                Map<PeerStatus, Double> destinationMap = ps.createDestinationMap(peerStatuses, direction)
+                logger.info("Destination map: ${destinationMap}")
+
+                // Assert
+                assert destinationMap.keySet() == peerStatuses
+
+                // For uneven splits, the resulting percentage should be within +/- 1%
+                def totalPercentage = destinationMap.values().sum()
+                assert totalPercentage >= 99 && totalPercentage <= 100
+            }
+        }
+    }
+
+    /**
+     * Test the edge case where, due to a rounding error, the randomly selected value is not captured by any of the weighted buckets
+     */
+    @Test
+    void testGetAvailablePeerStatusShouldHandleEdgeCase() {
+        // Arrange
+        final int NUM_TIMES = 10000
+
+        def nodes = ["node1.nifi": 2, "node2.nifi": 1, "node3.nifi": 1]
+
+        // Make a map where the weights are artificially suppressed and total far less than 100% to make the edge case more likely
+        Map<PeerStatus, Double> suppressedPercentageMap = buildPeerStatuses(new ArrayList<String>(nodes.keySet())).collectEntries { [it, nodes[it.peerDescription.hostname] / 100.0 as double] }
+
+        PeerSelector ps = buildPeerSelectorForCluster("edge case cluster", nodes)
+
+        // Collect the results and analyze the resulting frequency distribution
+        Map<String, Integer> resultsFrequency = nodes.keySet().collectEntries { [it, 0] }
+
+        // Act
+        NUM_TIMES.times { int i ->
+            def nextPeer = ps.getAvailablePeerStatus(suppressedPercentageMap)
+//            logger.debug("${(i as String).padLeft(Math.log10(NUM_TIMES).intValue())}: ${nextPeer.peerDescription.hostname}")
+            resultsFrequency[nextPeer.peerDescription.hostname]++
+        }
+        logger.info("Peer frequency results (${NUM_TIMES}): ${resultsFrequency}")
+
+        // Assert
+
+        // The actual distribution would be 50/25/25
+        final Map<String, Double> EXPECTED_PERCENTS = ["node1.nifi": 50.0, "node2.nifi": 25.0, "node3.nifi": 25.0]
+
+        assertDistributionPercentages(resultsFrequency, EXPECTED_PERCENTS, NUM_TIMES, 0.05)
+    }
+
+    @Test
+    void testShouldGetNextPeer() {
+        // Arrange
+        final int NUM_TIMES = 10000
+
+        def nodes = ["node1.nifi": 20, "node2.nifi": 30, "node3.nifi": 50]
+
+        // Check both SEND and RECEIVE
+        TransferDirection.values().each { TransferDirection direction ->
+            logger.info("Selecting ${NUM_TIMES} peers for ${direction}")
+
+            PeerSelector ps = buildPeerSelectorForCluster("100 ff 20/30/50", nodes)
+
+            // Collect the results and analyze the resulting frequency distribution
+            Map<String, Integer> resultsFrequency = nodes.keySet().collectEntries { [it, 0] }
+
+            // Act
+            NUM_TIMES.times { int i ->
+                def nextPeer = ps.getNextPeerStatus(direction)
+//                logger.debug("${(i as String).padLeft(Math.log10(NUM_TIMES).intValue())}: ${nextPeer.peerDescription.hostname}")
+                resultsFrequency[nextPeer.peerDescription.hostname]++
+            }
+            logger.info("Peer frequency results (${NUM_TIMES}): ${resultsFrequency}")
+
+            // Assert
+            final Map<String, Double> EXPECTED_PERCENTS = determineExpectedPercents(nodes, direction)
+            assertDistributionPercentages(resultsFrequency, EXPECTED_PERCENTS, NUM_TIMES)
+        }
+    }
+
+    /**
+     * When the cluster is balanced, the same peer should not be selected more than {@code cluster.size( ) - 1} times consecutively.
+     */
+    @Test
+    void testGetNextPeerShouldNotRepeatPeersOnBalancedCluster() {
+        // Arrange
+        final int NUM_TIMES = 10000
+
+        def nodes = ((1..10).collectEntries { int i -> ["node${i}.nifi".toString(), 100] })
+        PeerSelector ps = buildPeerSelectorForCluster("1000 ff 100x10", nodes)
+
+        // Check both SEND and RECEIVE
+        TransferDirection.values().each { TransferDirection direction ->
+            logger.info("Selecting ${NUM_TIMES} peers for ${direction}")
+
+            // Collect the results and analyze the resulting frequency distribution
+            def resultsFrequency = nodes.keySet().collectEntries { [it, 0] }
+
+            // Use the queue to track recent peers and observe repeated selections
+            PeerQueue lastN = new PeerQueue(nodes.size())
+
+            // Act
+            NUM_TIMES.times { int i ->
+                def nextPeer = ps.getNextPeerStatus(direction)
+                resultsFrequency[nextPeer.peerDescription.hostname]++
+
+                // Assert the consecutive selections are ok
+                assertConsecutiveSelections(lastN, nextPeer)
+            }
+
+            // Assert
+            final def EXPECTED_PERCENTS = nodes.collectEntries { [it.key, 10.0] }
+
+            // The tolerance should be a bit higher because of the high number of nodes and even distribution
+            assertDistributionPercentages(resultsFrequency, EXPECTED_PERCENTS, NUM_TIMES, 0.10)
+        }
+    }
+
+    /**
+     * When a remote has only one valid peer, that peer should be selected every time
+     */
+    @Test
+    void testGetNextPeerShouldRepeatPeersOnSingleValidDestination() {
+        // Arrange
+        final int NUM_TIMES = 100
+
+        // Single destination scenarios
+        def scenarios = [
+                "single node"      : ["node1.nifi": 100],
+                "single empty node": ["node1.nifi": 0],
+                "100 ff 100/0"     : ["node1.nifi": 100, "node2.nifi": 0],
+        ]
+
+        scenarios.each { String name, Map nodes ->
+            PeerSelector ps = buildPeerSelectorForCluster(name, nodes)
+
+            // Check both SEND and RECEIVE
+            TransferDirection.values().each { TransferDirection direction ->
+                logger.info("Selecting ${NUM_TIMES} peers for ${direction} in scenario ${name}")
+
+                // Collect the results and analyze the resulting frequency distribution
+                def resultsFrequency = nodes.keySet().collectEntries { [it, 0] }
+
+                // Use the queue to track recent peers and observe repeated selections
+                PeerQueue lastN = new PeerQueue(nodes.size())
+
+                // Act
+                NUM_TIMES.times { int i ->
+                    def nextPeer = ps.getNextPeerStatus(direction)
+                    resultsFrequency[nextPeer.peerDescription.hostname]++
+
+                    // Assert the consecutive selections are ok (i.e. it IS selecting the same peer repeatedly)
+                    if (lastN.remainingCapacity() == 0) {
+                        lastN.remove()
+                    }
+                    lastN.put(nextPeer.peerDescription.hostname)
+
+                    // Spot check consecutive selection
+                    if (i % 10 == 0) {
+                        int consecutiveElements = lastN.getMaxConsecutiveElements()
+                        assert consecutiveElements == lastN.size()
+                    }
+                }
+
+                // Assert
+                final def EXPECTED_PERCENTS = determineExpectedPercents(nodes, direction)
+                logger.info("Expected percentages for ${name}: ${EXPECTED_PERCENTS}")
+
+                // The tolerance should be zero; exact matches only
+                assertDistributionPercentages(resultsFrequency, EXPECTED_PERCENTS, NUM_TIMES, 0.00)
+            }
+        }
+    }
+
+    /**
+     * The legacy requirement that the same peer not be selected more than N-1 consecutive times, where N is the size of the remote cluster, does not apply to the following scenarios:
+     *
+     * * A remote of size <= 3
+     * * An unbalanced remote (e.g. 33/33/33/0), which <em>should</em> repeat the same peer multiple times
+     */
+    @Test
+    void testGetNextPeerShouldRepeatPeersOnUnbalancedCluster() {
+        // Arrange
+
+        // Using a higher iteration count smooths out outliers
+        final int NUM_TIMES = 10000
+
+        // Scenarios where consecutively-selected peers are expected to sometimes repeat (small clusters, uneven clusters)
+        def scenarios = [
+                "100 ff 50/50"            : ["node1.nifi": 50, "node2.nifi": 50],
+                "100 ff 75/25"            : ["node1.nifi": 75, "node2.nifi": 25],
+                "100 ff 50/50/0"          : ["node1.nifi": 50, "node2.nifi": 50, "node3.nifi": 0],
+                "1000 ff 800/200/0"       : ["node1.nifi": 800, "node2.nifi": 200, "node3.nifi": 0],
+                "10 ff 8/2/0"             : ["node1.nifi": 8, "node2.nifi": 2, "node3.nifi": 0],
+                "200 ff 66x3/0"           : ["node1.nifi": 66, "node2.nifi": 66, "node3.nifi": 66, "node4.nifi": 0],
+                "1000 ff 0/250x4"         : ["node1.nifi": 0, "node2.nifi": 250, "node3.nifi": 250, "node4.nifi": 250, "node5.nifi": 250],
+                "1000 ff 0/111x9"         : ["node1.nifi": 0] + ((2..10).collectEntries { ["node${it}.nifi".toString(), 111] }),
+                "legacy 1024/10240/4096x3": ["node1.nifi": 1024, "node2.nifi": 10240] + (3..5).collectEntries { ["node${it}.nifi".toString(), 4096] },
+                "legacy 50k/500"          : ["node1.nifi": 50_000, "node2.nifi": 50],
+        ]
+
+        scenarios.each { String name, Map nodes ->
+            PeerSelector ps = buildPeerSelectorForCluster(name, nodes)
+
+            // Check both SEND and RECEIVE
+            TransferDirection.values().each { TransferDirection direction ->
+                logger.info("Selecting ${NUM_TIMES} peers for ${direction} in scenario ${name}")
+
+                // Collect the results and analyze the resulting frequency distribution
+                def resultsFrequency = nodes.keySet().collectEntries { [it, 0] }
+                logger.debug("Initialized results map to ${resultsFrequency}")
+
+                // Use the queue to track recent peers and observe repeated selections
+                PeerQueue lastN = new PeerQueue(nodes.size())
+
+                // Act
+                NUM_TIMES.times { int i ->
+                    def nextPeer = ps.getNextPeerStatus(direction)
+//                logger.debug("${(i as String).padLeft(Math.log10(NUM_TIMES).intValue())}: ${nextPeer.peerDescription.hostname}")
+                    resultsFrequency[nextPeer.peerDescription.hostname]++
+
+                    // Assert the consecutive selections are ok (i.e. it IS selecting the same peer repeatedly)
+                    if (lastN.remainingCapacity() == 0) {
+                        lastN.remove()
+                    }
+                    lastN.put(nextPeer.peerDescription.hostname)
+
+                    int consecutiveElements = lastN.getMaxConsecutiveElements()
+                    if (consecutiveElements == nodes.size() && nodes.size() > 3) {
+                        logger.debug("Most consecutive elements in recentPeerSelectionQueue: ${consecutiveElements} | ${lastN}")
+                    }
+                }
+
+                // Assert
+                final def EXPECTED_PERCENTS = determineExpectedPercents(nodes, direction)
+                logger.info("Expected percentages for ${name}: ${EXPECTED_PERCENTS}")
+
+                assertDistributionPercentages(resultsFrequency, EXPECTED_PERCENTS, NUM_TIMES)
+            }
+        }
+    }
+
+    /**
+     * Test the edge case where a single peer is penalized
+     */
+    @Test
+    void testGetAvailablePeerStatusShouldHandlePenalizedPeers() {
+        // Arrange
+        final int NUM_TIMES = 100
+
+        // Should prefer node1, but it will be penalized
+        def nodes = ["node1.nifi": 10, "node2.nifi": 90]
+
+        // Make a map where the weights are normal
+        def peerStatuses = buildPeerStatuses(new ArrayList<String>(nodes.keySet()))
+        Map<PeerStatus, Double> weightMap = peerStatuses.collectEntries { [it, nodes[it.peerDescription.hostname] as double] }
+
+        PeerSelector ps = buildPeerSelectorForCluster("penalized peer", nodes)
+
+        // Penalize node1
+        ps.penalize(peerStatuses.sort().first().peerDescription, 10_000)
+
+        // Collect the results and analyze the resulting frequency distribution
+        Map<String, Integer> resultsFrequency = nodes.keySet().collectEntries { [it, 0] }
+
+        // Act
+        NUM_TIMES.times { int i ->
+            def nextPeer = ps.getAvailablePeerStatus(weightMap)
+//            logger.debug("${(i as String).padLeft(Math.log10(NUM_TIMES).intValue())}: ${nextPeer.peerDescription.hostname}")
+            resultsFrequency[nextPeer.peerDescription.hostname]++
+        }
+        logger.info("Peer frequency results (${NUM_TIMES}): ${resultsFrequency}")
+
+        // Assert
+
+        // The actual distribution would be .9/.1, but because of the penalization, all selections will be node2
+        final Map<String, Double> EXPECTED_PERCENTS = ["node1.nifi": 0.0, "node2.nifi": 100.0]
+
+        // The tolerance should be very tight as this will be almost exact every time
+        assertDistributionPercentages(resultsFrequency, EXPECTED_PERCENTS, NUM_TIMES, 0.00)
+    }
+
+    /**
+     * Test the edge case where multiple peers are penalized
+     */
+    @Test
+    void testGetAvailablePeerStatusShouldHandleMultiplePenalizedPeers() {
+        // Arrange
+        final int NUM_TIMES = 10_000
+
+        // Should distribute evenly, but 1/2 of the nodes will be penalized
+        def nodes = ["node1.nifi": 25, "node2.nifi": 25, "node3.nifi": 25, "node4.nifi": 25]
+
+        // Make a map where the weights are normal
+        def peerStatuses = buildPeerStatuses(new ArrayList<String>(nodes.keySet()))
+        Map<PeerStatus, Double> weightMap = peerStatuses.collectEntries { [it, nodes[it.peerDescription.hostname] as double] }
+
+        PeerSelector ps = buildPeerSelectorForCluster("penalized peers", nodes)
+
+        // Penalize node1 & node3
+        def penalizedPeerStatuses = peerStatuses.findAll { ["node1.nifi", "node3.nifi"].contains(it.peerDescription.hostname) }
+        penalizedPeerStatuses.each { ps.penalize(it.peerDescription, 10_000) }
+
+        // Collect the results and analyze the resulting frequency distribution
+        Map<String, Integer> resultsFrequency = nodes.keySet().collectEntries { [it, 0] }
+
+        // Act
+        NUM_TIMES.times { int i ->
+            def nextPeer = ps.getAvailablePeerStatus(weightMap)
+//            logger.debug("${(i as String).padLeft(Math.log10(NUM_TIMES).intValue())}: ${nextPeer.peerDescription.hostname}")
+            resultsFrequency[nextPeer.peerDescription.hostname]++
+        }
+        logger.info("Peer frequency results (${NUM_TIMES}): ${resultsFrequency}")
+
+        // Assert
+
+        // The unpenalized distribution would be 25% for each of the 4 nodes, but because of the penalization, node2 and node4 will each have ~50%
+        final Map<String, Double> EXPECTED_PERCENTS = ["node1.nifi": 0.0, "node2.nifi": 50.0, "node3.nifi": 0.0, "node4.nifi": 50.0]
+
+        assertDistributionPercentages(resultsFrequency, EXPECTED_PERCENTS, NUM_TIMES, 0.01)
+    }
+
+    // Copied legacy tests from TestPeerSelector
+
+    /**
+     * Test that the cache is the source of peer statuses initially
+     */
+    @Test
+    void testInitializationShouldRestorePeerStatusFileCache() {
+        // Arrange
+        def nodes = DEFAULT_NODES
+        def peerStatuses = DEFAULT_PEER_STATUSES
+
+        // Create the peer status provider
+        mockPSP = mockPeerStatusProvider()
+
+        // Point to the persisted cache on disk
+        final File cacheFile = File.createTempFile("peers", "txt")
+        cacheFile.deleteOnExit()
+
+        // Construct the cache contents and write to disk
+        final String CACHE_CONTENTS = "${mockPSP.getTransportProtocol()}\n" + peerStatuses.collect { PeerStatus ps ->
+            [ps.peerDescription.hostname, ps.peerDescription.port, ps.peerDescription.isSecure(), ps.isQueryForPeers()].join(":")
+        }.join("\n")
+        cacheFile.text = CACHE_CONTENTS
+
+        FilePeerPersistence filePP = new FilePeerPersistence(cacheFile)
+
+        // Act
+
+        // The constructor should restore the initial cache
+        PeerSelector ps = new PeerSelector(mockPSP, filePP)
+
+        // PeerSelector should access peer statuses from cache
+        def peersToQuery = ps.getPeersToQuery()
+        logger.info("Retrieved ${peersToQuery.size()} peers to query: ${peersToQuery}")
+
+        // Assert
+        assert peersToQuery.size() == nodes.size() + 1
+        assert peersToQuery.contains(BOOTSTRAP_PEER_DESCRIPTION)
+        assert peersToQuery.containsAll(DEFAULT_PEER_DESCRIPTIONS)
+    }
+
+    /**
+     * Test that if the cache is expired, it is not used
+     */
+    @Test
+    void testRefreshShouldHandleExpiredPeerStatusFileCache() {
+        // Arrange
+        def nodes = DEFAULT_NODES
+        def peerStatuses = DEFAULT_PEER_STATUSES
+
+        // Create the peer status provider with no actual remote peers
+        mockPSP = mockPeerStatusProvider(BOOTSTRAP_PEER_DESCRIPTION, [:])
+
+        // Point to the persisted cache on disk
+        final File cacheFile = File.createTempFile("peers", "txt")
+        cacheFile.deleteOnExit()
+
+        // Construct the cache contents and write to disk
+        final String CACHE_CONTENTS = "${mockPSP.getTransportProtocol()}\n" + peerStatuses.collect { PeerStatus ps ->
+            [ps.peerDescription.hostname, ps.peerDescription.port, ps.peerDescription.isSecure(), ps.isQueryForPeers()].join(":")
+        }.join("\n")
+        cacheFile.text = CACHE_CONTENTS
+
+        // Mark the file as expired
+        cacheFile.lastModified = System.currentTimeMillis() - (PeerSelector.PEER_CACHE_MILLIS * 2)
+
+        FilePeerPersistence filePP = new FilePeerPersistence(cacheFile)
+
+        // Act
+
+        // The constructor should restore the initial cache
+        PeerSelector ps = new PeerSelector(mockPSP, filePP)
+
+        // Assert
+
+        // The loaded cache should be marked as expired and not used
+        assert ps.isCacheExpired(ps.peerStatusCache)
+
+        // This internal method does not refresh or check expiration
+        def peersToQuery = ps.getPeersToQuery()
+        logger.info("Retrieved ${peersToQuery.size()} peers to query: ${peersToQuery}")
+
+        // The cache has (expired) peer statuses present
+        assert peersToQuery.size() == nodes.size() + 1
+        assert peersToQuery.contains(BOOTSTRAP_PEER_DESCRIPTION)
+        assert peersToQuery.containsAll(DEFAULT_PEER_DESCRIPTIONS)
+
+        // Trigger the cache expiration detection
+        ps.refresh()
+
+        peersToQuery = ps.getPeersToQuery()
+        logger.info("After cache expiration, retrieved ${peersToQuery.size()} peers to query: ${peersToQuery}")
+
+        // The cache only contains the bootstrap node
+        assert peersToQuery.size() == 1
+        assert peersToQuery.contains(BOOTSTRAP_PEER_DESCRIPTION)
+    }
+
+    Throwable generateException(String message, int nestedLevel = 0) {
+        IOException e = new IOException(message)
+        nestedLevel.times { int i ->
+            e = new IOException("${message} ${i + 1}", e)
+        }
+        e
+    }
+
+    /**
+     * Test that printing the exception does not cause an infinite loop
+     */
+    @Test
+    void testRefreshShouldHandleExceptions() {
+        // Arrange
+        mockPP = [
+                restore: { ->
+                    new PeerStatusCache([] as Set<PeerStatus>, System.currentTimeMillis(), SiteToSiteTransportProtocol.HTTP)
+                },
+                // Create the peer persistence to throw an exception on save
+                save   : { PeerStatusCache cache ->
+                    throw generateException("Custom error message", 3)
+                }
+        ] as PeerPersistence
+
+        PeerSelector ps = new PeerSelector(mockPSP, mockPP)
+
+        // Act
+        ps.refreshPeerStatusCache()
+        def peersToQuery = ps.getPeersToQuery()
+
+        // Assert
+        assert peersToQuery.size() == 1
+        assert peersToQuery.contains(BOOTSTRAP_PEER_DESCRIPTION)
+    }
+
+    /**
+     * Test that the cache is not used if it does not match the transport protocol
+     */
+    @Test
+    void testInitializationShouldIgnoreCacheWithWrongTransportProtocol() {
+        // Arrange
+        def nodes = DEFAULT_NODES
+        def peerStatuses = DEFAULT_PEER_STATUSES
+
+        // Create the peer status provider
+        mockPSP = mockPeerStatusProvider()
+
+        // Point to the persisted cache on disk
+        final File cacheFile = File.createTempFile("peers", "txt")
+        cacheFile.deleteOnExit()
+
+        // Construct the cache contents (with wrong TP - mockPSP uses HTTP) and write to disk
+        final String CACHE_CONTENTS = "${SiteToSiteTransportProtocol.RAW}\n" + peerStatuses.collect { PeerStatus ps ->
+            [ps.peerDescription.hostname, ps.peerDescription.port, ps.peerDescription.isSecure(), ps.isQueryForPeers()].join(":")
+        }.join("\n")
+        cacheFile.text = CACHE_CONTENTS
+
+        FilePeerPersistence filePP = new FilePeerPersistence(cacheFile)
+
+        // Act
+        PeerSelector ps = new PeerSelector(mockPSP, filePP)
+
+        // The cache should be ignored because of the transport protocol mismatch
+        def peersToQuery = ps.getPeersToQuery()
+        logger.info("Retrieved ${peersToQuery.size()} peers to query: ${peersToQuery}")
+
+        // Assert
+        assert peersToQuery.size() == 1
+        assert peersToQuery.contains(BOOTSTRAP_PEER_DESCRIPTION)
+    }
+
+    /**
+     * This test simulates a failure scenario of a remote NiFi cluster. It confirms that:
+     * <ol>
+     *     <li>PeerSelector uses the bootstrap node to fetch remote peer statuses at the initial attempt</li>
+     *     <li>PeerSelector uses one of the queryable nodes that were most recently fetched successfully</li>
+     *     <li>PeerSelector can refresh remote peer statuses even if the bootstrap node is down</li>
+     *     <li>PeerSelector returns null as next peer when there's no peer available</li>
+     *     <li>PeerSelector always tries to fetch peer statuses at least from the bootstrap node, so that it can
+     *     recover when the node gets back online</li>
+     * </ol>
+     */
+    @Test
+    void testShouldFetchRemotePeerStatusesInFailureScenario() throws IOException {
+        // Arrange
+        int currentAttempt = 1
+
+        // The bootstrap node is node1.nifi
+        List<String> nodes = ["node1.nifi", "node2.nifi"]
+        Set<PeerStatus> peerStatuses = buildPeerStatuses(nodes)
+
+        // Need references to the bootstrap and node2 later
+        PeerStatus bootstrapStatus = peerStatuses.find { it.peerDescription.hostname == "node1.nifi" }
+        PeerDescription bootstrapDescription = bootstrapStatus.peerDescription
+
+        PeerStatus node2Status = peerStatuses.find { it.peerDescription.hostname == "node2.nifi" }
+        PeerDescription node2Description = node2Status.peerDescription
+
+        // Mock the PSP
+        mockPSP = [
+                getTransportProtocol       : { ->
+                    SiteToSiteTransportProtocol.HTTP
+                },
+                getBootstrapPeerDescription: { ->
+                    bootstrapDescription
+                },
+                fetchRemotePeerStatuses    : { PeerDescription pd ->
+                    // Depending on the scenario, return given peer statuses
+                    logger.mock("Scenario ${currentAttempt} fetchRemotePeerStatus for ${pd}")
+                    switch (currentAttempt) {
+                        case 1:
+                            return [bootstrapStatus, node2Status] as Set<PeerStatus>
+                        case 2..3:
+                            return [node2Status] as Set<PeerStatus>
+                        case 4:
+                            return [] as Set<PeerStatus>
+                        default:
+                            return [bootstrapStatus] as Set<PeerStatus>
+                    }
+                }
+        ] as PeerStatusProvider
+
+        // Mock the PP with only these statuses
+        mockPP = mockPeerPersistence(peerStatuses)
+
+        PeerSelector ps = new PeerSelector(mockPSP, mockPP)
+        ps.refresh()
+        PeerStatus peerStatus = ps.getNextPeerStatus(TransferDirection.RECEIVE)
+        logger.info("Attempt ${currentAttempt} - ${peerStatus}")
+        assert peerStatus
+
+        // Force the selector to refresh the cache
+        currentAttempt++
+        ps.refreshPeerStatusCache()
+
+        // Attempt 2 & 3 - only node2 available (PSP will only return node2)
+        2.times {
+            ps.refresh()
+            peerStatus = ps.getNextPeerStatus(TransferDirection.RECEIVE)
+            logger.info("Attempt ${currentAttempt} - ${peerStatus}")
+            assert peerStatus == node2Status
+
+            // Force the selector to refresh the cache
+            currentAttempt++
+            ps.refreshPeerStatusCache()
+        }
+
+        // Attempt 4 - no available nodes
+        ps.refresh()
+        peerStatus = ps.getNextPeerStatus(TransferDirection.RECEIVE)
+        logger.info("Attempt ${currentAttempt} - ${peerStatus}")
+        assert !peerStatus
+
+        // Force the selector to refresh the cache
+        currentAttempt = 5
+        ps.refreshPeerStatusCache()
+
+        // Attempt 5 - bootstrap node available
+        ps.refresh()
+        peerStatus = ps.getNextPeerStatus(TransferDirection.RECEIVE)
+        logger.info("Attempt ${currentAttempt} - ${peerStatus}")
+        assert peerStatus == bootstrapStatus
+    }
+
+    // PeerQueue definition and tests
+
+    /**
+     * Tests the utility class {@link PeerQueue} used to track consecutive peer selection.
+     */
+    @Test
+    void testPeerQueueShouldGetMaxConsecutiveElements() {
+        // Arrange
+        PeerQueue peerQueue = new PeerQueue(10)
+        List<String> nodes = (1..5).collect { "node${it}.nifi".toString() }
+        List<PeerStatus> peerStatuses = new ArrayList<>(buildPeerStatuses(nodes))
+
+        // Act
+
+        // Same node every time
+        100.times { int i ->
+            peerQueue.append(nodes.first())
+
+            // Assert
+            assert peerQueue.getMaxConsecutiveElements() == peerQueue.size()
+        }
+
+        // Never repeating node
+        peerQueue.clear()
+        100.times { int i ->
+            peerQueue.append(nodes.get(i % peerStatuses.size()))
+
+            // Assert
+            assert peerQueue.getMaxConsecutiveElements() == 1
+        }
+
+        // Repeat up to nodes.size() times but no more
+        peerQueue.clear()
+        100.times { int i ->
+            // Puts the first node unless this is a multiple of the node count
+            peerQueue.append((i % nodes.size() == 0) ? nodes.last() : nodes.first())
+
+            // Assert
+//            logger.debug("Most consecutive elements in queue: ${peerQueue.getMaxConsecutiveElements()} | ${peerQueue}")
+            assert peerQueue.getMaxConsecutiveElements() <= peerStatuses.size()
+        }
+    }
+
+    class PeerQueue extends ArrayBlockingQueue {
+        PeerQueue(int capacity) {
+            super(capacity)
+        }
+
+        int getTotalSize() {
+            this.size() + this.remainingCapacity()
+        }
+
+        int getMaxConsecutiveElements() {
+            int currentMax = 1, current = 1
+            def iterator = this.iterator()
+            Object prev = iterator.next()
+            while (iterator.hasNext()) {
+                def curr = iterator.next()
+                if (prev == curr) {
+                    current++
+                    if (current > currentMax) {
+                        currentMax = current
+                    }
+                } else {
+                    current = 1
+                }
+                prev = curr
+            }
+            return currentMax
+        }
+
+        Object getMostFrequentElement() {
+            def map = this.groupBy { it }
+            map.max { a, b -> a.value.size() <=> b.value.size() }.key
+        }
+
+        Object getMostCommonConsecutiveElement() {
+            int currentMax = 1, current = 1
+            def iterator = this.iterator()
+            Object prev = iterator.next()
+            Object mcce = prev
+            while (iterator.hasNext()) {
+                def curr = iterator.next()
+                if (prev == curr) {
+                    current++
+                    if (current > currentMax) {
+                        currentMax = current
+                        mcce = curr
+                    }
+                } else {
+                    current = 1
+                }
+                prev = curr
+            }
+            return mcce
+        }
+
+        /**
+         * Adds the object to the tail of the queue; if the queue is full, the head is removed first to make room.
+         *
+         * @param o the object to append
+         */
+        void append(Object o) {
+            if (this.remainingCapacity() == 0) {
+                this.remove()
+            }
+            this.put(o)
+        }
+    }
+}
\ No newline at end of file
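
The failure-scenario test above drives the selector through refresh() and getNextPeerStatus(TransferDirection.RECEIVE) and treats a null result as "no peer currently available". A minimal caller-side sketch of that loop follows; it is illustrative only: the class name, retry count, and sleep interval are invented here, the throws clause is deliberately conservative, and only the PeerSelector calls themselves (refresh(), getNextPeerStatus()) are taken from the tests in this patch.

    import java.io.IOException;
    import java.util.concurrent.TimeUnit;

    import org.apache.nifi.remote.PeerStatus;
    import org.apache.nifi.remote.TransferDirection;
    import org.apache.nifi.remote.client.PeerSelector;

    // Hypothetical helper, not part of this commit.
    public class PeerSelectionSketch {

        /**
         * Asks the selector for the next peer to receive from, retrying when no peer
         * is available. refresh() is invoked before every attempt so the selector can
         * re-query the bootstrap node once it comes back online; getNextPeerStatus()
         * returning null means the last refresh found no reachable peers.
         */
        public static PeerStatus awaitReceivePeer(final PeerSelector selector, final int maxAttempts)
                throws IOException, InterruptedException {
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                selector.refresh();
                final PeerStatus peer = selector.getNextPeerStatus(TransferDirection.RECEIVE);
                if (peer != null) {
                    return peer;
                }
                // No peer available right now; back off briefly before the next refresh.
                TimeUnit.SECONDS.sleep(5);
            }
            return null;
        }
    }
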
diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
deleted file mode 100644
index d98774e..0000000
--- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
+++ /dev/null
@@ -1,383 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.client;
-
-import org.apache.nifi.components.state.Scope;
-import org.apache.nifi.components.state.StateManager;
-import org.apache.nifi.components.state.StateMap;
-import org.apache.nifi.remote.PeerDescription;
-import org.apache.nifi.remote.PeerStatus;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.reducing;
-import static java.util.stream.Collectors.toMap;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.when;
-
-public class TestPeerSelector {
-
-    private static final Logger logger = LoggerFactory.getLogger(TestPeerSelector.class);
-
-    private Map<String, Integer> calculateAverageSelectedCount(Set<PeerStatus> collection, List<PeerStatus> destinations) {
-        // Calculate hostname entry, for average calculation. Because there're multiple entry with same host name, different port.
-        final Map<String, Integer> hostNameCounts
-                = collection.stream().collect(groupingBy(p -> p.getPeerDescription().getHostname(), reducing(0, p -> 1, Integer::sum)));
-
-        // Calculate how many times each hostname is selected.
-        return destinations.stream().collect(groupingBy(p -> p.getPeerDescription().getHostname(), reducing(0, p -> 1, Integer::sum)))
-                .entrySet().stream().collect(toMap(Map.Entry::getKey, e -> {
-                    return e.getValue() / hostNameCounts.get(e.getKey());
-                }));
-    }
-
-    @Test
-    public void testFormulateDestinationListForOutputEven() throws IOException {
-        final Set<PeerStatus> collection = new HashSet<>();
-        collection.add(new PeerStatus(new PeerDescription("Node1", 1111, true), 4096, true));
-        collection.add(new PeerStatus(new PeerDescription("Node2", 2222, true), 4096, true));
-        collection.add(new PeerStatus(new PeerDescription("Node3", 3333, true), 4096, true));
-        collection.add(new PeerStatus(new PeerDescription("Node4", 4444, true), 4096, true));
-        collection.add(new PeerStatus(new PeerDescription("Node5", 5555, true), 4096, true));
-
-        PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
-        PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null);
-
-        final List<PeerStatus> destinations = peerSelector.formulateDestinationList(collection, TransferDirection.RECEIVE);
-        final Map<String, Integer> selectedCounts = calculateAverageSelectedCount(collection, destinations);
-
-        logger.info("selectedCounts={}", selectedCounts);
-
-        int consecutiveSamePeerCount = 0;
-        PeerStatus previousPeer = null;
-        for (PeerStatus peer : destinations) {
-            if (previousPeer != null && peer.getPeerDescription().equals(previousPeer.getPeerDescription())) {
-                consecutiveSamePeerCount++;
-                // The same peer shouldn't be used consecutively (number of nodes - 1) times or more.
-                if (consecutiveSamePeerCount >= (collection.size() - 1)) {
-                    fail("The same peer is returned consecutively too frequently.");
-                }
-            } else {
-                consecutiveSamePeerCount = 0;
-            }
-            previousPeer = peer;
-        }
-
-    }
-
-    @Test
-    public void testFormulateDestinationListForOutput() throws IOException {
-        final Set<PeerStatus> collection = new HashSet<>();
-        collection.add(new PeerStatus(new PeerDescription("HasMedium", 1111, true), 4096, true));
-        collection.add(new PeerStatus(new PeerDescription("HasLots", 2222, true), 10240, true));
-        collection.add(new PeerStatus(new PeerDescription("HasLittle", 3333, true), 1024, true));
-        collection.add(new PeerStatus(new PeerDescription("HasMedium", 4444, true), 4096, true));
-        collection.add(new PeerStatus(new PeerDescription("HasMedium", 5555, true), 4096, true));
-
-        PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
-        PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null);
-
-        final List<PeerStatus> destinations = peerSelector.formulateDestinationList(collection, TransferDirection.RECEIVE);
-        final Map<String, Integer> selectedCounts = calculateAverageSelectedCount(collection, destinations);
-
-        logger.info("selectedCounts={}", selectedCounts);
-        assertTrue("HasLots should send lots", selectedCounts.get("HasLots") > selectedCounts.get("HasMedium"));
-        assertTrue("HasMedium should send medium", selectedCounts.get("HasMedium") > selectedCounts.get("HasLittle"));
-    }
-
-    @Test
-    public void testFormulateDestinationListForOutputHugeDifference() throws IOException {
-        final Set<PeerStatus> collection = new HashSet<>();
-        collection.add(new PeerStatus(new PeerDescription("HasLittle", 1111, true), 500, true));
-        collection.add(new PeerStatus(new PeerDescription("HasLots", 2222, true), 50000, true));
-
-        PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
-        PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null);
-
-        final List<PeerStatus> destinations = peerSelector.formulateDestinationList(collection, TransferDirection.RECEIVE);
-        final Map<String, Integer> selectedCounts = calculateAverageSelectedCount(collection, destinations);
-
-        logger.info("selectedCounts={}", selectedCounts);
-        assertTrue("HasLots should send lots", selectedCounts.get("HasLots") > selectedCounts.get("HasLittle"));
-    }
-
-    @Test
-    public void testFormulateDestinationListForInputPorts() throws IOException {
-        final Set<PeerStatus> collection = new HashSet<>();
-        collection.add(new PeerStatus(new PeerDescription("HasMedium", 1111, true), 4096, true));
-        collection.add(new PeerStatus(new PeerDescription("HasLittle", 2222, true), 10240, true));
-        collection.add(new PeerStatus(new PeerDescription("HasLots", 3333, true), 1024, true));
-        collection.add(new PeerStatus(new PeerDescription("HasMedium", 4444, true), 4096, true));
-        collection.add(new PeerStatus(new PeerDescription("HasMedium", 5555, true), 4096, true));
-
-        PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
-        PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null);
-
-        final List<PeerStatus> destinations = peerSelector.formulateDestinationList(collection, TransferDirection.RECEIVE);
-        final Map<String, Integer> selectedCounts = calculateAverageSelectedCount(collection, destinations);
-
-        logger.info("selectedCounts={}", selectedCounts);
-        assertTrue("HasLots should get little", selectedCounts.get("HasLots") < selectedCounts.get("HasMedium"));
-        assertTrue("HasMedium should get medium", selectedCounts.get("HasMedium") < selectedCounts.get("HasLittle"));
-    }
-
-    @Test
-    public void testFormulateDestinationListForInputPortsHugeDifference() throws IOException {
-        final Set<PeerStatus> collection = new HashSet<>();
-        collection.add(new PeerStatus(new PeerDescription("HasLots", 1111, true), 500, true));
-        collection.add(new PeerStatus(new PeerDescription("HasLittle", 2222, true), 50000, true));
-
-        PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
-        PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null);
-
-        final List<PeerStatus> destinations = peerSelector.formulateDestinationList(collection, TransferDirection.RECEIVE);
-        final Map<String, Integer> selectedCounts = calculateAverageSelectedCount(collection, destinations);
-
-        logger.info("selectedCounts={}", selectedCounts);
-        assertTrue("HasLots should get little", selectedCounts.get("HasLots") < selectedCounts.get("HasLittle"));
-    }
-
-    private static class UnitTestSystemTime extends PeerSelector.SystemTime {
-        private long offset = 0;
-
-        @Override
-        long currentTimeMillis() {
-            return super.currentTimeMillis() + offset;
-        }
-    }
-
-    /**
-     * This test simulates a failure scenario of a remote NiFi cluster. It confirms that:
-     * <ol>
-     *     <li>PeerSelector uses the bootstrap node to fetch remote peer statuses at the initial attempt</li>
-     *     <li>PeerSelector uses one of query-able nodes lastly fetched successfully</li>
-     *     <li>PeerSelector can refresh remote peer statuses even if the bootstrap node is down</li>
-     *     <li>PeerSelector returns null as next peer when there's no peer available</li>
-     *     <li>PeerSelector always tries to fetch peer statuses at least from the bootstrap node, so that it can
-     *     recover when the node gets back online</li>
-     * </ol>
-     */
-    @Test
-    public void testFetchRemotePeerStatuses() throws IOException {
-
-        final Set<PeerStatus> peerStatuses = new HashSet<>();
-        final PeerDescription bootstrapNode = new PeerDescription("Node1", 1111, true);
-        final PeerDescription node2 = new PeerDescription("Node2", 2222, true);
-        final PeerStatus bootstrapNodeStatus = new PeerStatus(bootstrapNode, 10, true);
-        final PeerStatus node2Status = new PeerStatus(node2, 10, true);
-        peerStatuses.add(bootstrapNodeStatus);
-        peerStatuses.add(node2Status);
-
-        final PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
-        final PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null);
-        final UnitTestSystemTime systemTime = new UnitTestSystemTime();
-        peerSelector.setSystemTime(systemTime);
-
-        doReturn(bootstrapNode).when(peerStatusProvider).getBootstrapPeerDescription();
-        doAnswer(invocation -> {
-            final PeerDescription peerFetchStatusesFrom = invocation.getArgument(0);
-            if (peerStatuses.stream().filter(ps -> ps.getPeerDescription().equals(peerFetchStatusesFrom)).collect(Collectors.toSet()).size() > 0) {
-                // If the remote peer is running, then return available peer statuses.
-                return peerStatuses;
-            }
-            throw new IOException("Connection refused. " + peerFetchStatusesFrom + " is not running.");
-        }).when(peerStatusProvider).fetchRemotePeerStatuses(any(PeerDescription.class));
-
-        // 1st attempt. It uses the bootstrap node.
-        peerSelector.refreshPeers();
-        PeerStatus peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
-        assertNotNull(peerStatus);
-
-        // Proceed time so that peer selector refresh statuses.
-        peerStatuses.remove(bootstrapNodeStatus);
-        systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1;
-
-        // 2nd attempt.
-        peerSelector.refreshPeers();
-        peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
-        assertNotNull(peerStatus);
-        assertEquals("Node2 should be returned since node 2 is the only available node.", node2, peerStatus.getPeerDescription());
-
-        // Proceed time so that peer selector refresh statuses.
-        systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1;
-
-        // 3rd attempt.
-        peerSelector.refreshPeers();
-        peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
-        assertNotNull(peerStatus);
-        assertEquals("Node2 should be returned since node 2 is the only available node.", node2, peerStatus.getPeerDescription());
-
-        // Remove node2 to simulate that it goes down. There's no available node at this point.
-        peerStatuses.remove(node2Status);
-        systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1;
-
-        peerSelector.refreshPeers();
-        peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
-        assertNull("PeerSelector should return null as next peer status, since there's no available peer", peerStatus);
-
-        // Add node1 back. PeerSelector should be able to fetch peer statuses because it always tries to fetch at least from the bootstrap node.
-        peerStatuses.add(bootstrapNodeStatus);
-        systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1;
-
-        peerSelector.refreshPeers();
-        peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
-        assertEquals("Node1 should be returned since node 1 is the only available node.", bootstrapNode, peerStatus.getPeerDescription());
-    }
-
-    @Test
-    public void testPeerStatusManagedCache() throws Exception {
-        final PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
-        final StateManager stateManager = Mockito.mock(StateManager.class);
-        final StateMap stateMap = Mockito.mock(StateMap.class);
-        final Map<String, String> state = new HashMap<>();
-        state.put(StatePeerPersistence.STATE_KEY_PEERS, "RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n");
-        state.put(StatePeerPersistence.STATE_KEY_PEERS_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
-        when(peerStatusProvider.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW);
-        when(stateManager.getState(eq(Scope.LOCAL))).thenReturn(stateMap);
-        when(stateMap.get(anyString())).thenAnswer(invocation -> state.get(invocation.getArgument(0)));
-        doAnswer(invocation -> {
-            final Map<String, String> updatedMap = invocation.getArgument(0);
-            state.clear();
-            state.putAll(updatedMap);
-            return null;
-        }).when(stateManager).setState(any(), eq(Scope.LOCAL));
-
-        final PeerDescription bootstrapPeer = new PeerDescription("nifi0", 8081, false);
-        when(peerStatusProvider.getBootstrapPeerDescription()).thenReturn(bootstrapPeer);
-        when(peerStatusProvider.fetchRemotePeerStatuses(eq(bootstrapPeer)))
-            .thenReturn(Collections.singleton(new PeerStatus(bootstrapPeer, 1, true)));
-
-        // PeerSelector should restore peer statuses from managed cache.
-        PeerSelector peerSelector = new PeerSelector(peerStatusProvider, new StatePeerPersistence(stateManager));
-        peerSelector.refreshPeers();
-        assertEquals("Restored peers should be used",
-            "RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n", stateMap.get(StatePeerPersistence.STATE_KEY_PEERS));
-
-        // If the stored state is too old, PeerSelector refreshes peers.
-        state.put(StatePeerPersistence.STATE_KEY_PEERS_TIMESTAMP, String.valueOf(System.currentTimeMillis() - 120_000));
-        peerSelector = new PeerSelector(peerStatusProvider, new StatePeerPersistence(stateManager));
-        peerSelector.refreshPeers();
-        assertEquals("Peers should be refreshed",
-            "RAW\nnifi0:8081:false:true\n", stateMap.get(StatePeerPersistence.STATE_KEY_PEERS));
-    }
-
-    @Test
-    public void testPeerStatusManagedCacheDifferentProtocol() throws Exception {
-        final PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
-        final StateManager stateManager = Mockito.mock(StateManager.class);
-        final StateMap stateMap = Mockito.mock(StateMap.class);
-        final Map<String, String> state = new HashMap<>();
-        state.put(StatePeerPersistence.STATE_KEY_PEERS, "RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n");
-        state.put(StatePeerPersistence.STATE_KEY_PEERS_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
-        when(peerStatusProvider.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.HTTP);
-        when(stateManager.getState(eq(Scope.LOCAL))).thenReturn(stateMap);
-        when(stateMap.get(anyString())).thenAnswer(invocation -> state.get(invocation.getArgument(0)));
-        doAnswer(invocation -> {
-            final Map<String, String> updatedMap = invocation.getArgument(0);
-            state.clear();
-            state.putAll(updatedMap);
-            return null;
-        }).when(stateManager).setState(any(), eq(Scope.LOCAL));
-
-        final PeerDescription bootstrapPeer = new PeerDescription("nifi0", 8081, false);
-        when(peerStatusProvider.getBootstrapPeerDescription()).thenReturn(bootstrapPeer);
-        when(peerStatusProvider.fetchRemotePeerStatuses(eq(bootstrapPeer)))
-            .thenReturn(Collections.singleton(new PeerStatus(bootstrapPeer, 1, true)));
-
-        // PeerSelector should NOT restore peer statuses from managed cache because protocol changed.
-        PeerSelector peerSelector = new PeerSelector(peerStatusProvider, new StatePeerPersistence(stateManager));
-        peerSelector.refreshPeers();
-        assertEquals("Restored peers should NOT be used",
-            "HTTP\nnifi0:8081:false:true\n", stateMap.get(StatePeerPersistence.STATE_KEY_PEERS));
-    }
-
-    @Test
-    public void testPeerStatusFileCache() throws Exception {
-        final PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
-
-        final PeerDescription bootstrapPeer = new PeerDescription("nifi0", 8081, false);
-        when(peerStatusProvider.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW);
-        when(peerStatusProvider.getBootstrapPeerDescription()).thenReturn(bootstrapPeer);
-        when(peerStatusProvider.fetchRemotePeerStatuses(eq(bootstrapPeer)))
-            .thenReturn(Collections.singleton(new PeerStatus(bootstrapPeer, 1, true)));
-
-        final File file = File.createTempFile("peers", "txt");
-        file.deleteOnExit();
-
-        try (final FileOutputStream fos = new FileOutputStream(file)) {
-            fos.write("RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n".getBytes(StandardCharsets.UTF_8));
-        }
-
-        final Supplier<String> readFile = () -> {
-            try (final FileInputStream fin = new FileInputStream(file);
-                 final BufferedReader reader = new BufferedReader(new InputStreamReader(fin))) {
-                final StringBuilder lines = new StringBuilder();
-                String line;
-                while ((line = reader.readLine()) != null) {
-                    lines.append(line).append("\n");
-                }
-                return lines.toString();
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-        };
-
-        // PeerSelector should restore peer statuses from managed cache.
-        PeerSelector peerSelector = new PeerSelector(peerStatusProvider, new FilePeerPersistence(file));
-        peerSelector.refreshPeers();
-        assertEquals("Restored peers should be used",
-            "RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n", readFile.get());
-
-        // If the stored state is too old, PeerSelector refreshes peers.
-        file.setLastModified(System.currentTimeMillis() - 120_000);
-        peerSelector = new PeerSelector(peerStatusProvider, new FilePeerPersistence(file));
-        peerSelector.refreshPeers();
-        assertEquals("Peers should be refreshed",
-            "RAW\nnifi0:8081:false:true\n", readFile.get());
-    }
-}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml b/nifi-commons/nifi-site-to-site-client/src/test/resources/logback-test.xml
similarity index 73%
copy from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml
copy to nifi-commons/nifi-site-to-site-client/src/test/resources/logback-test.xml
index 99d1743..9e235bd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml
+++ b/nifi-commons/nifi-site-to-site-client/src/test/resources/logback-test.xml
@@ -30,15 +30,8 @@
 
 
     <logger name="org.apache.nifi" level="INFO"/>
-    <logger name="org.apache.nifi.controller.tasks" level="DEBUG" />"
-    <logger name="org.apache.nifi.controller.service" level="DEBUG"/>
-    <logger name="org.apache.nifi.encrypt" level="DEBUG"/>
-    <logger name="org.apache.nifi.security.util.crypto" level="DEBUG"/>
-    <logger name="org.apache.nifi.controller.repository.crypto" level="DEBUG"/>
-    <logger name="org.apache.nifi.security.repository" level="DEBUG"/>
-    <logger name="org.apache.nifi.controller.service.mock" level="ERROR"/>
-
-    <logger name="StandardProcessSession.claims" level="INFO" />
+    <logger name="org.apache.nifi.remote.client" level="DEBUG"/>
+    <logger name="org.apache.nifi.remote.client.PeerSelectorTest" level="DEBUG"/>
 
     <root level="INFO">
         <appender-ref ref="CONSOLE"/>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 55e5109..9ca4dad 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -16,6 +16,37 @@
  */
 package org.apache.nifi.remote;
 
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.Resource;
 import org.apache.nifi.authorization.resource.Authorizable;
@@ -50,38 +81,6 @@ import org.apache.nifi.web.api.dto.PortDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
-import javax.ws.rs.core.Response;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static java.util.Objects.requireNonNull;
-
 /**
  * Represents the Root Process Group of a remote NiFi Instance. Holds
  * information about that remote instance, as well as Incoming Ports and
@@ -919,7 +918,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
                 final List<String> inputPortString = dto.getInputPorts().stream()
                     .map(port -> "InputPort[name=" + port.getName() + ", targetId=" + port.getId() + "]")
                     .collect(Collectors.toList());
-                final List<String> outputPortString = dto.getInputPorts().stream()
+                final List<String> outputPortString = dto.getOutputPorts().stream()
                     .map(port -> "OutputPort[name=" + port.getName() + ", targetId=" + port.getId() + "]")
                     .collect(Collectors.toList());
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml
index 99d1743..a1ee86b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml
@@ -30,7 +30,7 @@
 
 
     <logger name="org.apache.nifi" level="INFO"/>
-    <logger name="org.apache.nifi.controller.tasks" level="DEBUG" />"
+    <logger name="org.apache.nifi.controller.tasks" level="DEBUG"/>
     <logger name="org.apache.nifi.controller.service" level="DEBUG"/>
     <logger name="org.apache.nifi.encrypt" level="DEBUG"/>
     <logger name="org.apache.nifi.security.util.crypto" level="DEBUG"/>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
index 6737936..7be1a2a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
@@ -17,11 +17,29 @@
 package org.apache.nifi.web.api;
 
 
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import io.swagger.annotations.Authorization;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import org.apache.nifi.authorization.Authorizer;
 import org.apache.nifi.authorization.RequestAction;
 import org.apache.nifi.authorization.resource.Authorizable;
@@ -46,26 +64,6 @@ import org.apache.nifi.web.api.entity.PeersEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.commons.lang3.StringUtils.isEmpty;
-
 /**
  * RESTful endpoint for managing a SiteToSite connection.
  */
@@ -131,10 +129,6 @@ public class SiteToSiteResource extends ApplicationResource {
 
         authorizeSiteToSite();
 
-        if (isReplicateRequest()) {
-            return replicate(HttpMethod.GET);
-        }
-
         // get the controller dto
         final ControllerDTO controller = serviceFacade.getSiteToSiteDetails();