Posted to commits@lucene.apache.org by il...@apache.org on 2020/11/30 18:15:08 UTC

[lucene-solr] branch jira/solr-15004 updated: Fix AZ selection for placement to pick the one with the most suitable node when multiple AZ's have the same number of replicas

This is an automated email from the ASF dual-hosted git repository.

ilan pushed a commit to branch jira/solr-15004
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/jira/solr-15004 by this push:
     new 68e2cd5  Fix AZ selection for placement to pick the one with the most suitable node when multiple AZ's have the same number of replicas
68e2cd5 is described below

commit 68e2cd5d29581d820e206cb0fcbb11bb4e81aaad
Author: Ilan Ginzburg <ig...@salesforce.com>
AuthorDate: Mon Nov 30 19:14:08 2020 +0100

    Fix AZ selection for placement to pick the one with the most suitable node when multiple AZ's have the same number of replicas
---
 .../plugins/AffinityPlacementFactory.java          | 103 ++++++++-----
 .../plugins/AffinityPlacementFactoryTest.java      | 159 ++++++++++++++++++---
 2 files changed, 204 insertions(+), 58 deletions(-)
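
The gist of the change, for readers who prefer it outside the diff: for each replica to be placed, instead of taking the first AZ with the fewest existing replicas of the shard, the plugin now considers every AZ tied at that minimum, lazily sorts each of their candidate nodes with the cores-and-free-disk comparator, and selects the AZ whose best node wins the comparison. Below is a minimal, self-contained Java sketch of that selection idea only (uses a Java 16 record for brevity); the Az record, the pickAz method and the integer "core count" stand-ins are illustrative and are not part of AffinityPlacementFactory (the real code works on AzWithNodes and Node objects, uses CoresAndDiskComparator, and throws PlacementException when no AZ has an available node).

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    final class AzSelectionSketch {
      // Illustrative stand-in for an AZ: its existing replica count for the shard and the
      // "cost" (here just a core count) of each candidate node that could host a new replica.
      record Az(String name, int existingReplicas, List<Integer> candidateNodeCores) {}

      // Among AZ's tied at the lowest existing replica count, sort each AZ's candidates
      // best-first and return the AZ whose best candidate wins the node comparison.
      static Az pickAz(List<Az> azs, Comparator<Integer> nodeComparator) {
        int minReplicas = azs.stream()
            .filter(az -> !az.candidateNodeCores().isEmpty())
            .mapToInt(Az::existingReplicas)
            .min()
            .orElseThrow(); // the real plugin throws PlacementException here
        Az best = null;
        for (Az az : azs) {
          if (az.candidateNodeCores().isEmpty() || az.existingReplicas() != minReplicas) {
            continue; // not a candidate AZ for this placement
          }
          az.candidateNodeCores().sort(nodeComparator); // lazy sort, best candidate first
          if (best == null
              || nodeComparator.compare(az.candidateNodeCores().get(0), best.candidateNodeCores().get(0)) < 0) {
            best = az; // this AZ's best node beats the best node seen so far
          }
        }
        return best;
      }

      public static void main(String[] args) {
        List<Az> azs = List.of(
            new Az("AZ1", 1, new ArrayList<>(List.of(10, 100))),  // already has a replica of the shard
            new Az("AZ2", 0, new ArrayList<>(List.of(50, 100))),
            new Az("AZ3", 0, new ArrayList<>(List.of(10, 100))));
        // Fewer cores is better here. AZ2 and AZ3 tie at 0 replicas; AZ3's best node (10 cores)
        // beats AZ2's best node (50 cores), so AZ3 is picked even though AZ1 also has a 10-core node.
        System.out.println(pickAz(azs, Comparator.naturalOrder()).name()); // prints AZ3
      }
    }

In the commit itself the same idea is implemented in place inside the placement loop: candidate AZ's are temporarily removed from azByExistingReplicas, the winning AZ gets its best node removed and is re-inserted with its replica count incremented, and the non-winning candidates are re-inserted unchanged.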

diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java
index affd627..ce83d60 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java
@@ -437,66 +437,97 @@ public class AffinityPlacementFactory implements PlacementPluginFactory {
 
       CoresAndDiskComparator coresAndDiskComparator = new CoresAndDiskComparator(attrValues, coresOnNodes, prioritizedFreeDiskGB);
 
-      // Now we have for each AZ on which we might have a chance of placing a replica, the list of candidate nodes for replicas
-      // (candidate: does not already have a replica of this shard and is in the corresponding AZ).
-      // We must now select those of the nodes on which we actually place the replicas, and will do that based on the total
-      // number of cores already present on these nodes as well as the free disk space.
-      // We sort once by the order related to number of cores and disk space each list of nodes on an AZ. We do not sort all
-      // of them ahead of time because we might be placing a small number of replicas and it might be wasted work.
       for (int i = 0; i < numReplicas; i++) {
-        // Pick the AZ having the lowest number of replicas for this shard, and if that AZ has available nodes, pick the
-        // most appropriate one (based on number of cores and disk space constraints). In the process, remove entries (AZ's)
-        // that do not have nodes to place replicas on because these are useless to us.
-        Map.Entry<Integer, AzWithNodes> azWithNodesEntry = null;
+        // We have for each AZ on which we might have a chance of placing a replica, the list of candidate nodes for replicas
+        // (candidate: does not already have a replica of this shard and is in the corresponding AZ).
+        // Among the AZ's with the minimal number of replicas of the given replica type for the shard, we must pick the AZ that
+        // offers the best placement (based on number of cores and free disk space). In order to do so, for these "minimal" AZ's
+        // we sort the nodes from best to worst placement candidate (based on the number of cores and free disk space) then pick
+        // the AZ whose best node is the best overall. We don't sort all AZ's because that will not necessarily be needed.
+        int minNumberOfReplicasPerAz = 0; // This initial value is never read, but the compiler can't tell
+        Set<Map.Entry<Integer, AzWithNodes>> candidateAzEntries = null;
+        // Iterate over AZ's (in order of increasing number of replicas on that AZ) and do two things: 1. remove those AZ's that
+        // have no available nodes (no point iterating over them again and again as we compute placement for more replicas), and 2. collect
+        // all those AZ's that have the minimal number of replicas.
         for (Iterator<Map.Entry<Integer, AzWithNodes>> it = azByExistingReplicas.entries().iterator(); it.hasNext(); ) {
           Map.Entry<Integer, AzWithNodes> entry = it.next();
-          if (!entry.getValue().availableNodesForPlacement.isEmpty()) {
-            azWithNodesEntry = entry;
-            // Remove this entry. Will add it back after a node has been removed from the list of available nodes and the number
-            // of replicas on the AZ has been increased by one (search for "azByExistingReplicas.put" below).
+          int numberOfNodes = entry.getValue().availableNodesForPlacement.size();
+          if (numberOfNodes == 0) {
             it.remove();
-            break;
-          } else {
+          } else { // AZ does have node(s) for placement
+            if (candidateAzEntries == null) {
+              // First AZ with nodes that can take the replica. Initialize tracking structures
+              minNumberOfReplicasPerAz = entry.getKey();
+              candidateAzEntries = new HashSet<>();
+            }
+            if (minNumberOfReplicasPerAz != entry.getKey()) {
+              // AZ's with more replicas than the minimum number seen are not placement candidates
+              break;
+            }
+            candidateAzEntries.add(entry);
+            // We remove all entries that are candidates: the "winner" will be modified, all entries might also be sorted,
+            // so we'll insert back the updated versions later.
             it.remove();
           }
         }
 
-        if (azWithNodesEntry == null) {
+        if (candidateAzEntries == null) {
           // This can happen because not enough nodes for the placement request or already too many nodes with replicas of
           // the shard that can't accept new replicas or not enough nodes with enough free disk space.
           throw new PlacementException("Not enough nodes to place " + numReplicas + " replica(s) of type " + replicaType +
               " for shard " + shardName + " of collection " + solrCollection.getName());
         }
 
-        AzWithNodes azWithNodes = azWithNodesEntry.getValue();
-        List<Node> nodes = azWithNodes.availableNodesForPlacement;
+        // Iterate over all candidate AZ's, sort them if needed and find the best one to use for this placement
+        Map.Entry<Integer, AzWithNodes> selectedAz = null;
+        Node selectedAzBestNode = null;
+        for (Map.Entry<Integer, AzWithNodes> candidateAzEntry : candidateAzEntries) {
+          AzWithNodes azWithNodes = candidateAzEntry.getValue();
+          List<Node> nodes = azWithNodes.availableNodesForPlacement;
+
+          if (!azWithNodes.hasBeenSorted) {
+            // Make sure we do not tend to use always the same nodes (within an AZ) if all conditions are identical (well, this
+            // likely is not the case since after having added a replica to a node its number of cores increases for the next
+            // placement decision, but let's be defensive here, given that multiple concurrent placement decisions might see
+            // the same initial cluster state, and we want placement to be reasonable even in that case without creating an
+            // unnecessary imbalance).
+            // For example, if all nodes have 0 cores and same amount of free disk space, ideally we want to pick a random node
+            // for placement, not always the same one due to some internal ordering.
+            Collections.shuffle(nodes, random);
+
+            // Sort by increasing number of cores but pushing nodes with low free disk space to the end of the list
+            nodes.sort(coresAndDiskComparator);
+
+            azWithNodes.hasBeenSorted = true;
+          }
 
-        if (!azWithNodes.hasBeenSorted) {
-          // Make sure we do not tend to use always the same nodes (within an AZ) if all conditions are identical (well, this
-          // likely is not the case since after having added a replica to a node its number of cores increases for the next
-          // placement decision, but let's be defensive here, given that multiple concurrent placement decisions might see
-          // the same initial cluster state, and we want placement to be reasonable even in that case without creating an
-          // unnecessary imbalance).
-          // For example, if all nodes have 0 cores and same amount of free disk space, ideally we want to pick a random node
-          // for placement, not always the same one due to some internal ordering.
-          Collections.shuffle(nodes, random);
+          // Which one is better, the new one or the previous best?
+          if (selectedAz == null || coresAndDiskComparator.compare(nodes.get(0), selectedAzBestNode) < 0) {
+            selectedAz = candidateAzEntry;
+            selectedAzBestNode = nodes.get(0);
+          }
+        }
 
-          // Sort by increasing number of cores but pushing nodes with low free disk space to the end of the list
-          nodes.sort(coresAndDiskComparator);
+        // Now actually remove the selected node from the winning AZ
+        AzWithNodes azWithNodes = selectedAz.getValue();
+        List<Node> nodes = selectedAz.getValue().availableNodesForPlacement;
+        Node assignTarget = nodes.remove(0);
 
-          azWithNodes.hasBeenSorted = true;
+        // Insert back all the qualifying but non-winning AZ's that were removed while searching for the winner
+        for (Map.Entry<Integer, AzWithNodes> removedAzs : candidateAzEntries) {
+          if (removedAzs != selectedAz) {
+            azByExistingReplicas.put(removedAzs.getKey(), removedAzs.getValue());
+          }
         }
 
-        Node assignTarget = nodes.remove(0);
+        // Insert back a corrected entry for the winning AZ: one more replica living there and one less node that can accept new replicas
+        // (the remaining candidate node list might be empty, in which case it will be cleaned up on the next iteration).
+        azByExistingReplicas.put(selectedAz.getKey() + 1, azWithNodes);
 
         // Do not assign that node again for replicas of other replica type for this shard
         // (this update of the set is not useful in the current execution of this method but for following ones only)
         nodesWithReplicas.add(assignTarget);
 
-        // Insert back a corrected entry for the AZ: one more replica living there and one less node that can accept new replicas
-        // (the remaining candidate node list might be empty, in which case it will be cleaned up on the next iteration).
-        azByExistingReplicas.put(azWithNodesEntry.getKey() + 1, azWithNodes);
-
         // Track that the node has one more core. These values are only used during the current run of the plugin.
         coresOnNodes.merge(assignTarget, 1, Integer::sum);
 
diff --git a/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java b/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java
index 3b9b767..9850709 100644
--- a/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java
+++ b/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java
@@ -224,6 +224,123 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
 
 
   /**
+   * Tests placement with multiple criteria: replica-type restricted nodes, availability zones and an existing collection.
+   */
+  @Test
+  public void testPlacementMultiCriteria() throws Exception {
+    String collectionName = "multiCollection";
+
+    // Note the node numbering intentionally does not follow the AZ structure
+    final int AZ1_NRT_LOWCORES = 0;
+    final int AZ1_NRT_HIGHCORES = 3;
+    final int AZ1_TLOGPULL_LOWFREEDISK = 5;
+
+    final int AZ2_NRT_MEDCORES = 2;
+    final int AZ2_NRT_HIGHCORES = 1;
+    final int AZ2_TLOGPULL = 7;
+
+    final int AZ3_NRT_LOWCORES = 4;
+    final int AZ3_NRT_HIGHCORES = 6;
+    final int AZ3_TLOGPULL = 8;
+
+    final String AZ1 = "AZ1";
+    final String AZ2 = "AZ2";
+    final String AZ3 = "AZ3";
+
+    final int LOW_CORES = 10;
+    final int MED_CORES = 50;
+    final int HIGH_CORES = 100;
+
+    final String TLOG_PULL_REPLICA_TYPE = "TLOG, PULL";
+    final String NRT_REPLICA_TYPE = "Nrt";
+
+    // Cluster nodes and their attributes.
+    // 3 AZ's with three nodes each: 2 that can only take NRT and one that can take TLOG or PULL.
+    // One of the NRT nodes has fewer cores than the other.
+    // The TLOG/PULL node on AZ1 doesn't have much free disk space.
+    Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(9);
+    LinkedList<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+    for (int i = 0; i < 9; i++) {
+      final String az;
+      final int numcores;
+      final long freedisk;
+      final String acceptedReplicaType;
+
+      if (i == AZ1_NRT_LOWCORES || i == AZ1_NRT_HIGHCORES || i == AZ1_TLOGPULL_LOWFREEDISK) {
+        az = AZ1;
+      } else if (i == AZ2_NRT_HIGHCORES || i == AZ2_NRT_MEDCORES || i == AZ2_TLOGPULL) {
+        az = AZ2;
+      } else {
+        az = AZ3;
+      }
+
+      if (i == AZ1_NRT_LOWCORES || i == AZ3_NRT_LOWCORES) {
+        numcores = LOW_CORES;
+      } else if (i == AZ2_NRT_MEDCORES) {
+        numcores = MED_CORES;
+      } else {
+        numcores = HIGH_CORES;
+      }
+
+      if (i == AZ1_TLOGPULL_LOWFREEDISK) {
+        freedisk = PRIORITIZED_FREE_DISK_GB - 10;
+      } else {
+        freedisk = PRIORITIZED_FREE_DISK_GB + 10;
+      }
+
+      if (i == AZ1_TLOGPULL_LOWFREEDISK || i == AZ2_TLOGPULL || i == AZ3_TLOGPULL) {
+        acceptedReplicaType = TLOG_PULL_REPLICA_TYPE;
+      } else {
+        acceptedReplicaType = NRT_REPLICA_TYPE;
+      }
+
+      nodeBuilders.get(i).setSysprop(AffinityPlacementFactory.AVAILABILITY_ZONE_SYSPROP, az)
+          .setSysprop(AffinityPlacementFactory.REPLICA_TYPE_SYSPROP, acceptedReplicaType)
+          .setCoreCount(numcores)
+          .setFreeDiskGB(freedisk);
+    }
+
+    // The collection already exists with shards and replicas.
+    Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
+    List<List<String>> shardsReplicas = List.of(
+        List.of("NRT " + AZ1_NRT_HIGHCORES, "TLOG " + AZ3_TLOGPULL), // shard 1
+        List.of("TLOG " + AZ2_TLOGPULL)); // shard 2
+    collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+    SolrCollection solrCollection = collectionBuilder.build();
+
+    List<Node> liveNodes = clusterBuilder.buildLiveNodes();
+
+    Iterator<Shard> shardit = solrCollection.iterator();
+    String shard1Name = shardit.next().getShardName();
+    String shard2Name = shardit.next().getShardName();
+
+    // Add 2 NRT and one TLOG to each shard.
+    PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes),
+        2, 1, 0);
+    PlacementPlan pp = plugin.computePlacement(clusterBuilder.build(), placementRequest, clusterBuilder.buildAttributeFetcher(), new PlacementPlanFactoryImpl());
+    // Shard 1: The NRT's should go to the med cores node on AZ2 and the low cores node on AZ3 (even though
+    // a low cores node on AZ1 could take the replica, there's already an NRT replica there and we want spreading across AZ's);
+    // the TLOG goes to the TLOG node on AZ2 (because the TLOG node on AZ1 has low free disk).
+    // Shard 2: The NRT's should go to the low cores nodes on AZ1 and AZ3 because AZ2 has more cores (and there's no NRT in any AZ for
+    // this shard). The TLOG should go to AZ3 because the AZ1 TLOG node has low free disk.
+    // Each expected placement is represented as a string "shard replica-type node"
+    Set<String> expectedPlacements = Set.of("1 NRT " + AZ2_NRT_MEDCORES, "1 NRT " + AZ3_NRT_LOWCORES, "1 TLOG " + AZ2_TLOGPULL,
+        "2 NRT " + AZ1_NRT_LOWCORES, "2 NRT " + AZ3_NRT_LOWCORES, "2 TLOG " + AZ3_TLOGPULL);
+    verifyPlacements(expectedPlacements, pp, collectionBuilder.getShardBuilders(), liveNodes);
+
+    // If we add 2 PULL to each shard
+    placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes),
+        0, 0, 2);
+    pp = plugin.computePlacement(clusterBuilder.build(), placementRequest, clusterBuilder.buildAttributeFetcher(), new PlacementPlanFactoryImpl());
+    // Shard 1: Given node AZ3_TLOGPULL is taken by the TLOG replica, the PULL should go to AZ1_TLOGPULL_LOWFREEDISK and AZ2_TLOGPULL
+    // Shard 2: Similarly AZ2_TLOGPULL is taken. Replicas should go to AZ1_TLOGPULL_LOWFREEDISK and AZ3_TLOGPULL
+    expectedPlacements = Set.of("1 PULL " + AZ1_TLOGPULL_LOWFREEDISK, "1 PULL " + AZ2_TLOGPULL,
+        "2 PULL " + AZ1_TLOGPULL_LOWFREEDISK, "2 PULL " + AZ3_TLOGPULL);
+    verifyPlacements(expectedPlacements, pp, collectionBuilder.getShardBuilders(), liveNodes);
+  }
+
+
+  /**
    * Tests that if a collection has replicas on nodes not currently live, placement for new replicas works ok.
    */
   @Test
@@ -284,7 +401,6 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
   private static void verifyPlacements(Set<String> expectedPlacements, PlacementPlan placementPlan,
                                        List<Builders.ShardBuilder> shardBuilders, List<Node> liveNodes) {
     Set<ReplicaPlacement> computedPlacements = placementPlan.getReplicaPlacements();
-    assertEquals("Wrong number of computed placements", expectedPlacements.size(), computedPlacements.size());
 
     // Prepare structures for looking up shard name index and node index
     Map<String, Integer> shardNumbering = new HashMap<>();
@@ -298,38 +414,37 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
       nodeNumbering.put(n, index++);
     }
 
-    // While developing tests (or trying to understand failures), uncomment these lines to help explain failures
-    // TODO translate this into the assertion message in case of failure
-//    logExpectedPlacement(expectedPlacements);
-//    logComputedPlacement(computedPlacements, shardNumbering, nodeNumbering);
+    if (expectedPlacements.size() != computedPlacements.size()) {
+      fail("Wrong number of placements, expected " + expectedPlacements.size() + " computed " + computedPlacements.size() + ". " +
+          getExpectedVsComputedPlacement(expectedPlacements, computedPlacements, shardNumbering, nodeNumbering));
+    }
 
     Set<String> expected = new HashSet<>(expectedPlacements);
     for (ReplicaPlacement p : computedPlacements) {
       String lookUpPlacementResult = shardNumbering.get(p.getShardName()) + " " + p.getReplicaType().name() + " " +  nodeNumbering.get(p.getNode());
-      assertTrue(expected.remove(lookUpPlacementResult));
+      if (!expected.remove(lookUpPlacementResult)) {
+        fail("Computed placement [" + lookUpPlacementResult + "] not expected. " +
+            getExpectedVsComputedPlacement(expectedPlacements, computedPlacements, shardNumbering, nodeNumbering));
+      }
     }
   }
 
-  private static void logExpectedPlacement(Set<String> expectedPlacements) {
-    if (log.isInfoEnabled()) {
-      StringBuilder sb = new StringBuilder();
-      for (String placement : expectedPlacements) {
-        sb.append("[").append(placement).append("] ");
-      }
-      log.info("Expected placements: " + sb); // nowarn
+  private static String getExpectedVsComputedPlacement(Set<String> expectedPlacements, Set<ReplicaPlacement> computedPlacements,
+                                                       Map<String, Integer> shardNumbering, Map<Node, Integer> nodeNumbering) {
+
+    StringBuilder sb = new StringBuilder("Expected placement: ");
+    for (String placement : expectedPlacements) {
+      sb.append("[").append(placement).append("] ");
     }
-  }
 
-  private static void logComputedPlacement(Set<ReplicaPlacement> computedPlacements, Map<String, Integer> shardNumbering, Map<Node, Integer> nodeNumbering) {
-    if (log.isInfoEnabled()) {
-      StringBuilder sb = new StringBuilder();
-      for (ReplicaPlacement placement : computedPlacements) {
-        String lookUpPlacementResult = shardNumbering.get(placement.getShardName()) + " " + placement.getReplicaType().name() + " " +  nodeNumbering.get(placement.getNode());
+    sb.append("Computed placement: ");
+    for (ReplicaPlacement placement : computedPlacements) {
+      String lookUpPlacementResult = shardNumbering.get(placement.getShardName()) + " " + placement.getReplicaType().name() + " " +  nodeNumbering.get(placement.getNode());
 
-        sb.append("[").append(lookUpPlacementResult).append("] ");
-      }
-      log.info("Computed placements: " + sb); // nowarn
+      sb.append("[").append(lookUpPlacementResult).append("] ");
     }
+
+    return sb.toString();
   }
 
   @Test