Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/05/29 15:02:01 UTC

[GitHub] [nifi] markap14 commented on a change in pull request #4289: NIFI-7467 Refactored S2S peer selection logic

markap14 commented on a change in pull request #4289:
URL: https://github.com/apache/nifi/pull/4289#discussion_r432476147



##########
File path: nifi-commons/nifi-site-to-site-client/pom.xml
##########
@@ -76,6 +76,15 @@
             <artifactId>httpasyncclient</artifactId>
             <version>4.1.4</version>
         </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>

Review comment:
       Is there a need to add this as provided? I would think that all code should depend on the API jar only, or, if it's needed for unit tests, use a scope of test. I don't think there's any actual dependency on the implementation of the logging API?
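
    For illustration only, a minimal pom.xml sketch of that suggestion. The second dependency in the diff above is cut off, so the binding shown here (slf4j-simple) is purely hypothetical and only demonstrates the test scoping:
    ```
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>
    <!-- hypothetical concrete binding, limited to the test classpath -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <scope>test</scope>
    </dependency>
    ```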

##########
File path: nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
##########
@@ -110,20 +110,41 @@ public PeerDescription getBootstrapPeerDescription() throws IOException {
             apiClient.setCacheExpirationMillis(config.getCacheExpiration(TimeUnit.MILLISECONDS));
             apiClient.setLocalAddress(config.getLocalAddress());
 
-            final Collection<PeerDTO> peers = apiClient.getPeers();
-            if(peers == null || peers.size() == 0){
-                throw new IOException("Couldn't get any peer to communicate with. " + apiClient.getBaseUrl() + " returned zero peers.");
-            }
+           return fetchRemotePeerStatuses(peerDescription, apiClient);
+        }
+    }
 
-            // Convert the PeerDTO's to PeerStatus objects. Use 'true' for the query-peer-for-peers flag because Site-to-Site over HTTP
-            // was added in NiFi 1.0.0, which means that peer-to-peer queries are always allowed.
-            return peers.stream().map(p -> new PeerStatus(new PeerDescription(p.getHostname(), p.getPort(), p.isSecure()), p.getFlowFileCount(), true))
-                    .collect(Collectors.toSet());
+    @Override
+    public Set<PeerStatus> fetchRemotePeerStatuses(PeerDescription peerDescription, SiteToSiteCommunicator communicator) throws IOException {
+        if (!(communicator instanceof SiteToSiteRestApiClient)) {
+            throw new IllegalArgumentException("The communicator must be a SiteToSiteRestApiClient when using HTTP communication");
+        }
+        SiteToSiteRestApiClient apiClient = (SiteToSiteRestApiClient) communicator;
+
+        // Each node should have the same URL structure and network reachability with the proxy configuration
+        // final String scheme = peerDescription.isSecure() ? "https" : "http";
+        // apiClient.setBaseUrl(scheme, peerDescription.getHostname(), peerDescription.getPort());

Review comment:
       Is this code intended to be commented out?

##########
File path: nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
##########
@@ -16,353 +16,559 @@
  */
 package org.apache.nifi.remote.client;
 
-import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.remote.Peer;
-import org.apache.nifi.remote.PeerDescription;
-import org.apache.nifi.remote.PeerStatus;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
-import org.apache.nifi.remote.util.PeerStatusCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.nifi.remote.util.EventReportUtil.error;
+import static org.apache.nifi.remote.util.EventReportUtil.warn;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
+import javax.validation.constraints.NotNull;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerDescription;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import org.apache.nifi.remote.util.PeerStatusCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import static org.apache.nifi.remote.util.EventReportUtil.error;
-import static org.apache.nifi.remote.util.EventReportUtil.warn;
-
+/**
+ * Service which maintains state around peer (NiFi node(s) in a remote instance (cluster or
+ * standalone)). There is an internal cache which stores identifying information about each
+ * node and the current workload of each in number of flowfiles being processed. Individual
+ * nodes can be penalized for an amount of time (see {@link #penalize(Peer, long)}) to avoid
+ * sending/receiving data from them. Attempts are made to balance communications ("busier"
+ * nodes will {@code TransferDirection.SEND} more and {@code TransferDirection.RECEIVE} fewer
+ * flowfiles from this instance).
+ */
 public class PeerSelector {
-
     private static final Logger logger = LoggerFactory.getLogger(PeerSelector.class);
-    private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
 
-    private static final long PEER_REFRESH_PERIOD = 60000L;
+    // The timeout for the peer status cache
+    private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
 
-    private final ReentrantLock peerRefreshLock = new ReentrantLock();
-    private volatile List<PeerStatus> peerStatuses;
-    private volatile Set<PeerStatus> lastFetchedQueryablePeers;
-    private volatile long peerRefreshTime = 0L;
-    private final AtomicLong peerIndex = new AtomicLong(0L);
-    private volatile PeerStatusCache peerStatusCache;
+    // The service which saves the peer state to persistent storage
     private final PeerPersistence peerPersistence;
 
-    private EventReporter eventReporter;
-
+    // The service which retrieves peer state
     private final PeerStatusProvider peerStatusProvider;
-    private final ConcurrentMap<PeerDescription, Long> peerTimeoutExpirations = new ConcurrentHashMap<>();
 
-    static class SystemTime {
-        long currentTimeMillis() {
-            return System.currentTimeMillis();
-        }
-    }
-    private SystemTime systemTime = new SystemTime();
+    // Maps the peer description to a millisecond penalty expiration
+    private final ConcurrentMap<PeerDescription, Long> peerPenaltyExpirations = new ConcurrentHashMap<>();
+
+    // The most recently fetched peer statuses
+    private volatile PeerStatusCache peerStatusCache;
+
+    private EventReporter eventReporter;
 
     /**
-     * Replace the SystemTime instance.
-     * This method is purely used by unit testing, to emulate peer refresh period.
+     * Returns a peer selector with the provided collaborators.
+     *
+     * @param peerStatusProvider the service which retrieves peer state
+     * @param peerPersistence    the service which persists peer state
      */
-    void setSystemTime(final SystemTime systemTime) {
-        logger.info("Replacing systemTime instance to {}.", systemTime);
-        this.systemTime = systemTime;
-    }
-
     public PeerSelector(final PeerStatusProvider peerStatusProvider, final PeerPersistence peerPersistence) {
         this.peerStatusProvider = peerStatusProvider;
         this.peerPersistence = peerPersistence;
 
+        // On instantiation, retrieve the peer status cache
+        restoreInitialPeerStatusCache();
+    }
+
+    /**
+     * Populates the peer status cache from the peer persistence provider (e.g. the file system or
+     * persisted cluster state). If this fails, it will log a warning and continue, as it is not
+     * required for startup. If the cached protocol differs from the currently configured protocol,
+     * the cache will be cleared.
+     */
+    private void restoreInitialPeerStatusCache() {
         try {
             PeerStatusCache restoredPeerStatusCache = null;
             if (peerPersistence != null) {
                 restoredPeerStatusCache = peerPersistence.restore();
+
+                // If there is an existing cache, ensure that the protocol matches the current protocol
                 if (restoredPeerStatusCache != null) {
                     final SiteToSiteTransportProtocol currentProtocol = peerStatusProvider.getTransportProtocol();
                     final SiteToSiteTransportProtocol cachedProtocol = restoredPeerStatusCache.getTransportProtocol();
+
+                    // If the protocols have changed, clear the cache
                     if (!currentProtocol.equals(cachedProtocol)) {
-                        logger.info("Discard stored peer statuses in {} because transport protocol has changed from {} to {}",
-                            peerPersistence.getClass().getSimpleName(), cachedProtocol, currentProtocol);
+                        logger.warn("Discard stored peer statuses in {} because transport protocol has changed from {} to {}",
+                                peerPersistence.getClass().getSimpleName(), cachedProtocol, currentProtocol);
                         restoredPeerStatusCache = null;
                     }
                 }
             }
             this.peerStatusCache = restoredPeerStatusCache;
-
         } catch (final IOException ioe) {
             logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file",
-                peerPersistence.getClass().getSimpleName(), ioe);
+                    peerPersistence.getClass().getSimpleName(), ioe);
         }
     }
 
-    private void persistPeerStatuses() {
-        try {
-            peerPersistence.save(peerStatusCache);
-        } catch (final IOException e) {
-            error(logger, eventReporter, "Failed to persist list of Peers due to {}; if restarted" +
-                " and the nodes specified at the RPG are down," +
-                " may be unable to transfer data until communications with those nodes are restored", e.toString());
-            logger.error("", e);
+    /**
+     * Returns the normalized weight for this ratio of peer flowfiles to total flowfiles and the given direction. The number will be
+     * a Double between 0 and 100 indicating the percent of all flowfiles the peer
+     * should send/receive. The transfer direction is <em>from the perspective of this node to the peer</em>
+     * (i.e. how many flowfiles should <em>this node send</em> to the peer, or how many flowfiles
+     * should <em>this node receive</em> from the peer).
+     *
+     * @param direction          the transfer direction ({@code SEND} weights the destinations higher if they have fewer flowfiles, {@code RECEIVE} weights them higher if they have more)
+     * @param totalFlowFileCount the total flowfile count in the remote instance (standalone or cluster)
+     * @param flowFileCount      the flowfile count for the given peer
+     * @param peerCount          the number of peers in the remote instance
+     * @return the normalized weight of this peer
+     */
+    private static double calculateNormalizedWeight(TransferDirection direction, long totalFlowFileCount, int flowFileCount, int peerCount) {
+        // If there is only a single remote, send/receive all data to/from it
+        if (peerCount == 1) {
+            return 100;
         }
-    }
-
-    List<PeerStatus> formulateDestinationList(final Set<PeerStatus> statuses, final TransferDirection direction) {
-
-        final int numDestinations = Math.max(128, statuses.size());
-        final Map<PeerStatus, Integer> entryCountMap = new HashMap<>();
 
-        long totalFlowFileCount = 0L;
-        for (final PeerStatus nodeInfo : statuses) {
-            totalFlowFileCount += nodeInfo.getFlowFileCount();
+        double cappedPercent;
+        // If no flowfiles exist in the remote instance, evenly weight each node with 1/N
+        if (totalFlowFileCount == 0) {
+            cappedPercent = 1.0 / peerCount;
+        } else {
+            final double percentageOfFlowFiles = ((double) flowFileCount / totalFlowFileCount);
+            cappedPercent = percentageOfFlowFiles;
+
+            // If sending to the remote, allocate more flowfiles to the less-stressed peers
+            if (direction == TransferDirection.SEND) {
+                cappedPercent = (1 - percentageOfFlowFiles) / (peerCount - 1);
+            }
         }
+        return new BigDecimal(cappedPercent * 100).setScale(2, RoundingMode.FLOOR).doubleValue();
+    }
 
-        int totalEntries = 0;
-        for (final PeerStatus nodeInfo : statuses) {
-            final int flowFileCount = nodeInfo.getFlowFileCount();
-            // don't allow any node to get more than 80% of the data
-            final double percentageOfFlowFiles = Math.min(0.8D, ((double) flowFileCount / (double) totalFlowFileCount));
-            final double relativeWeighting = (direction == TransferDirection.SEND) ? (1 - percentageOfFlowFiles) : percentageOfFlowFiles;
-            final int entries = Math.max(1, (int) (numDestinations * relativeWeighting));
-
-            entryCountMap.put(nodeInfo, Math.max(1, entries));
-            totalEntries += entries;
+    /**
+     * Returns an ordered map of peers sorted in descending order by value (relative weight).
+     *
+     * @param unsortedMap the unordered map of peers to weights
+     * @return the sorted (desc) map (by value)
+     */
+    private static LinkedHashMap<PeerStatus, Double> sortMapByWeight(Map<PeerStatus, Double> unsortedMap) {
+        List<Map.Entry<PeerStatus, Double>> list = new ArrayList<>(unsortedMap.entrySet());
+        list.sort(Map.Entry.comparingByValue());
+
+        LinkedHashMap<PeerStatus, Double> result = new LinkedHashMap<>();
+        for (int i = list.size() - 1; i >= 0; i--) {
+            Map.Entry<PeerStatus, Double> entry = list.get(i);
+            result.put(entry.getKey(), entry.getValue());
         }
 
-        final List<PeerStatus> destinations = new ArrayList<>(totalEntries);
-        for (int i = 0; i < totalEntries; i++) {
-            destinations.add(null);
-        }
-        for (final Map.Entry<PeerStatus, Integer> entry : entryCountMap.entrySet()) {
-            final PeerStatus nodeInfo = entry.getKey();
-            final int numEntries = entry.getValue();
-
-            int skipIndex = numEntries;
-            for (int i = 0; i < numEntries; i++) {
-                int n = (skipIndex * i);
-                while (true) {
-                    final int index = n % destinations.size();
-                    PeerStatus status = destinations.get(index);
-                    if (status == null) {
-                        status = new PeerStatus(nodeInfo.getPeerDescription(), nodeInfo.getFlowFileCount(), nodeInfo.isQueryForPeers());
-                        destinations.set(index, status);
-                        break;
-                    } else {
-                        n++;
-                    }
-                }
+        return result;
+    }
+
+    /**
+     * Prints the distribution of the peers to the logger.
+     *
+     * @param sortedPeerWorkloads the peers and relative weights
+     */
+    private static void printDistributionStatistics(Map<PeerStatus, Double> sortedPeerWorkloads, TransferDirection direction) {
+        if (logger.isDebugEnabled() && sortedPeerWorkloads != null) {
+            DecimalFormat df = new DecimalFormat("##.##");
+            df.setRoundingMode(RoundingMode.FLOOR);
+            final StringBuilder distributionDescription = new StringBuilder();
+            distributionDescription.append("New weighted distribution of nodes:");
+            for (final Map.Entry<PeerStatus, Double> entry : sortedPeerWorkloads.entrySet()) {
+                final double percentage = entry.getValue();
+                distributionDescription.append("\n").append(entry.getKey())
+                        .append(" will").append(direction == TransferDirection.RECEIVE ? " send " : " receive ")
+                        .append(df.format(percentage)).append("% of data");
             }
+            logger.debug(distributionDescription.toString());
         }
+    }
 
-        // Shuffle destinations to provide better distribution.
-        // Without this, same host will be used continuously, especially when remote peers have the same number of queued files.
-        // Use Random(0) to provide consistent result for unit testing. Randomness is not important to shuffle destinations.
-        Collections.shuffle(destinations, new Random(0));
+    /**
+     * Returns the total of all values in the map. This method is frequently used to calculate the total number of
+     * flowfiles in the instance from the respective peer flowfile counts or the total percentage from the relative weights.
+     *
+     * @param peerWeightMap the map of peers to flowfile counts or relative weights
+     * @return the total of the map values
+     */
+    private static double sumMapValues(Map<PeerStatus, Double> peerWeightMap) {
+        return peerWeightMap.values().stream().mapToDouble(Double::doubleValue).sum();
+    }
 
-        final StringBuilder distributionDescription = new StringBuilder();
-        distributionDescription.append("New Weighted Distribution of Nodes:");
-        for (final Map.Entry<PeerStatus, Integer> entry : entryCountMap.entrySet()) {
-            final double percentage = entry.getValue() * 100D / destinations.size();
-            distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of data");
-        }
-        logger.info(distributionDescription.toString());
+    /**
+     * Resets all penalization states for the peers.
+     */
+    public void clear() {
+        peerPenaltyExpirations.clear();
+    }
+
+    /**
+     * Return status of a peer that will be used for the next communication.
+     * The peers with lower workloads will be selected with higher probability.
+     *
+     * @param direction the amount of workload is calculated based on transaction direction,
+     *                  for SEND, a peer with fewer flow files is preferred,
+     *                  for RECEIVE, a peer with more flow files is preferred
+     * @return a selected peer, if there is no available peer or all peers are penalized, then return null
+     */
+    public PeerStatus getNextPeerStatus(final TransferDirection direction) {
+        Set<PeerStatus> peerStatuses = getPeerStatuses();
+        Map<PeerStatus, Double> orderedPeerStatuses = buildWeightedPeerMap(peerStatuses, direction);
+
+        return getAvailablePeerStatus(orderedPeerStatuses);
+    }
 
-        // Jumble the list of destinations.
-        return destinations;
+    /**
+     * Returns {@code true} if this peer is currently penalized and should not send/receive flowfiles.
+     *
+     * @param peerStatus the peer status identifying the peer
+     * @return true if this peer is penalized
+     */
+    public boolean isPenalized(final PeerStatus peerStatus) {
+        final Long expirationEnd = peerPenaltyExpirations.get(peerStatus.getPeerDescription());
+        return (expirationEnd != null && expirationEnd > System.currentTimeMillis());
     }
 
     /**
      * Updates internal state map to penalize a PeerStatus that points to the
-     * specified peer
+     * specified peer.
      *
-     * @param peer the peer
-     * @param penalizationMillis period of time to penalize a given peer
+     * @param peer               the peer
+     * @param penalizationMillis period of time to penalize a given peer (relative time, not absolute)
      */
     public void penalize(final Peer peer, final long penalizationMillis) {
         penalize(peer.getDescription(), penalizationMillis);
     }
 
+    /**
+     * Updates internal state map to penalize a PeerStatus that points to the
+     * specified peer.
+     *
+     * @param peerDescription    the peer description (identifies the peer)
+     * @param penalizationMillis period of time to penalize a given peer (relative time, not absolute)
+     */
     public void penalize(final PeerDescription peerDescription, final long penalizationMillis) {
-        Long expiration = peerTimeoutExpirations.get(peerDescription);
+        Long expiration = peerPenaltyExpirations.get(peerDescription);
         if (expiration == null) {
-            expiration = Long.valueOf(0L);
+            expiration = 0L;
         }
 
-        final long newExpiration = Math.max(expiration, systemTime.currentTimeMillis() + penalizationMillis);
-        peerTimeoutExpirations.put(peerDescription, Long.valueOf(newExpiration));
+        final long newExpiration = Math.max(expiration, System.currentTimeMillis() + penalizationMillis);
+        peerPenaltyExpirations.put(peerDescription, newExpiration);
     }
 
-    public boolean isPenalized(final PeerStatus peerStatus) {
-        final Long expirationEnd = peerTimeoutExpirations.get(peerStatus.getPeerDescription());
-        return (expirationEnd != null && expirationEnd > systemTime.currentTimeMillis());
+    /**
+     * Allows for external callers to trigger a refresh of the internal peer status cache. Performs the refresh if the cache has expired. If the cache is still valid, skips the refresh.
+     */
+    public void refresh() {
+        long cacheAgeMs = getCacheAge();
+        logger.debug("External refresh triggered. Last refresh was {} ms ago", cacheAgeMs);
+        if (isPeerRefreshNeeded()) {
+            logger.debug("Refreshing peer status cache");
+            refreshPeerStatusCache();
+        } else {
+            logger.debug("Cache is still valid; skipping refresh");
+        }
     }
 
-    public void clear() {
-        peerTimeoutExpirations.clear();
+    /**
+     * Sets the event reporter instance.
+     *
+     * @param eventReporter the event reporter
+     */
+    public void setEventReporter(EventReporter eventReporter) {
+        this.eventReporter = eventReporter;
     }
 
-    private boolean isPeerRefreshNeeded(final List<PeerStatus> peerList) {
-        return (peerList == null || peerList.isEmpty() || systemTime.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD);
+    /**
+     * Returns a map of peers prepared for flowfile transfer in the specified direction. Each peer is a key and the value is a
+     * weighted percentage of the total flowfiles in the remote instance. For example, in a cluster where the total number of flowfiles
+     * is 100, distributed across three nodes 20 in A, 30 in B, and 50 in C, the resulting map for
+     * {@code SEND} will be {@code [A:40.0, B:35.0, C:25.0]} (1 - .2 => .8 * 100 / (3-1)) => 40.0).
+     *
+     * @param statuses  the set of all peers
+     * @param direction the direction of transfer ({@code SEND} weights the destinations higher if they have more flowfiles, {@code RECEIVE} weights them higher if they have fewer)
+     * @return the ordered map of each peer to its relative weight
+     */
+    LinkedHashMap<PeerStatus, Double> buildWeightedPeerMap(final Set<PeerStatus> statuses, final TransferDirection direction) {
+        // Get all the destinations with their relative weights
+        final Map<PeerStatus, Double> peerWorkloads = createDestinationMap(statuses, direction);
+
+        if (!peerWorkloads.isEmpty()) {
+            // This map is sorted, but not by key, so it cannot use SortedMap
+            LinkedHashMap<PeerStatus, Double> sortedPeerWorkloads = sortMapByWeight(peerWorkloads);
+
+            // Print the expected distribution of the peers
+            printDistributionStatistics(sortedPeerWorkloads, direction);
+
+            return sortedPeerWorkloads;
+        } else {
+            logger.debug("No peers available");
+            return new LinkedHashMap<>();
+        }
     }
 
     /**
-     * Return status of a peer that will be used for the next communication.
-     * The peer with less workload will be selected with higher probability.
-     * @param direction the amount of workload is calculated based on transaction direction,
-     *                  for SEND, a peer with less flow files is preferred,
-     *                  for RECEIVE, a peer with more flow files is preferred
-     * @return a selected peer, if there is no available peer or all peers are penalized, then return null
+     * Returns a map indexed by a peer to the normalized weight (number of flowfiles currently being
+     * processed by the peer as a percentage of the total). This is used to allocate flowfiles to
+     * the various peers as destinations.
+     *
+     * @param peerStatuses the set of peers, along with their current workload (number of flowfiles)
+     * @param direction    whether sending flowfiles to these peers or receiving them
+     * @return the map of weighted peers
      */
-    public PeerStatus getNextPeerStatus(final TransferDirection direction) {
-        List<PeerStatus> peerList = peerStatuses;
-        if (isPeerRefreshNeeded(peerList)) {
-            peerRefreshLock.lock();
+    @NotNull
+    private Map<PeerStatus, Double> createDestinationMap(Set<PeerStatus> peerStatuses, TransferDirection direction) {
+        final Map<PeerStatus, Double> peerWorkloads = new HashMap<>();
+
+        // Calculate the total number of flowfiles in the peers
+        long totalFlowFileCount = peerStatuses.stream().mapToLong(PeerStatus::getFlowFileCount).sum();
+        logger.debug("Building weighted map of peers with total remote NiFi flowfile count: {}", totalFlowFileCount);
+
+        // For each node, calculate the relative weight and store it in the map
+        for (final PeerStatus nodeInfo : peerStatuses) {
+            final int flowFileCount = nodeInfo.getFlowFileCount();
+            final double normalizedWeight = calculateNormalizedWeight(direction, totalFlowFileCount, flowFileCount, peerStatuses.size());
+            peerWorkloads.put(nodeInfo, normalizedWeight);
+        }
+
+        return peerWorkloads;
+    }
+
+    /**
+     * Returns a set of {@link PeerStatus} objects representing all remote peers for the provided
+     * {@link PeerDescription}s. If a queried peer returns updated state on a peer which has already
+     * been captured, the new state is used.
+     * <p>
+     * Example:
+     * <p>
+     * 3 node cluster with nodes A, B, C
+     * <p>
+     * Node A knows about Node B and Node C, B about A and C, etc.
+     *
+     * <pre>
+     *     Action                           |   Statuses
+     *     query(A) -> B.status, C.status   |   Bs1, Cs1
+     *     query(B) -> A.status, C.status   |   As1, Bs1, Cs2
+     *     query(C) -> A.status, B.status   |   As2, Bs2, Cs2
+     * </pre>
+     *
+     * @param peersToRequestClusterInfoFrom the set of peers to query
+     * @return the complete set of statuses for each collection of peers
+     * @throws IOException if there is a problem fetching peer statuses
+     */
+    private Set<PeerStatus> fetchRemotePeerStatuses(Set<PeerDescription> peersToRequestClusterInfoFrom) throws IOException {
+        logger.debug("Fetching remote peer statuses from: {}", peersToRequestClusterInfoFrom);
+        Exception lastFailure = null;
+
+        final Set<PeerStatus> allPeerStatuses = new HashSet<>();
+
+        // Iterate through all peers, getting (sometimes multiple) status(es) from each
+        for (final PeerDescription peerDescription : peersToRequestClusterInfoFrom) {
             try {
-                // now that we have the lock, check again that we need to refresh (because another thread
-                // could have been refreshing while we were waiting for the lock).
-                peerList = peerStatuses;
-                if (isPeerRefreshNeeded(peerList)) {
-                    try {
-                        peerList = createPeerStatusList(direction);
-                    } catch (final Exception e) {
-                        final String message = String.format("%s Failed to update list of peers due to %s", this, e.toString());
-                        warn(logger, eventReporter, message);
-                        if (logger.isDebugEnabled()) {
-                            logger.warn("", e);
-                        }
-                    }
+                // Retrieve the peer status(es) from each peer description
+                final Set<PeerStatus> statusesForPeerDescription = peerStatusProvider.fetchRemotePeerStatuses(peerDescription);
 
-                    this.peerStatuses = peerList;
-                    peerRefreshTime = systemTime.currentTimeMillis();
-                }
-            } finally {
-                peerRefreshLock.unlock();
+                // Filter to remove any peers which are not queryable
+                final Set<PeerStatus> filteredStatuses = statusesForPeerDescription.stream()
+                        .filter(PeerStatus::isQueryForPeers)
+                        .collect(Collectors.toSet());
+
+                allPeerStatuses.addAll(filteredStatuses);
+            } catch (final Exception e) {
+                logger.warn("Could not communicate with {}:{} to determine which node(s) exist in the remote NiFi instance, due to {}",
+                        peerDescription.getHostname(), peerDescription.getPort(), e.toString());
+                lastFailure = e;
             }
         }
 
-        if (peerList == null || peerList.isEmpty()) {
+        // If no peers were fetched and an exception was the cause, throw an exception
+        if (allPeerStatuses.isEmpty() && lastFailure != null) {
+            throw new IOException("Unable to retrieve nodes from remote instance", lastFailure);
+        }
+
+        return allPeerStatuses;
+    }
+
+    /**
+     * Returns the {@link PeerStatus} identifying the next peer to send/receive data. This uses random
+     * selection of peers, weighted by the relative desirability (i.e. for {@code SEND}, peers with more
+     * flowfiles are more likely to be selected, and for {@code RECEIVE}, peers with fewer flowfiles are
+     * more likely).
+     *
+     * @param orderedPeerStatuses the map of peers to relative weights, sorted in descending order by weight
+     * @return the peer to send/receive data
+     */
+    private PeerStatus getAvailablePeerStatus(Map<PeerStatus, Double> orderedPeerStatuses) {
+        if (orderedPeerStatuses == null || orderedPeerStatuses.isEmpty()) {
+            logger.warn("Available peers collection is empty; no peer available");
             return null;
         }
 
-        PeerStatus peerStatus;
-        for (int i = 0; i < peerList.size(); i++) {
-            final long idx = peerIndex.getAndIncrement();
-            final int listIndex = (int) (idx % peerList.size());
-            peerStatus = peerList.get(listIndex);
+        final double totalWeights = sumMapValues(orderedPeerStatuses);
+        logger.debug("Determining next available peer ({} peers with total weight {})", orderedPeerStatuses.keySet().size(), totalWeights);
+
+        final double random = Math.random() * 100;
+        logger.debug("Generated random value {}", random);
+        if (random > totalWeights) {

Review comment:
       If random can fall outside of the total weights, does it make sense to instead use:
   ```
   final double random = Math.random() * Math.min(100, totalWeights);
   ```
   Then we know that it will fall in the range of [0, totalWeights)
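
    For illustration, a minimal sketch (hypothetical helper, not the PR's code) of how a random value capped to the total weight maps to a peer by walking the cumulative weights of the descending-order map:
    ```
    private PeerStatus selectByCumulativeWeight(final Map<PeerStatus, Double> orderedPeerStatuses) {
        // Sum of all normalized weights; roughly 100 given the normalization above
        final double totalWeights = orderedPeerStatuses.values().stream()
                .mapToDouble(Double::doubleValue).sum();

        // Bounded to [0, totalWeights) so some peer is always selected
        final double random = Math.random() * Math.min(100, totalWeights);

        // Walk the peers in descending weight order, accumulating weight until the
        // random value is covered; penalization checks omitted for brevity
        double cumulative = 0.0;
        for (final Map.Entry<PeerStatus, Double> entry : orderedPeerStatuses.entrySet()) {
            cumulative += entry.getValue();
            if (random < cumulative) {
                return entry.getKey();
            }
        }
        return null; // only reached if the map is empty
    }
    ```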

##########
File path: nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java
##########
@@ -52,12 +51,25 @@
      * Fetch peer statuses from a remote NiFi cluster.
      * Implementation of this method should fetch peer statuses from the node
      * represented by the passed PeerDescription using its transport protocol.
+     *
      * @param peerDescription a bootstrap node or one of query-able nodes lastly fetched successfully
      * @return Remote peer statuses
      * @throws IOException thrown when it fails to fetch peer statuses of the remote cluster from the specified peer
      */
     Set<PeerStatus> fetchRemotePeerStatuses(final PeerDescription peerDescription) throws IOException;
 
+    /**
+     * Fetch peer statuses from a remote NiFi cluster.
+     * Implementation of this method should fetch peer statuses from the node
+     * represented by the passed PeerDescription using its transport protocol.
+     *
+     * @param peerDescription a bootstrap node or one of query-able nodes lastly fetched successfully
+     * @param communicator the communication mechanism provider can be provided directly
+     * @return Remote peer statuses
+     * @throws IOException thrown when it fails to fetch peer statuses of the remote cluster from the specified peer
+     */
+    Set<PeerStatus> fetchRemotePeerStatuses(final PeerDescription peerDescription, SiteToSiteCommunicator communicator) throws IOException;

Review comment:
       I'm always leery of marker interfaces. I wonder if it would make sense to genericize this class, defining it as `PeerStatusProvider<T>` and then defining this method as `Set<PeerStatus> fetchRemotePeerStatuses(PeerDescription peerDescription, T communicator) throws IOException;`
    
    Using a marker interface means that I can pass in `new SiteToSiteCommunicator() {}` here, and I am abiding by the contract laid out in the interface, but it's still going to fail - and do so at runtime rather than at compile time.
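
    For illustration only, a rough sketch of that genericized shape (not code from the PR; other existing methods elided):
    ```
    import java.io.IOException;
    import java.util.Set;
    import org.apache.nifi.remote.PeerDescription;
    import org.apache.nifi.remote.PeerStatus;

    public interface PeerStatusProvider<T> {

        Set<PeerStatus> fetchRemotePeerStatuses(PeerDescription peerDescription) throws IOException;

        // The communicator type is fixed by each implementation, so passing an incompatible
        // communicator is rejected at compile time instead of failing at runtime.
        Set<PeerStatus> fetchRemotePeerStatuses(PeerDescription peerDescription, T communicator) throws IOException;
    }

    // e.g. HttpClient would then implement PeerStatusProvider<SiteToSiteRestApiClient>
    ```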

-                peerRefreshLock.unlock();
+                // Filter to remove any peers which are not queryable
+                final Set<PeerStatus> filteredStatuses = statusesForPeerDescription.stream()
+                        .filter(PeerStatus::isQueryForPeers)
+                        .collect(Collectors.toSet());
+
+                allPeerStatuses.addAll(filteredStatuses);
+            } catch (final Exception e) {
+                logger.warn("Could not communicate with {}:{} to determine which node(s) exist in the remote NiFi instance, due to {}",
+                        peerDescription.getHostname(), peerDescription.getPort(), e.toString());
+                lastFailure = e;
             }
         }
 
-        if (peerList == null || peerList.isEmpty()) {
+        // If no peers were fetched and an exception was the cause, throw an exception
+        if (allPeerStatuses.isEmpty() && lastFailure != null) {
+            throw new IOException("Unable to retrieve nodes from remote instance", lastFailure);
+        }
+
+        return allPeerStatuses;
+    }
+
+    /**
+     * Returns the {@link PeerStatus} identifying the next peer to send/receive data. This uses random
+     * selection of peers, weighted by the relative desirability (i.e. for {@code SEND}, peers with more
+     * flowfiles are more likely to be selected, and for {@code RECEIVE}, peers with fewer flowfiles are
+     * more likely).
+     *
+     * @param orderedPeerStatuses the map of peers to relative weights, sorted in descending order by weight
+     * @return the peer to send/receive data
+     */
+    private PeerStatus getAvailablePeerStatus(Map<PeerStatus, Double> orderedPeerStatuses) {
+        if (orderedPeerStatuses == null || orderedPeerStatuses.isEmpty()) {
+            logger.warn("Available peers collection is empty; no peer available");
             return null;
         }
 
-        PeerStatus peerStatus;
-        for (int i = 0; i < peerList.size(); i++) {
-            final long idx = peerIndex.getAndIncrement();
-            final int listIndex = (int) (idx % peerList.size());
-            peerStatus = peerList.get(listIndex);
+        final double totalWeights = sumMapValues(orderedPeerStatuses);
+        logger.debug("Determining next available peer ({} peers with total weight {})", orderedPeerStatuses.keySet().size(), totalWeights);
+
+        final double random = Math.random() * 100;
+        logger.debug("Generated random value {}", random);
+        if (random > totalWeights) {
+            logger.warn("Random selection was outside of the precision of the weights ({}, {}); allocating to the first available peer", random, totalWeights);
+            return new ArrayList<>(orderedPeerStatuses.keySet()).get(0);
+        }
 
-            if (isPenalized(peerStatus)) {
-                logger.debug("{} {} is penalized; will not communicate with this peer", this, peerStatus);
-            } else {
-                return peerStatus;
+        double threshold = 0.0;
+        for (Map.Entry<PeerStatus, Double> e : orderedPeerStatuses.entrySet()) {
+            logger.debug("Initial threshold was {}; added peer value {}; total {}", threshold, e.getValue(), threshold + e.getValue());
+            threshold += e.getValue();
+            if (random <= threshold) {
+                PeerStatus peerStatus = e.getKey();
+                if (isPenalized(peerStatus)) {
+                    logger.debug("{} is penalized; will not communicate with this peer", peerStatus);
+                } else {
+                    return peerStatus;
+                }
             }
         }
 
-        logger.debug("{} All peers appear to be penalized; returning null", this);
+        logger.debug("Did not select a peer; r {}, t {}, w {}", random, threshold, orderedPeerStatuses.values());

Review comment:
       I think the logic here is a bit off. Consider the case where there are 10 peers - node0 .. node9 - and only node9 is penalized. If node9 happens to be the node that is randomly selected, then it is the only node for which `isPenalized(peerStatus)` is evaluated, and it may well be the only node that is penalized. But the logic here then assumes that all nodes are penalized - which would introduce an unnecessary delay in data transfer.
   
   It might make sense to begin the method by creating a copy of the Map and filtering out any entries that are penalized. Alternatively, the loop above could keep track of all peers that have not been penalized and randomly choose one from that list if this point is reached - or the entire selection logic could simply be retried until a peer is chosen. But I feel that removing the penalized peers from the map up front is the least "risky" option performance-wise: if you end up retrying in a loop where only one node is not penalized and it has, say, a 0.5% chance of receiving data, such a loop could run for a long time.
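   
   For example, something along these lines - just a rough sketch reusing the existing `isPenalized` check, with illustrative names rather than a drop-in replacement:
    ```
    private PeerStatus getAvailablePeerStatus(Map<PeerStatus, Double> orderedPeerStatuses) {
        // Drop penalized peers up front so the weighted selection below only ever
        // considers peers we are actually willing to communicate with
        final LinkedHashMap<PeerStatus, Double> eligible = orderedPeerStatuses.entrySet().stream()
                .filter(entry -> !isPenalized(entry.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                        (a, b) -> a, LinkedHashMap::new));

        if (eligible.isEmpty()) {
            // Every peer is penalized, so behave as before and return null
            return null;
        }

        // Normalize the random draw over the remaining weights so the relative
        // distribution among the non-penalized peers is preserved
        final double totalWeights = eligible.values().stream().mapToDouble(Double::doubleValue).sum();
        final double random = Math.random() * totalWeights;

        double threshold = 0.0;
        for (final Map.Entry<PeerStatus, Double> entry : eligible.entrySet()) {
            threshold += entry.getValue();
            if (random <= threshold) {
                return entry.getKey();
            }
        }

        // Floating-point rounding can leave the draw just above the final threshold
        return eligible.keySet().iterator().next();
    }
    ```
   That way the threshold walk only ever evaluates eligible peers, and an all-penalized remote still returns `null` as it does today.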

##########
File path: nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
##########
@@ -16,353 +16,559 @@
  */
 package org.apache.nifi.remote.client;
 
-import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.remote.Peer;
-import org.apache.nifi.remote.PeerDescription;
-import org.apache.nifi.remote.PeerStatus;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
-import org.apache.nifi.remote.util.PeerStatusCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.nifi.remote.util.EventReportUtil.error;
+import static org.apache.nifi.remote.util.EventReportUtil.warn;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
+import javax.validation.constraints.NotNull;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerDescription;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import org.apache.nifi.remote.util.PeerStatusCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import static org.apache.nifi.remote.util.EventReportUtil.error;
-import static org.apache.nifi.remote.util.EventReportUtil.warn;
-
+/**
+ * Service which maintains state around peer (NiFi node(s) in a remote instance (cluster or
+ * standalone)). There is an internal cache which stores identifying information about each
+ * node and the current workload of each in number of flowfiles being processed. Individual
+ * nodes can be penalized for an amount of time (see {@link #penalize(Peer, long)}) to avoid
+ * sending/receiving data from them. Attempts are made to balance communications ("busier"
+ * nodes will {@code TransferDirection.SEND} more and {@code TransferDirection.RECEIVE} fewer
+ * flowfiles from this instance).
+ */
 public class PeerSelector {
-
     private static final Logger logger = LoggerFactory.getLogger(PeerSelector.class);
-    private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
 
-    private static final long PEER_REFRESH_PERIOD = 60000L;
+    // The timeout for the peer status cache
+    private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
 
-    private final ReentrantLock peerRefreshLock = new ReentrantLock();
-    private volatile List<PeerStatus> peerStatuses;
-    private volatile Set<PeerStatus> lastFetchedQueryablePeers;
-    private volatile long peerRefreshTime = 0L;
-    private final AtomicLong peerIndex = new AtomicLong(0L);
-    private volatile PeerStatusCache peerStatusCache;
+    // The service which saves the peer state to persistent storage
     private final PeerPersistence peerPersistence;
 
-    private EventReporter eventReporter;
-
+    // The service which retrieves peer state
     private final PeerStatusProvider peerStatusProvider;
-    private final ConcurrentMap<PeerDescription, Long> peerTimeoutExpirations = new ConcurrentHashMap<>();
 
-    static class SystemTime {
-        long currentTimeMillis() {
-            return System.currentTimeMillis();
-        }
-    }
-    private SystemTime systemTime = new SystemTime();
+    // Maps the peer description to a millisecond penalty expiration
+    private final ConcurrentMap<PeerDescription, Long> peerPenaltyExpirations = new ConcurrentHashMap<>();
+
+    // The most recently fetched peer statuses
+    private volatile PeerStatusCache peerStatusCache;
+
+    private EventReporter eventReporter;
 
     /**
-     * Replace the SystemTime instance.
-     * This method is purely used by unit testing, to emulate peer refresh period.
+     * Returns a peer selector with the provided collaborators.
+     *
+     * @param peerStatusProvider the service which retrieves peer state
+     * @param peerPersistence    the service which persists peer state
      */
-    void setSystemTime(final SystemTime systemTime) {
-        logger.info("Replacing systemTime instance to {}.", systemTime);
-        this.systemTime = systemTime;
-    }
-
     public PeerSelector(final PeerStatusProvider peerStatusProvider, final PeerPersistence peerPersistence) {
         this.peerStatusProvider = peerStatusProvider;
         this.peerPersistence = peerPersistence;
 
+        // On instantiation, retrieve the peer status cache
+        restoreInitialPeerStatusCache();
+    }
+
+    /**
+     * Populates the peer status cache from the peer persistence provider (e.g. the file system or
+     * persisted cluster state). If this fails, it will log a warning and continue, as it is not
+     * required for startup. If the cached protocol differs from the currently configured protocol,
+     * the cache will be cleared.
+     */
+    private void restoreInitialPeerStatusCache() {
         try {
             PeerStatusCache restoredPeerStatusCache = null;
             if (peerPersistence != null) {
                 restoredPeerStatusCache = peerPersistence.restore();
+
+                // If there is an existing cache, ensure that the protocol matches the current protocol
                 if (restoredPeerStatusCache != null) {
                     final SiteToSiteTransportProtocol currentProtocol = peerStatusProvider.getTransportProtocol();
                     final SiteToSiteTransportProtocol cachedProtocol = restoredPeerStatusCache.getTransportProtocol();
+
+                    // If the protocols have changed, clear the cache
                     if (!currentProtocol.equals(cachedProtocol)) {
-                        logger.info("Discard stored peer statuses in {} because transport protocol has changed from {} to {}",
-                            peerPersistence.getClass().getSimpleName(), cachedProtocol, currentProtocol);
+                        logger.warn("Discard stored peer statuses in {} because transport protocol has changed from {} to {}",
+                                peerPersistence.getClass().getSimpleName(), cachedProtocol, currentProtocol);
                         restoredPeerStatusCache = null;
                     }
                 }
             }
             this.peerStatusCache = restoredPeerStatusCache;
-
         } catch (final IOException ioe) {
             logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file",
-                peerPersistence.getClass().getSimpleName(), ioe);
+                    peerPersistence.getClass().getSimpleName(), ioe);
         }
     }
 
-    private void persistPeerStatuses() {
-        try {
-            peerPersistence.save(peerStatusCache);
-        } catch (final IOException e) {
-            error(logger, eventReporter, "Failed to persist list of Peers due to {}; if restarted" +
-                " and the nodes specified at the RPG are down," +
-                " may be unable to transfer data until communications with those nodes are restored", e.toString());
-            logger.error("", e);
+    /**
+     * Returns the normalized weight for this ratio of peer flowfiles to total flowfiles and the given direction. The number will be
+     * a Double between 0 and 100 indicating the percent of all flowfiles the peer
+     * should send/receive. The transfer direction is <em>from the perspective of this node to the peer</em>
+     * (i.e. how many flowfiles should <em>this node send</em> to the peer, or how many flowfiles
+     * should <em>this node receive</em> from the peer).
+     *
+     * @param direction          the transfer direction ({@code SEND} weights the destinations higher if they have fewer flowfiles, {@code RECEIVE} weights them higher if they have more)
+     * @param totalFlowFileCount the total flowfile count in the remote instance (standalone or cluster)
+     * @param flowFileCount      the flowfile count for the given peer
+     * @param peerCount          the number of peers in the remote instance
+     * @return the normalized weight of this peer
+     */
+    private static double calculateNormalizedWeight(TransferDirection direction, long totalFlowFileCount, int flowFileCount, int peerCount) {
+        // If there is only a single remote, send/receive all data to/from it
+        if (peerCount == 1) {
+            return 100;
         }
-    }
-
-    List<PeerStatus> formulateDestinationList(final Set<PeerStatus> statuses, final TransferDirection direction) {
-
-        final int numDestinations = Math.max(128, statuses.size());
-        final Map<PeerStatus, Integer> entryCountMap = new HashMap<>();
 
-        long totalFlowFileCount = 0L;
-        for (final PeerStatus nodeInfo : statuses) {
-            totalFlowFileCount += nodeInfo.getFlowFileCount();
+        double cappedPercent;
+        // If no flowfiles exist in the remote instance, evenly weight each node with 1/N
+        if (totalFlowFileCount == 0) {
+            cappedPercent = 1.0 / peerCount;
+        } else {
+            final double percentageOfFlowFiles = ((double) flowFileCount / totalFlowFileCount);
+            cappedPercent = percentageOfFlowFiles;
+
+            // If sending to the remote, allocate more flowfiles to the less-stressed peers
+            if (direction == TransferDirection.SEND) {
+                cappedPercent = (1 - percentageOfFlowFiles) / (peerCount - 1);
+            }
         }
+        return new BigDecimal(cappedPercent * 100).setScale(2, RoundingMode.FLOOR).doubleValue();
+    }
 
-        int totalEntries = 0;
-        for (final PeerStatus nodeInfo : statuses) {
-            final int flowFileCount = nodeInfo.getFlowFileCount();
-            // don't allow any node to get more than 80% of the data
-            final double percentageOfFlowFiles = Math.min(0.8D, ((double) flowFileCount / (double) totalFlowFileCount));
-            final double relativeWeighting = (direction == TransferDirection.SEND) ? (1 - percentageOfFlowFiles) : percentageOfFlowFiles;
-            final int entries = Math.max(1, (int) (numDestinations * relativeWeighting));
-
-            entryCountMap.put(nodeInfo, Math.max(1, entries));
-            totalEntries += entries;
+    /**
+     * Returns an ordered map of peers sorted in descending order by value (relative weight).
+     *
+     * @param unsortedMap the unordered map of peers to weights
+     * @return the sorted (desc) map (by value)
+     */
+    private static LinkedHashMap<PeerStatus, Double> sortMapByWeight(Map<PeerStatus, Double> unsortedMap) {
+        List<Map.Entry<PeerStatus, Double>> list = new ArrayList<>(unsortedMap.entrySet());
+        list.sort(Map.Entry.comparingByValue());
+
+        LinkedHashMap<PeerStatus, Double> result = new LinkedHashMap<>();
+        for (int i = list.size() - 1; i >= 0; i--) {
+            Map.Entry<PeerStatus, Double> entry = list.get(i);
+            result.put(entry.getKey(), entry.getValue());
         }
 
-        final List<PeerStatus> destinations = new ArrayList<>(totalEntries);
-        for (int i = 0; i < totalEntries; i++) {
-            destinations.add(null);
-        }
-        for (final Map.Entry<PeerStatus, Integer> entry : entryCountMap.entrySet()) {
-            final PeerStatus nodeInfo = entry.getKey();
-            final int numEntries = entry.getValue();
-
-            int skipIndex = numEntries;
-            for (int i = 0; i < numEntries; i++) {
-                int n = (skipIndex * i);
-                while (true) {
-                    final int index = n % destinations.size();
-                    PeerStatus status = destinations.get(index);
-                    if (status == null) {
-                        status = new PeerStatus(nodeInfo.getPeerDescription(), nodeInfo.getFlowFileCount(), nodeInfo.isQueryForPeers());
-                        destinations.set(index, status);
-                        break;
-                    } else {
-                        n++;
-                    }
-                }
+        return result;
+    }
+
+    /**
+     * Prints the distribution of the peers to the logger.
+     *
+     * @param sortedPeerWorkloads the peers and relative weights
+     */
+    private static void printDistributionStatistics(Map<PeerStatus, Double> sortedPeerWorkloads, TransferDirection direction) {
+        if (logger.isDebugEnabled() && sortedPeerWorkloads != null) {
+            DecimalFormat df = new DecimalFormat("##.##");
+            df.setRoundingMode(RoundingMode.FLOOR);
+            final StringBuilder distributionDescription = new StringBuilder();
+            distributionDescription.append("New weighted distribution of nodes:");
+            for (final Map.Entry<PeerStatus, Double> entry : sortedPeerWorkloads.entrySet()) {
+                final double percentage = entry.getValue();
+                distributionDescription.append("\n").append(entry.getKey())
+                        .append(" will").append(direction == TransferDirection.RECEIVE ? " send " : " receive ")
+                        .append(df.format(percentage)).append("% of data");
             }
+            logger.debug(distributionDescription.toString());
         }
+    }
 
-        // Shuffle destinations to provide better distribution.
-        // Without this, same host will be used continuously, especially when remote peers have the same number of queued files.
-        // Use Random(0) to provide consistent result for unit testing. Randomness is not important to shuffle destinations.
-        Collections.shuffle(destinations, new Random(0));
+    /**
+     * Returns the total of all values in the map. This method is frequently used to calculate the total number of
+     * flowfiles in the instance from the respective peer flowfile counts or the total percentage from the relative weights.
+     *
+     * @param peerWeightMap the map of peers to flowfile counts or relative weights
+     * @return the total of the map values
+     */
+    private static double sumMapValues(Map<PeerStatus, Double> peerWeightMap) {
+        return peerWeightMap.values().stream().mapToDouble(Double::doubleValue).sum();
+    }
 
-        final StringBuilder distributionDescription = new StringBuilder();
-        distributionDescription.append("New Weighted Distribution of Nodes:");
-        for (final Map.Entry<PeerStatus, Integer> entry : entryCountMap.entrySet()) {
-            final double percentage = entry.getValue() * 100D / destinations.size();
-            distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of data");
-        }
-        logger.info(distributionDescription.toString());
+    /**
+     * Resets all penalization states for the peers.
+     */
+    public void clear() {
+        peerPenaltyExpirations.clear();
+    }
+
+    /**
+     * Return status of a peer that will be used for the next communication.
+     * The peers with lower workloads will be selected with higher probability.
+     *
+     * @param direction the amount of workload is calculated based on transaction direction,
+     *                  for SEND, a peer with fewer flow files is preferred,
+     *                  for RECEIVE, a peer with more flow files is preferred
+     * @return a selected peer, if there is no available peer or all peers are penalized, then return null
+     */
+    public PeerStatus getNextPeerStatus(final TransferDirection direction) {
+        Set<PeerStatus> peerStatuses = getPeerStatuses();
+        Map<PeerStatus, Double> orderedPeerStatuses = buildWeightedPeerMap(peerStatuses, direction);
+
+        return getAvailablePeerStatus(orderedPeerStatuses);
+    }
 
-        // Jumble the list of destinations.
-        return destinations;
+    /**
+     * Returns {@code true} if this peer is currently penalized and should not send/receive flowfiles.
+     *
+     * @param peerStatus the peer status identifying the peer
+     * @return true if this peer is penalized
+     */
+    public boolean isPenalized(final PeerStatus peerStatus) {
+        final Long expirationEnd = peerPenaltyExpirations.get(peerStatus.getPeerDescription());
+        return (expirationEnd != null && expirationEnd > System.currentTimeMillis());
     }
 
     /**
      * Updates internal state map to penalize a PeerStatus that points to the
-     * specified peer
+     * specified peer.
      *
-     * @param peer the peer
-     * @param penalizationMillis period of time to penalize a given peer
+     * @param peer               the peer
+     * @param penalizationMillis period of time to penalize a given peer (relative time, not absolute)
      */
     public void penalize(final Peer peer, final long penalizationMillis) {
         penalize(peer.getDescription(), penalizationMillis);
     }
 
+    /**
+     * Updates internal state map to penalize a PeerStatus that points to the
+     * specified peer.
+     *
+     * @param peerDescription    the peer description (identifies the peer)
+     * @param penalizationMillis period of time to penalize a given peer (relative time, not absolute)
+     */
     public void penalize(final PeerDescription peerDescription, final long penalizationMillis) {
-        Long expiration = peerTimeoutExpirations.get(peerDescription);
+        Long expiration = peerPenaltyExpirations.get(peerDescription);
         if (expiration == null) {
-            expiration = Long.valueOf(0L);
+            expiration = 0L;
         }
 
-        final long newExpiration = Math.max(expiration, systemTime.currentTimeMillis() + penalizationMillis);
-        peerTimeoutExpirations.put(peerDescription, Long.valueOf(newExpiration));
+        final long newExpiration = Math.max(expiration, System.currentTimeMillis() + penalizationMillis);
+        peerPenaltyExpirations.put(peerDescription, newExpiration);
     }
 
-    public boolean isPenalized(final PeerStatus peerStatus) {
-        final Long expirationEnd = peerTimeoutExpirations.get(peerStatus.getPeerDescription());
-        return (expirationEnd != null && expirationEnd > systemTime.currentTimeMillis());
+    /**
+     * Allows for external callers to trigger a refresh of the internal peer status cache. Performs the refresh if the cache has expired. If the cache is still valid, skips the refresh.
+     */
+    public void refresh() {
+        long cacheAgeMs = getCacheAge();
+        logger.debug("External refresh triggered. Last refresh was {} ms ago", cacheAgeMs);
+        if (isPeerRefreshNeeded()) {
+            logger.debug("Refreshing peer status cache");
+            refreshPeerStatusCache();
+        } else {
+            logger.debug("Cache is still valid; skipping refresh");
+        }
     }
 
-    public void clear() {
-        peerTimeoutExpirations.clear();
+    /**
+     * Sets the event reporter instance.
+     *
+     * @param eventReporter the event reporter
+     */
+    public void setEventReporter(EventReporter eventReporter) {
+        this.eventReporter = eventReporter;
     }
 
-    private boolean isPeerRefreshNeeded(final List<PeerStatus> peerList) {
-        return (peerList == null || peerList.isEmpty() || systemTime.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD);
+    /**
+     * Returns a map of peers prepared for flowfile transfer in the specified direction. Each peer is a key and the value is a
+     * weighted percentage of the total flowfiles in the remote instance. For example, in a cluster where the total number of flowfiles
+     * is 100, distributed across three nodes 20 in A, 30 in B, and 50 in C, the resulting map for
+     * {@code SEND} will be {@code [A:40.0, B:35.0, C:25.0]} (1 - .2 => .8 * 100 / (3-1)) => 40.0).
+     *
+     * @param statuses  the set of all peers
+     * @param direction the direction of transfer ({@code SEND} weights the destinations higher if they have more flowfiles, {@code RECEIVE} weights them higher if they have fewer)
+     * @return the ordered map of each peer to its relative weight
+     */
+    LinkedHashMap<PeerStatus, Double> buildWeightedPeerMap(final Set<PeerStatus> statuses, final TransferDirection direction) {
+        // Get all the destinations with their relative weights
+        final Map<PeerStatus, Double> peerWorkloads = createDestinationMap(statuses, direction);
+
+        if (!peerWorkloads.isEmpty()) {
+            // This map is sorted, but not by key, so it cannot use SortedMap
+            LinkedHashMap<PeerStatus, Double> sortedPeerWorkloads = sortMapByWeight(peerWorkloads);
+
+            // Print the expected distribution of the peers
+            printDistributionStatistics(sortedPeerWorkloads, direction);
+
+            return sortedPeerWorkloads;
+        } else {
+            logger.debug("No peers available");
+            return new LinkedHashMap<>();
+        }
     }
 
     /**
-     * Return status of a peer that will be used for the next communication.
-     * The peer with less workload will be selected with higher probability.
-     * @param direction the amount of workload is calculated based on transaction direction,
-     *                  for SEND, a peer with less flow files is preferred,
-     *                  for RECEIVE, a peer with more flow files is preferred
-     * @return a selected peer, if there is no available peer or all peers are penalized, then return null
+     * Returns a map indexed by a peer to the normalized weight (number of flowfiles currently being
+     * processed by the peer as a percentage of the total). This is used to allocate flowfiles to
+     * the various peers as destinations.
+     *
+     * @param peerStatuses the set of peers, along with their current workload (number of flowfiles)
+     * @param direction    whether sending flowfiles to these peers or receiving them
+     * @return the map of weighted peers
      */
-    public PeerStatus getNextPeerStatus(final TransferDirection direction) {
-        List<PeerStatus> peerList = peerStatuses;
-        if (isPeerRefreshNeeded(peerList)) {
-            peerRefreshLock.lock();
+    @NotNull
+    private Map<PeerStatus, Double> createDestinationMap(Set<PeerStatus> peerStatuses, TransferDirection direction) {
+        final Map<PeerStatus, Double> peerWorkloads = new HashMap<>();
+
+        // Calculate the total number of flowfiles in the peers
+        long totalFlowFileCount = peerStatuses.stream().mapToLong(PeerStatus::getFlowFileCount).sum();
+        logger.debug("Building weighted map of peers with total remote NiFi flowfile count: {}", totalFlowFileCount);
+
+        // For each node, calculate the relative weight and store it in the map
+        for (final PeerStatus nodeInfo : peerStatuses) {
+            final int flowFileCount = nodeInfo.getFlowFileCount();
+            final double normalizedWeight = calculateNormalizedWeight(direction, totalFlowFileCount, flowFileCount, peerStatuses.size());
+            peerWorkloads.put(nodeInfo, normalizedWeight);
+        }
+
+        return peerWorkloads;
+    }
+
+    /**
+     * Returns a set of {@link PeerStatus} objects representing all remote peers for the provided
+     * {@link PeerDescription}s. If a queried peer returns updated state on a peer which has already
+     * been captured, the new state is used.
+     * <p>
+     * Example:
+     * <p>
+     * 3 node cluster with nodes A, B, C
+     * <p>
+     * Node A knows about Node B and Node C, B about A and C, etc.
+     *
+     * <pre>
+     *     Action                           |   Statuses
+     *     query(A) -> B.status, C.status   |   Bs1, Cs1
+     *     query(B) -> A.status, C.status   |   As1, Bs1, Cs2
+     *     query(C) -> A.status, B.status   |   As2, Bs2, Cs2
+     * </pre>
+     *
+     * @param peersToRequestClusterInfoFrom the set of peers to query
+     * @return the complete set of statuses for each collection of peers
+     * @throws IOException if there is a problem fetching peer statuses
+     */
+    private Set<PeerStatus> fetchRemotePeerStatuses(Set<PeerDescription> peersToRequestClusterInfoFrom) throws IOException {
+        logger.debug("Fetching remote peer statuses from: {}", peersToRequestClusterInfoFrom);
+        Exception lastFailure = null;
+
+        final Set<PeerStatus> allPeerStatuses = new HashSet<>();
+
+        // Iterate through all peers, getting (sometimes multiple) status(es) from each
+        for (final PeerDescription peerDescription : peersToRequestClusterInfoFrom) {
             try {
-                // now that we have the lock, check again that we need to refresh (because another thread
-                // could have been refreshing while we were waiting for the lock).
-                peerList = peerStatuses;
-                if (isPeerRefreshNeeded(peerList)) {
-                    try {
-                        peerList = createPeerStatusList(direction);
-                    } catch (final Exception e) {
-                        final String message = String.format("%s Failed to update list of peers due to %s", this, e.toString());
-                        warn(logger, eventReporter, message);
-                        if (logger.isDebugEnabled()) {
-                            logger.warn("", e);
-                        }
-                    }
+                // Retrieve the peer status(es) from each peer description
+                final Set<PeerStatus> statusesForPeerDescription = peerStatusProvider.fetchRemotePeerStatuses(peerDescription);
 
-                    this.peerStatuses = peerList;
-                    peerRefreshTime = systemTime.currentTimeMillis();
-                }
-            } finally {
-                peerRefreshLock.unlock();
+                // Filter to remove any peers which are not queryable
+                final Set<PeerStatus> filteredStatuses = statusesForPeerDescription.stream()
+                        .filter(PeerStatus::isQueryForPeers)
+                        .collect(Collectors.toSet());
+
+                allPeerStatuses.addAll(filteredStatuses);
+            } catch (final Exception e) {
+                logger.warn("Could not communicate with {}:{} to determine which node(s) exist in the remote NiFi instance, due to {}",
+                        peerDescription.getHostname(), peerDescription.getPort(), e.toString());
+                lastFailure = e;
             }
         }
 
-        if (peerList == null || peerList.isEmpty()) {
+        // If no peers were fetched and an exception was the cause, throw an exception
+        if (allPeerStatuses.isEmpty() && lastFailure != null) {
+            throw new IOException("Unable to retrieve nodes from remote instance", lastFailure);
+        }
+
+        return allPeerStatuses;
+    }
+
+    /**
+     * Returns the {@link PeerStatus} identifying the next peer to send/receive data. This uses random
+     * selection of peers, weighted by the relative desirability (i.e. for {@code SEND}, peers with more
+     * flowfiles are more likely to be selected, and for {@code RECEIVE}, peers with fewer flowfiles are
+     * more likely).
+     *
+     * @param orderedPeerStatuses the map of peers to relative weights, sorted in descending order by weight
+     * @return the peer to send/receive data
+     */
+    private PeerStatus getAvailablePeerStatus(Map<PeerStatus, Double> orderedPeerStatuses) {
+        if (orderedPeerStatuses == null || orderedPeerStatuses.isEmpty()) {
+            logger.warn("Available peers collection is empty; no peer available");
             return null;
         }
 
-        PeerStatus peerStatus;
-        for (int i = 0; i < peerList.size(); i++) {
-            final long idx = peerIndex.getAndIncrement();
-            final int listIndex = (int) (idx % peerList.size());
-            peerStatus = peerList.get(listIndex);
+        final double totalWeights = sumMapValues(orderedPeerStatuses);
+        logger.debug("Determining next available peer ({} peers with total weight {})", orderedPeerStatuses.keySet().size(), totalWeights);
+
+        final double random = Math.random() * 100;
+        logger.debug("Generated random value {}", random);
+        if (random > totalWeights) {
+            logger.warn("Random selection was outside of the precision of the weights ({}, {}); allocating to the first available peer", random, totalWeights);
+            return new ArrayList<>(orderedPeerStatuses.keySet()).get(0);
+        }
 
-            if (isPenalized(peerStatus)) {
-                logger.debug("{} {} is penalized; will not communicate with this peer", this, peerStatus);
-            } else {
-                return peerStatus;
+        double threshold = 0.0;
+        for (Map.Entry<PeerStatus, Double> e : orderedPeerStatuses.entrySet()) {
+            logger.debug("Initial threshold was {}; added peer value {}; total {}", threshold, e.getValue(), threshold + e.getValue());
+            threshold += e.getValue();
+            if (random <= threshold) {
+                PeerStatus peerStatus = e.getKey();
+                if (isPenalized(peerStatus)) {
+                    logger.debug("{} is penalized; will not communicate with this peer", peerStatus);
+                } else {
+                    return peerStatus;
+                }
             }
         }
 
-        logger.debug("{} All peers appear to be penalized; returning null", this);
+        logger.debug("Did not select a peer; r {}, t {}, w {}", random, threshold, orderedPeerStatuses.values());
+        logger.debug("All peers appear to be penalized; returning null");
         return null;
     }
 
-    private List<PeerStatus> createPeerStatusList(final TransferDirection direction) throws IOException {
-        Set<PeerStatus> statuses = getPeerStatuses();
-        if (statuses == null) {
-            refreshPeers();
-            statuses = getPeerStatuses();
-            if (statuses == null) {
-                logger.debug("{} found no peers to connect to", this);
-                return Collections.emptyList();
-            }
+    /**
+     * Returns the cache age in milliseconds. If the cache is null or not set, returns {@code -1}.
+     *
+     * @return the cache age in millis
+     */
+    private long getCacheAge() {
+        if (peerStatusCache == null) {
+            return -1;
         }
-        return formulateDestinationList(statuses, direction);
+        return System.currentTimeMillis() - peerStatusCache.getTimestamp();
     }
 
-    private Set<PeerStatus> getPeerStatuses() {
-        final PeerStatusCache cache = this.peerStatusCache;
-        if (cache == null || cache.getStatuses() == null || cache.getStatuses().isEmpty()) {
-            return null;
-        }
-
-        if (cache.getTimestamp() + PEER_CACHE_MILLIS < systemTime.currentTimeMillis()) {
-            final Set<PeerStatus> equalizedSet = new HashSet<>(cache.getStatuses().size());
-            for (final PeerStatus status : cache.getStatuses()) {
-                final PeerStatus equalizedStatus = new PeerStatus(status.getPeerDescription(), 1, status.isQueryForPeers());
-                equalizedSet.add(equalizedStatus);
-            }
+    /**
+     * Returns the set of queryable peers ({@link PeerStatus#isQueryForPeers()}) most recently fetched.
+     *
+     * @return the set of queryable peers (empty set if the cache is {@code null})
+     */
+    @NotNull
+    private Set<PeerStatus> getLastFetchedQueryablePeers() {
+        return peerStatusCache != null ? peerStatusCache.getStatuses() : Collections.emptySet();
+    }
 
-            return equalizedSet;
+    /**
+     * Returns the set of peer statuses. If the cache is {@code null} or empty, refreshes the cache first and then returns the new peer status set.
+     *
+     * @return the most recent peer statuses (empty set if the cache is {@code null})
+     */
+    @NotNull
+    private Set<PeerStatus> getPeerStatuses() {
+        if (isPeerRefreshNeeded()) {
+            refreshPeerStatusCache();
         }
 
-        return cache.getStatuses();
+        return getLastFetchedQueryablePeers();
     }
 
-    public void refreshPeers() {
-        final PeerStatusCache existingCache = peerStatusCache;
-        if (existingCache != null && (existingCache.getTimestamp() + PEER_CACHE_MILLIS > systemTime.currentTimeMillis())) {
-            return;
-        }
+    /**
+     * Returns the set of {@link PeerDescription} objects uniquely identifying each NiFi node which should be queried for {@link PeerStatus}.
+     *
+     * @return the set of recently retrieved peers and the bootstrap peer
+     * @throws IOException if there is a problem retrieving the list of peers to query
+     */
+    private Set<PeerDescription> getPeersToQuery() throws IOException {
+        final Set<PeerDescription> peersToRequestClusterInfoFrom = new HashSet<>();
 
-        try {
-            final Set<PeerStatus> statuses = fetchRemotePeerStatuses();
-            peerStatusCache = new PeerStatusCache(statuses, System.currentTimeMillis(), peerStatusProvider.getTransportProtocol());
-            persistPeerStatuses();
-            logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, statuses.size());
-        } catch (Exception e) {
-            warn(logger, eventReporter, "{} Unable to refresh Remote Group's peers due to {}", this, e.getMessage());
-            if (logger.isDebugEnabled()) {
-                logger.debug("", e);
+        // Use the peers fetched last time
+        final Set<PeerStatus> lastFetched = getLastFetchedQueryablePeers();
+        if (lastFetched != null && !lastFetched.isEmpty()) {
+            for (PeerStatus peerStatus : lastFetched) {
+                peersToRequestClusterInfoFrom.add(peerStatus.getPeerDescription());
             }
         }
+
+        // Always add the configured node info to the list of peers
+        peersToRequestClusterInfoFrom.add(peerStatusProvider.getBootstrapPeerDescription());
+
+        return peersToRequestClusterInfoFrom;
     }
 
-    public void setEventReporter(EventReporter eventReporter) {
-        this.eventReporter = eventReporter;
+    /**
+     * Returns {@code true} if this cache has expired.
+     *
+     * @param cache the peer status cache
+     * @return true if the cache is expired
+     */
+    private boolean isCacheExpired(PeerStatusCache cache) {
+        return cache == null || cache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis();
     }
 
-    private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
-        final Set<PeerDescription> peersToRequestClusterInfoFrom = new HashSet<>();
+    /**
+     * Returns {@code true} if the internal collection of peers is empty or the refresh time has passed.
+     *
+     * @return true if the peer statuses should be refreshed
+     */
+    private boolean isPeerRefreshNeeded() {
+        return (peerStatusCache == null || peerStatusCache.isEmpty() || isCacheExpired(peerStatusCache));
+    }
 
-        // Look at all of the peers that we fetched last time.
-        final Set<PeerStatus> lastFetched = lastFetchedQueryablePeers;
-        if (lastFetched != null && !lastFetched.isEmpty()) {
-            lastFetched.stream().map(peer -> peer.getPeerDescription())
-                    .forEach(desc -> peersToRequestClusterInfoFrom.add(desc));
-        }
+    /**
+     * Persists the provided cache instance (in memory and via the {@link PeerPersistence} (e.g. in cluster state or a local file)) for future retrieval.
+     *
+     * @param peerStatusCache the cache of current peer statuses to persist
+     */
+    private void persistPeerStatuses(PeerStatusCache peerStatusCache) {
+        try {
+            this.peerStatusCache = peerStatusCache;
 
-        // Always add the configured node info to the list of peers to communicate with
-        peersToRequestClusterInfoFrom.add(peerStatusProvider.getBootstrapPeerDescription());
+            // The #save mechanism persists the cache to stateful or file-based storage
+            peerPersistence.save(peerStatusCache);
+        } catch (final IOException e) {
+            error(logger, eventReporter, "Failed to persist list of peers due to {}; if restarted" +
+                    " and the nodes specified at the remote instance are down," +
+                    " may be unable to transfer data until communications with those nodes are restored", e.toString());
+            logger.error("", e);
+        }
+    }
 
-        logger.debug("Fetching remote peer statuses from: {}", peersToRequestClusterInfoFrom);
-        Exception lastFailure = null;
-        for (final PeerDescription peerDescription : peersToRequestClusterInfoFrom) {
-            try {
-                final Set<PeerStatus> statuses = peerStatusProvider.fetchRemotePeerStatuses(peerDescription);
-                lastFetchedQueryablePeers = statuses.stream()
-                        .filter(p -> p.isQueryForPeers())
-                        .collect(Collectors.toSet());
+    /**
+     * Refreshes the list of S2S peers that flowfiles can be sent to or received from. Uses the stateful
+     * cache to reduce network overhead.
+     */
+    private void refreshPeerStatusCache() {
+        try {
+            // Splitting enumeration and querying into separate methods allows better testing and composition
+            final Set<PeerDescription> peersToQuery = getPeersToQuery();
+            final Set<PeerStatus> statuses = fetchRemotePeerStatuses(peersToQuery);
 
-                return statuses;
-            } catch (final Exception e) {
-                logger.warn("Could not communicate with {}:{} to determine which nodes exist in the remote NiFi cluster, due to {}",
-                        peerDescription.getHostname(), peerDescription.getPort(), e.toString());
-                lastFailure = e;
+            if (statuses.isEmpty()) {
+                logger.info("No peers were retrieved from the remote group {}", peersToQuery.stream().map(p -> p.getHostname() + ":" + p.getPort()).collect(Collectors.joining(",")));
             }
-        }
 
-        final IOException ioe = new IOException("Unable to communicate with remote NiFi cluster in order to determine which nodes exist in the remote cluster");
-        if (lastFailure != null) {
-            ioe.addSuppressed(lastFailure);
+            // Persist the fetched peer statuses
+            PeerStatusCache peerStatusCache = new PeerStatusCache(statuses, System.currentTimeMillis(), peerStatusProvider.getTransportProtocol());
+            persistPeerStatuses(peerStatusCache);
+            logger.info("Successfully refreshed peer status cache; remote group consists of {} peers", statuses.size());
+        } catch (Exception e) {
+            warn(logger, eventReporter, "Unable to refresh remote group peers due to: {}", e.getMessage());
+            if (logger.isDebugEnabled() && e.getCause() != null) {
+                Throwable cause = e.getCause();
+                while ((cause = cause.getCause()) != null) {

Review comment:
       This guarantees an infinite loop in almost all cases. When a `Throwable` is constructed without a given cause, it does not declare `cause` to be `null`; rather, `cause` is set to `this`. I.e., the `Throwable` class declares the member variable as:
   ```
   private Throwable cause = this;
   ```
   This will enter an infinite loop, repeatedly logging:
   ```
       Caused by: <some message>
   ```
   We should be able to just log the Exception instead. That should provide the stack trace appropriately. It also allows the user to configure whether or not to log the stack trace and how verbose it should be, and it ensures that the full stack trace is logged, etc.
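   
   For example, just a sketch of what I mean here, keeping the existing `warn(...)` event-reporter call:
    ```
    } catch (final Exception e) {
        warn(logger, eventReporter, "Unable to refresh remote group peers due to: {}", e.getMessage());
        // Passing the exception as the last argument lets the logging framework
        // print the full stack trace, including any nested causes, per its configuration
        logger.debug("Unable to refresh remote group peers", e);
    }
    ```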

##########
File path: nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
##########
@@ -435,6 +425,12 @@ public PeerDescription getBootstrapPeerDescription() throws IOException {
         return peerStatuses;
     }
 
+    @Override
+    public Set<PeerStatus> fetchRemotePeerStatuses(final PeerDescription peerDescription, SiteToSiteCommunicator communicator) throws IOException {
+        // TODO: Implement method and refactor shared logic

Review comment:
       Is this intended to be implemented?

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
##########
@@ -919,7 +918,7 @@ public void refreshFlowContents() throws CommunicationsException {
                 final List<String> inputPortString = dto.getInputPorts().stream()
                     .map(port -> "InputPort[name=" + port.getName() + ", targetId=" + port.getId() + "]")
                     .collect(Collectors.toList());
-                final List<String> outputPortString = dto.getInputPorts().stream()
+                final List<String> outputPortString = dto.getOutputPorts().stream()

Review comment:
       Wow. Good catch. :)

##########
File path: nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
##########
@@ -189,6 +189,14 @@
     private static final ConcurrentMap<String, RemoteGroupContents> contentsMap = new ConcurrentHashMap<>();
     private volatile long lastPruneTimestamp = System.currentTimeMillis();
 
+    // // Package-private constructor to allow unit testing/mocking

Review comment:
       This code should be removed if it is not needed.



