Posted to commits@solr.apache.org by ho...@apache.org on 2023/06/15 16:56:06 UTC

[solr] branch branch_9x updated: SOLR-16806: Create a BalanceReplicas API (#1650)

This is an automated email from the ASF dual-hosted git repository.

houston pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_9x by this push:
     new 8c070ef8085 SOLR-16806: Create a BalanceReplicas API (#1650)
8c070ef8085 is described below

commit 8c070ef80859c1c3690aa4570caa770bdf282392
Author: Houston Putman <ho...@apache.org>
AuthorDate: Thu Jun 15 12:17:51 2023 -0400

    SOLR-16806: Create a BalanceReplicas API (#1650)
    
    - Introduce BalanceReplicasAPI
    - Add computeReplicaBalancing() in Assign, and computeBalancing() in PlacementPlugin.
      The default implementation returns an empty balancePlan (moving no replicas).
    - Refactor all provided PlacementPlugins, to implement OrderedNodePlacementPlugin,
      and share a common way of computing placements and balancing.
    
    Also fixes SOLR-16816, updating metrics when doing multi-shard/collection placements.
    
    (cherry picked from commit de5d1aaec7d5269ebd60ba8e73f06dc31a98887d)
---
 solr/CHANGES.txt                                   |   11 +-
 .../apache/solr/cloud/api/collections/Assign.java  |   15 +
 .../cloud/api/collections/BalanceReplicasCmd.java  |   91 ++
 .../solr/cloud/api/collections/CollApiCmds.java    |    2 +
 .../solr/cloud/api/collections/DeleteNodeCmd.java  |  104 +-
 .../solr/cloud/api/collections/ReplaceNodeCmd.java |  248 +---
 ...laceNodeCmd.java => ReplicaMigrationUtils.java} |  254 ++--
 .../apache/solr/cluster/placement/BalancePlan.java |   46 +
 .../solr/cluster/placement/BalancePlanFactory.java |   38 +
 .../solr/cluster/placement/BalanceRequest.java     |   48 +
 .../solr/cluster/placement/PlacementContext.java   |    3 +
 .../solr/cluster/placement/PlacementPlugin.java    |   16 +-
 .../placement/impl/BalancePlanFactoryImpl.java     |   34 +
 .../cluster/placement/impl/BalancePlanImpl.java    |   58 +
 .../cluster/placement/impl/BalanceRequestImpl.java |   86 ++
 .../impl/PlacementPluginAssignStrategy.java        |   57 +
 .../placement/impl/SimplePlacementContextImpl.java |    7 +
 .../plugins/AffinityPlacementFactory.java          | 1432 +++++++-------------
 .../plugins/MinimizeCoresPlacementFactory.java     |  159 +--
 .../plugins/OrderedNodePlacementPlugin.java        |  702 ++++++++++
 .../placement/plugins/RandomPlacementFactory.java  |  131 +-
 .../placement/plugins/SimplePlacementFactory.java  |  224 +--
 .../solr/handler/admin/CollectionsHandler.java     |    2 +
 .../solr/handler/admin/api/BalanceReplicasAPI.java |  128 ++
 .../org/apache/solr/cloud/BalanceReplicasTest.java |  193 +++
 .../impl/PlacementPluginIntegrationTest.java       |    6 +-
 .../plugins/AbstractPlacementFactoryTest.java      |  225 +++
 .../plugins/AffinityPlacementFactoryTest.java      |  503 +++++--
 .../plugins/MinimizeCoresPlacementFactoryTest.java |  348 +++++
 .../pages/cluster-node-management.adoc             |   77 +-
 .../java/org/apache/solr/common/cloud/Replica.java |    9 +
 .../solr/common/params/CollectionParams.java       |    5 +
 .../apache/solr/cluster/placement/Builders.java    |   27 +-
 33 files changed, 3477 insertions(+), 1812 deletions(-)
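
Note on the new command: BalanceReplicasCmd (added in the diff below) accepts the
'nodes' value as a Set, any other Collection, or a comma-separated String, treats a
missing value as an empty set, and rejects a request that names exactly one node.
The following is a minimal stand-alone sketch of that normalization, not the Solr
code itself. The class name NodeListParser and the method parseNodes are
hypothetical, and IllegalArgumentException stands in for the SolrException
(BAD_REQUEST) thrown by the real command.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper mirroring the 'nodes' handling in BalanceReplicasCmd; not part of Solr. */
public class NodeListParser {

  /**
   * Normalizes a raw 'nodes' value into a set of node names.
   * IllegalArgumentException is an assumption made for this sketch; the real
   * command throws SolrException with ErrorCode.BAD_REQUEST.
   */
  @SuppressWarnings("unchecked")
  public static Set<String> parseNodes(Object raw) {
    Set<String> nodes;
    if (raw == null) {
      // No explicit node list was passed.
      nodes = Collections.emptySet();
    } else if (raw instanceof Set) {
      nodes = (Set<String>) raw;
    } else if (raw instanceof Collection) {
      // Lists and other collections are de-duplicated into a set.
      nodes = new HashSet<>((Collection<String>) raw);
    } else if (raw instanceof String) {
      // Comma-separated form; entries are not trimmed.
      nodes = Set.of(((String) raw).split(","));
    } else {
      throw new IllegalArgumentException(
          "'nodes' was not passed as a correct type (Set/List/String): "
              + raw.getClass().getName());
    }
    if (nodes.size() == 1) {
      // Balancing needs at least two nodes to move replicas between.
      throw new IllegalArgumentException(
          "Cannot balance across a single node: " + nodes.iterator().next());
    }
    return nodes;
  }

  public static void main(String[] args) {
    // The node names below are made up for illustration.
    System.out.println(parseNodes(null));
    System.out.println(parseNodes("node1:8983_solr,node2:8983_solr").size());
    System.out.println(parseNodes(List.of("nodeA", "nodeB", "nodeA")).size());
  }
}
```

One consequence of using Set.of for the String form: Set.of throws
IllegalArgumentException on duplicate elements, so a value such as "a,a" is
rejected instead of being de-duplicated, whereas the Collection branch
de-duplicates silently.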

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index ff5e53c2237..2abdb198882 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -17,11 +17,14 @@ New Features
 
 * SOLR-16674: Introduced support for byte vector encoding in DenseVectorField and KnnQParser (Elia Porciani via Alessandro Benedetti).
 
-* SOLR-16719: AffinityPlacementFactory now supports spreading replicas across domains within the availablity zone and
+* SOLR-16719: AffinityPlacementFactory now supports spreading replicas across domains within the availability zone and
   optionally fail the request if more than a configurable number of replicas need to be placed in a single domain. (Houston Putman, Tomás Fernández Löbbe)
 
 * SOLR-16836: Introduced support for high dimensional vectors (Alessandro Benedetti).
 
+* SOLR-16806: Solr now provides a BalanceReplicas API at `POST /api/cluster/replicas/balance` (v2), to spread replicas
+  across a given set of nodes. No v1 API is available. (Houston Putman, Tomás Fernández Löbbe, Jason Gerlowski, Radu Gheorghe)
+
 Improvements
 ---------------------
 
@@ -120,6 +123,12 @@ Improvements
 * SOLR-16392: The v2 "create shard" API has been tweaked to be more intuitive, by removing the top-level "create"
   command specifier.  The rest of the API remains unchanged. (Jason Gerlowski)
 
+* SOLR-16806: The included PlacementPlugins (Random, Simple, MinimizeCores and Affinity) now all implement the
+  OrderedNodePlacementPlugin, which provides the implementation for computePlacements() and computeBalancing().
+  Each implementing PlacementPlugin provides a way of weighting Solr Nodes, and the OrderedNodePlacement plugin
+  then uses the weights to decide the optimal strategy for placing new replicas or balancing existing replicas.
+  (Houston Putman, Tomás Fernández Löbbe, Jason Gerlowski, Radu Gheorghe)
+
 Optimizations
 ---------------------
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
index ff8b7c71b32..6304e522209 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
@@ -451,6 +451,21 @@ public class Assign {
         SolrCloudManager solrCloudManager, List<AssignRequest> assignRequests)
         throws AssignmentException, IOException, InterruptedException;
 
+    /**
+     * Balance replicas across nodes.
+     *
+     * @param solrCloudManager current instance of {@link SolrCloudManager}.
+     * @param nodes to compute replica balancing across.
+     * @param maxBalanceSkew to ensure strictness of replica balancing.
+     * @return Map from Replica to the Node where that Replica should be moved.
+     * @throws AssignmentException when balance request cannot produce any valid assignments.
+     */
+    default Map<Replica, String> computeReplicaBalancing(
+        SolrCloudManager solrCloudManager, Set<String> nodes, int maxBalanceSkew)
+        throws AssignmentException, IOException, InterruptedException {
+      return Collections.emptyMap();
+    }
+
     /**
      * Verify that deleting a collection doesn't violate the replica assignment constraints.
      *
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/BalanceReplicasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/BalanceReplicasCmd.java
new file mode 100644
index 00000000000..7d84c7c6633
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/BalanceReplicasCmd.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CommonAdminParams;
+import org.apache.solr.common.util.NamedList;
+
+public class BalanceReplicasCmd implements CollApiCmds.CollectionApiCommand {
+  private final CollectionCommandContext ccc;
+
+  public BalanceReplicasCmd(CollectionCommandContext ccc) {
+    this.ccc = ccc;
+  }
+
+  @SuppressWarnings({"unchecked"})
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList<Object> results)
+      throws Exception {
+    Set<String> nodes;
+    Object nodesRaw = message.get(CollectionParams.NODES);
+    if (nodesRaw == null) {
+      nodes = Collections.emptySet();
+    } else if (nodesRaw instanceof Set) {
+      nodes = (Set<String>) nodesRaw;
+    } else if (nodesRaw instanceof Collection) {
+      nodes = new HashSet<>((Collection<String>) nodesRaw);
+    } else if (nodesRaw instanceof String) {
+      nodes = Set.of(((String) nodesRaw).split(","));
+    } else {
+      throw new SolrException(
+          SolrException.ErrorCode.BAD_REQUEST,
+          "'nodes' was not passed as a correct type (Set/List/String): "
+              + nodesRaw.getClass().getName());
+    }
+    boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
+    String async = message.getStr(ASYNC);
+    int timeout = message.getInt("timeout", 10 * 60); // 10 minutes
+    boolean parallel = message.getBool("parallel", false);
+
+    if (nodes.size() == 1) {
+      throw new SolrException(
+          SolrException.ErrorCode.BAD_REQUEST,
+          "Cannot balance across a single node: " + nodes.stream().findAny().get());
+    }
+
+    Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ccc.getCoreContainer());
+    Map<Replica, String> replicaMovements =
+        assignStrategy.computeReplicaBalancing(
+            ccc.getSolrCloudManager(),
+            nodes,
+            message.getInt(CollectionParams.MAX_BALANCE_SKEW, -1));
+
+    boolean migrationSuccessful =
+        ReplicaMigrationUtils.migrateReplicas(
+            ccc, replicaMovements, parallel, waitForFinalState, timeout, async, results);
+    if (migrationSuccessful) {
+      results.add(
+          "success",
+          "BalanceReplicas action completed successfully across nodes  : ["
+              + String.join(", ", nodes)
+              + "]");
+    }
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollApiCmds.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollApiCmds.java
index 8538c059d93..e35023023b1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollApiCmds.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollApiCmds.java
@@ -35,6 +35,7 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.AD
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.ALIASPROP;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.BACKUP;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCE_REPLICAS;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATEALIAS;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
@@ -142,6 +143,7 @@ public class CollApiCmds {
       commandMap =
           Map.ofEntries(
               Map.entry(REPLACENODE, new ReplaceNodeCmd(ccc)),
+              Map.entry(BALANCE_REPLICAS, new BalanceReplicasCmd(ccc)),
               Map.entry(DELETENODE, new DeleteNodeCmd(ccc)),
               Map.entry(BACKUP, new BackupCmd(ccc)),
               Map.entry(RESTORE, new RestoreCmd(ccc)),
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java
index d4ae8154ec7..0715da370a5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java
@@ -17,31 +17,18 @@
 
 package org.apache.solr.cloud.api.collections;
 
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
 
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Locale;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.NamedList;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class DeleteNodeCmd implements CollApiCmds.CollectionApiCommand {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
   private final CollectionCommandContext ccc;
 
   public DeleteNodeCmd(CollectionCommandContext ccc) {
@@ -53,7 +40,7 @@ public class DeleteNodeCmd implements CollApiCmds.CollectionApiCommand {
       throws Exception {
     CollectionHandlingUtils.checkRequired(message, "node");
     String node = message.getStr("node");
-    List<ZkNodeProps> sourceReplicas = ReplaceNodeCmd.getReplicasOfNode(node, state);
+    List<Replica> sourceReplicas = ReplicaMigrationUtils.getReplicasOfNode(node, state);
     List<String> singleReplicas = verifyReplicaAvailability(sourceReplicas, state);
     if (!singleReplicas.isEmpty()) {
       results.add(
@@ -61,32 +48,25 @@ public class DeleteNodeCmd implements CollApiCmds.CollectionApiCommand {
           "Can't delete the only existing non-PULL replica(s) on node "
               + node
               + ": "
-              + singleReplicas.toString());
+              + singleReplicas);
     } else {
-      cleanupReplicas(results, state, sourceReplicas, ccc, node, message.getStr(ASYNC));
+      ReplicaMigrationUtils.cleanupReplicas(
+          results, state, sourceReplicas, ccc, message.getStr(ASYNC));
     }
   }
 
   // collect names of replicas that cannot be deleted
-  static List<String> verifyReplicaAvailability(
-      List<ZkNodeProps> sourceReplicas, ClusterState state) {
+  static List<String> verifyReplicaAvailability(List<Replica> sourceReplicas, ClusterState state) {
     List<String> res = new ArrayList<>();
-    for (ZkNodeProps sourceReplica : sourceReplicas) {
-      String coll = sourceReplica.getStr(COLLECTION_PROP);
-      String shard = sourceReplica.getStr(SHARD_ID_PROP);
-      String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
+    for (Replica sourceReplica : sourceReplicas) {
+      String coll = sourceReplica.getCollection();
+      String shard = sourceReplica.getShard();
+      String replicaName = sourceReplica.getName();
       DocCollection collection = state.getCollection(coll);
       Slice slice = collection.getSlice(shard);
       if (slice.getReplicas().size() < 2) {
         // can't delete the only replica in existence
-        res.add(
-            coll
-                + "/"
-                + shard
-                + "/"
-                + replicaName
-                + ", type="
-                + sourceReplica.getStr(ZkStateReader.REPLICA_TYPE));
+        res.add(coll + "/" + shard + "/" + replicaName + ", type=" + sourceReplica.getType());
       } else { // check replica types
         int otherNonPullReplicas = 0;
         for (Replica r : slice.getReplicas()) {
@@ -96,72 +76,10 @@ public class DeleteNodeCmd implements CollApiCmds.CollectionApiCommand {
         }
         // can't delete - there are no other non-pull replicas
         if (otherNonPullReplicas == 0) {
-          res.add(
-              coll
-                  + "/"
-                  + shard
-                  + "/"
-                  + replicaName
-                  + ", type="
-                  + sourceReplica.getStr(ZkStateReader.REPLICA_TYPE));
+          res.add(coll + "/" + shard + "/" + replicaName + ", type=" + sourceReplica.getType());
         }
       }
     }
     return res;
   }
-
-  static void cleanupReplicas(
-      NamedList<Object> results,
-      ClusterState clusterState,
-      List<ZkNodeProps> sourceReplicas,
-      CollectionCommandContext ccc,
-      String node,
-      String async)
-      throws IOException, InterruptedException {
-    CountDownLatch cleanupLatch = new CountDownLatch(sourceReplicas.size());
-    for (ZkNodeProps sourceReplica : sourceReplicas) {
-      String coll = sourceReplica.getStr(COLLECTION_PROP);
-      String shard = sourceReplica.getStr(SHARD_ID_PROP);
-      String type = sourceReplica.getStr(ZkStateReader.REPLICA_TYPE);
-      log.info(
-          "Deleting replica type={} for collection={} shard={} on node={}",
-          type,
-          coll,
-          shard,
-          node);
-      NamedList<Object> deleteResult = new NamedList<>();
-      try {
-        if (async != null) sourceReplica = sourceReplica.plus(ASYNC, async);
-        new DeleteReplicaCmd(ccc)
-            .deleteReplica(
-                clusterState,
-                sourceReplica.plus("parallel", "true"),
-                deleteResult,
-                () -> {
-                  cleanupLatch.countDown();
-                  if (deleteResult.get("failure") != null) {
-                    synchronized (results) {
-                      results.add(
-                          "failure",
-                          String.format(
-                              Locale.ROOT,
-                              "Failed to delete replica for collection=%s shard=%s" + " on node=%s",
-                              coll,
-                              shard,
-                              node));
-                    }
-                  }
-                });
-      } catch (KeeperException e) {
-        log.warn("Error deleting ", e);
-        cleanupLatch.countDown();
-      } catch (Exception e) {
-        log.warn("Error deleting ", e);
-        cleanupLatch.countDown();
-        throw e;
-      }
-    }
-    log.debug("Waiting for delete node action to complete");
-    cleanupLatch.await(5, TimeUnit.MINUTES);
-  }
 }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
index 52cba5b0fc1..fe47f96cc6f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
@@ -17,41 +17,25 @@
 
 package org.apache.solr.cloud.api.collections;
 
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
 
-import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
-import org.apache.solr.cloud.ActiveReplicaWatcher;
-import org.apache.solr.common.SolrCloseableLatch;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.CollectionStateWatcher;
-import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ReplicaPosition;
-import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.params.CommonAdminParams;
-import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.util.CollectionUtil;
 import org.apache.solr.common.util.NamedList;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final CollectionCommandContext ccc;
 
@@ -89,41 +73,20 @@ public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
               + source
               + " are live, therefore replicas cannot be moved");
     }
-    List<ZkNodeProps> sourceReplicas = getReplicasOfNode(source, clusterState);
-    // how many leaders are we moving? for these replicas we have to make sure that either:
-    // * another existing replica can become a leader, or
-    // * we wait until the newly created replica completes recovery (and can become the new leader)
-    // If waitForFinalState=true we wait for all replicas
-    int numLeaders = 0;
-    for (ZkNodeProps props : sourceReplicas) {
-      if (props.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) {
-        numLeaders++;
-      }
-    }
-    // map of collectionName_coreNodeName to watchers
-    Map<String, CollectionStateWatcher> watchers = new HashMap<>();
-    List<ZkNodeProps> createdReplicas = new ArrayList<>();
-
-    AtomicBoolean anyOneFailed = new AtomicBoolean(false);
-    SolrCloseableLatch countDownLatch =
-        new SolrCloseableLatch(sourceReplicas.size(), ccc.getCloseableToLatchOn());
+    List<Replica> sourceReplicas = ReplicaMigrationUtils.getReplicasOfNode(source, clusterState);
+    Map<Replica, String> replicaMovements = CollectionUtil.newHashMap(sourceReplicas.size());
 
-    SolrCloseableLatch replicasToRecover =
-        new SolrCloseableLatch(numLeaders, ccc.getCloseableToLatchOn());
-
-    List<ReplicaPosition> replicaPositions = null;
     if (target == null || target.isEmpty()) {
       List<Assign.AssignRequest> assignRequests = new ArrayList<>(sourceReplicas.size());
-      for (ZkNodeProps sourceReplica : sourceReplicas) {
-        Replica.Type replicaType =
-            Replica.Type.get(sourceReplica.getStr(ZkStateReader.REPLICA_TYPE));
+      for (Replica sourceReplica : sourceReplicas) {
+        Replica.Type replicaType = sourceReplica.getType();
         int numNrtReplicas = replicaType == Replica.Type.NRT ? 1 : 0;
         int numTlogReplicas = replicaType == Replica.Type.TLOG ? 1 : 0;
         int numPullReplicas = replicaType == Replica.Type.PULL ? 1 : 0;
         Assign.AssignRequest assignRequest =
             new Assign.AssignRequestBuilder()
-                .forCollection(sourceReplica.getStr(COLLECTION_PROP))
-                .forShard(Collections.singletonList(sourceReplica.getStr(SHARD_ID_PROP)))
+                .forCollection(sourceReplica.getCollection())
+                .forShard(Collections.singletonList(sourceReplica.getShard()))
                 .assignNrtReplicas(numNrtReplicas)
                 .assignTlogReplicas(numTlogReplicas)
                 .assignPullReplicas(numPullReplicas)
@@ -135,194 +98,25 @@ public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
         assignRequests.add(assignRequest);
       }
       Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ccc.getCoreContainer());
-      replicaPositions = assignStrategy.assign(ccc.getSolrCloudManager(), assignRequests);
-    }
-    int replicaPositionIdx = 0;
-    for (ZkNodeProps sourceReplica : sourceReplicas) {
-      String sourceCollection = sourceReplica.getStr(COLLECTION_PROP);
-      if (log.isInfoEnabled()) {
-        log.info(
-            "Going to create replica for collection={} shard={} on node={}",
-            sourceCollection,
-            sourceReplica.getStr(SHARD_ID_PROP),
-            target);
+      List<ReplicaPosition> replicaPositions =
+          assignStrategy.assign(ccc.getSolrCloudManager(), assignRequests);
+      int position = 0;
+      for (Replica sourceReplica : sourceReplicas) {
+        replicaMovements.put(sourceReplica, replicaPositions.get(position++).node);
       }
-      String targetNode;
-      // Use the assigned replica positions, if target is null or empty (checked above)
-      if (replicaPositions != null) {
-        targetNode = replicaPositions.get(replicaPositionIdx).node;
-        replicaPositionIdx++;
-      } else {
-        targetNode = target;
-      }
-      ZkNodeProps msg =
-          sourceReplica
-              .plus("parallel", String.valueOf(parallel))
-              .plus(CoreAdminParams.NODE, targetNode);
-      if (async != null) msg.getProperties().put(ASYNC, async);
-      NamedList<Object> nl = new NamedList<>();
-      final ZkNodeProps addedReplica =
-          new AddReplicaCmd(ccc)
-              .addReplica(
-                  clusterState,
-                  msg,
-                  nl,
-                  () -> {
-                    countDownLatch.countDown();
-                    if (nl.get("failure") != null) {
-                      String errorString =
-                          String.format(
-                              Locale.ROOT,
-                              "Failed to create replica for collection=%s shard=%s" + " on node=%s",
-                              sourceCollection,
-                              sourceReplica.getStr(SHARD_ID_PROP),
-                              target);
-                      log.warn(errorString);
-                      // one replica creation failed. Make the best attempt to
-                      // delete all the replicas created so far in the target
-                      // and exit
-                      synchronized (results) {
-                        results.add("failure", errorString);
-                        anyOneFailed.set(true);
-                      }
-                    } else {
-                      if (log.isDebugEnabled()) {
-                        log.debug(
-                            "Successfully created replica for collection={} shard={} on node={}",
-                            sourceCollection,
-                            sourceReplica.getStr(SHARD_ID_PROP),
-                            target);
-                      }
-                    }
-                  })
-              .get(0);
-
-      if (addedReplica != null) {
-        createdReplicas.add(addedReplica);
-        if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) {
-          String shardName = sourceReplica.getStr(SHARD_ID_PROP);
-          String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
-          String collectionName = sourceCollection;
-          String key = collectionName + "_" + replicaName;
-          CollectionStateWatcher watcher;
-          if (waitForFinalState) {
-            watcher =
-                new ActiveReplicaWatcher(
-                    collectionName,
-                    null,
-                    Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)),
-                    replicasToRecover);
-          } else {
-            watcher =
-                new LeaderRecoveryWatcher(
-                    collectionName,
-                    shardName,
-                    replicaName,
-                    addedReplica.getStr(ZkStateReader.CORE_NAME_PROP),
-                    replicasToRecover);
-          }
-          watchers.put(key, watcher);
-          log.debug("--- adding {}, {}", key, watcher);
-          zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
-        } else {
-          log.debug("--- not waiting for {}", addedReplica);
-        }
-      }
-    }
-
-    log.debug("Waiting for replicas to be added");
-    if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
-      log.info("Timed out waiting for replicas to be added");
-      anyOneFailed.set(true);
     } else {
-      log.debug("Finished waiting for replicas to be added");
-    }
-
-    // now wait for leader replicas to recover
-    log.debug("Waiting for {} leader replicas to recover", numLeaders);
-    if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) {
-      if (log.isInfoEnabled()) {
-        log.info(
-            "Timed out waiting for {} leader replicas to recover", replicasToRecover.getCount());
+      for (Replica sourceReplica : sourceReplicas) {
+        replicaMovements.put(sourceReplica, target);
       }
-      anyOneFailed.set(true);
-    } else {
-      log.debug("Finished waiting for leader replicas to recover");
     }
-    // remove the watchers, we're done either way
-    for (Map.Entry<String, CollectionStateWatcher> e : watchers.entrySet()) {
-      zkStateReader.removeCollectionStateWatcher(e.getKey(), e.getValue());
-    }
-    if (anyOneFailed.get()) {
-      log.info("Failed to create some replicas. Cleaning up all replicas on target node");
-      SolrCloseableLatch cleanupLatch =
-          new SolrCloseableLatch(createdReplicas.size(), ccc.getCloseableToLatchOn());
-      for (ZkNodeProps createdReplica : createdReplicas) {
-        NamedList<Object> deleteResult = new NamedList<>();
-        try {
-          new DeleteReplicaCmd(ccc)
-              .deleteReplica(
-                  zkStateReader.getClusterState(),
-                  createdReplica.plus("parallel", "true"),
-                  deleteResult,
-                  () -> {
-                    cleanupLatch.countDown();
-                    if (deleteResult.get("failure") != null) {
-                      synchronized (results) {
-                        results.add(
-                            "failure",
-                            "Could not cleanup, because of : " + deleteResult.get("failure"));
-                      }
-                    }
-                  });
-        } catch (KeeperException e) {
-          cleanupLatch.countDown();
-          log.warn("Error deleting replica ", e);
-        } catch (Exception e) {
-          log.warn("Error deleting replica ", e);
-          cleanupLatch.countDown();
-          throw e;
-        }
-      }
-      cleanupLatch.await(5, TimeUnit.MINUTES);
-      return;
-    }
-
-    // we have reached this far means all replicas could be recreated
-    // now cleanup the replicas in the source node
-    DeleteNodeCmd.cleanupReplicas(results, state, sourceReplicas, ccc, source, async);
-    results.add(
-        "success",
-        "REPLACENODE action completed successfully from  : " + source + " to : " + target);
-  }
 
-  static List<ZkNodeProps> getReplicasOfNode(String source, ClusterState state) {
-    List<ZkNodeProps> sourceReplicas = new ArrayList<>();
-    for (Map.Entry<String, DocCollection> e : state.getCollectionsMap().entrySet()) {
-      for (Slice slice : e.getValue().getSlices()) {
-        for (Replica replica : slice.getReplicas()) {
-          if (source.equals(replica.getNodeName())) {
-            ZkNodeProps props =
-                new ZkNodeProps(
-                    COLLECTION_PROP,
-                    e.getKey(),
-                    SHARD_ID_PROP,
-                    slice.getName(),
-                    ZkStateReader.CORE_NAME_PROP,
-                    replica.getCoreName(),
-                    ZkStateReader.REPLICA_PROP,
-                    replica.getName(),
-                    ZkStateReader.REPLICA_TYPE,
-                    replica.getType().name(),
-                    ZkStateReader.LEADER_PROP,
-                    String.valueOf(replica.equals(slice.getLeader())),
-                    CoreAdminParams.NODE,
-                    source);
-            sourceReplicas.add(props);
-          }
-        }
-      }
+    boolean migrationSuccessful =
+        ReplicaMigrationUtils.migrateReplicas(
+            ccc, replicaMovements, parallel, waitForFinalState, timeout, async, results);
+    if (migrationSuccessful) {
+      results.add(
+          "success",
+          "REPLACENODE action completed successfully from  : " + source + " to : " + target);
     }
-    return sourceReplicas;
   }
 }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java
similarity index 54%
copy from solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
copy to solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java
index 52cba5b0fc1..903d509da18 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java
@@ -17,12 +17,12 @@
 
 package org.apache.solr.cloud.api.collections;
 
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
 
+import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -30,73 +30,56 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
 import org.apache.solr.cloud.ActiveReplicaWatcher;
 import org.apache.solr.common.SolrCloseableLatch;
-import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.CollectionStateWatcher;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.ReplicaPosition;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CollectionParams;
-import org.apache.solr.common.params.CommonAdminParams;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
+public class ReplicaMigrationUtils {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private final CollectionCommandContext ccc;
-
-  public ReplaceNodeCmd(CollectionCommandContext ccc) {
-    this.ccc = ccc;
-  }
-
-  @Override
-  public void call(ClusterState state, ZkNodeProps message, NamedList<Object> results)
-      throws Exception {
-    ZkStateReader zkStateReader = ccc.getZkStateReader();
-    String source = message.getStr(CollectionParams.SOURCE_NODE);
-    String target = message.getStr(CollectionParams.TARGET_NODE);
-    boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
-    if (source == null) {
-      throw new SolrException(
-          SolrException.ErrorCode.BAD_REQUEST, "sourceNode is a required param");
-    }
-    String async = message.getStr(ASYNC);
-    int timeout = message.getInt("timeout", 10 * 60); // 10 minutes
-    boolean parallel = message.getBool("parallel", false);
-    ClusterState clusterState = zkStateReader.getClusterState();
-
-    if (!clusterState.liveNodesContain(source)) {
-      throw new SolrException(
-          SolrException.ErrorCode.BAD_REQUEST, "Source Node: " + source + " is not live");
-    }
-    if (target != null && !clusterState.liveNodesContain(target)) {
-      throw new SolrException(
-          SolrException.ErrorCode.BAD_REQUEST, "Target Node: " + target + " is not live");
-    } else if (clusterState.getLiveNodes().size() <= 1) {
-      throw new SolrException(
-          SolrException.ErrorCode.BAD_REQUEST,
-          "No nodes other than the source node: "
-              + source
-              + " are live, therefore replicas cannot be moved");
-    }
-    List<ZkNodeProps> sourceReplicas = getReplicasOfNode(source, clusterState);
+  /**
+   * Code to migrate replicas to already chosen nodes. This will create new replicas, and delete the
+   * old replicas after the creation is done.
+   *
+   * <p>If an error occurs during the creation of new replicas, all new replicas will be deleted.
+   *
+   * @param ccc The collection command context to use from the API that calls this method
+   * @param movements a map from replica to the new node that the replica should live on
+   * @param parallel whether the replica creations should be done in parallel
+   * @param waitForFinalState wait for the final state of all newly created replicas before
+   *     continuing
+   * @param timeout the amount of time to wait for new replicas to be created
+   * @param asyncId If provided, the command will be run under the given asyncId
+   * @param results push results (successes and failures) onto this list
+   * @return whether the command was successful
+   */
+  static boolean migrateReplicas(
+      CollectionCommandContext ccc,
+      Map<Replica, String> movements,
+      boolean parallel,
+      boolean waitForFinalState,
+      int timeout,
+      String asyncId,
+      NamedList<Object> results)
+      throws IOException, InterruptedException, KeeperException {
     // how many leaders are we moving? for these replicas we have to make sure that either:
     // * another existing replica can become a leader, or
     // * we wait until the newly created replica completes recovery (and can become the new leader)
     // If waitForFinalState=true we wait for all replicas
     int numLeaders = 0;
-    for (ZkNodeProps props : sourceReplicas) {
-      if (props.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) {
+    for (Replica replica : movements.keySet()) {
+      if (replica.isLeader() || waitForFinalState) {
         numLeaders++;
       }
     }
@@ -106,60 +89,31 @@ public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
 
     AtomicBoolean anyOneFailed = new AtomicBoolean(false);
     SolrCloseableLatch countDownLatch =
-        new SolrCloseableLatch(sourceReplicas.size(), ccc.getCloseableToLatchOn());
+        new SolrCloseableLatch(movements.size(), ccc.getCloseableToLatchOn());
 
     SolrCloseableLatch replicasToRecover =
         new SolrCloseableLatch(numLeaders, ccc.getCloseableToLatchOn());
 
-    List<ReplicaPosition> replicaPositions = null;
-    if (target == null || target.isEmpty()) {
-      List<Assign.AssignRequest> assignRequests = new ArrayList<>(sourceReplicas.size());
-      for (ZkNodeProps sourceReplica : sourceReplicas) {
-        Replica.Type replicaType =
-            Replica.Type.get(sourceReplica.getStr(ZkStateReader.REPLICA_TYPE));
-        int numNrtReplicas = replicaType == Replica.Type.NRT ? 1 : 0;
-        int numTlogReplicas = replicaType == Replica.Type.TLOG ? 1 : 0;
-        int numPullReplicas = replicaType == Replica.Type.PULL ? 1 : 0;
-        Assign.AssignRequest assignRequest =
-            new Assign.AssignRequestBuilder()
-                .forCollection(sourceReplica.getStr(COLLECTION_PROP))
-                .forShard(Collections.singletonList(sourceReplica.getStr(SHARD_ID_PROP)))
-                .assignNrtReplicas(numNrtReplicas)
-                .assignTlogReplicas(numTlogReplicas)
-                .assignPullReplicas(numPullReplicas)
-                .onNodes(
-                    ccc.getSolrCloudManager().getClusterStateProvider().getLiveNodes().stream()
-                        .filter(node -> !node.equals(source))
-                        .collect(Collectors.toList()))
-                .build();
-        assignRequests.add(assignRequest);
-      }
-      Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ccc.getCoreContainer());
-      replicaPositions = assignStrategy.assign(ccc.getSolrCloudManager(), assignRequests);
-    }
-    int replicaPositionIdx = 0;
-    for (ZkNodeProps sourceReplica : sourceReplicas) {
-      String sourceCollection = sourceReplica.getStr(COLLECTION_PROP);
+    ClusterState clusterState = ccc.getZkStateReader().getClusterState();
+
+    for (Map.Entry<Replica, String> movement : movements.entrySet()) {
+      Replica sourceReplica = movement.getKey();
+      String targetNode = movement.getValue();
+      String sourceCollection = sourceReplica.getCollection();
       if (log.isInfoEnabled()) {
         log.info(
             "Going to create replica for collection={} shard={} on node={}",
             sourceCollection,
-            sourceReplica.getStr(SHARD_ID_PROP),
-            target);
-      }
-      String targetNode;
-      // Use the assigned replica positions, if target is null or empty (checked above)
-      if (replicaPositions != null) {
-        targetNode = replicaPositions.get(replicaPositionIdx).node;
-        replicaPositionIdx++;
-      } else {
-        targetNode = target;
+            sourceReplica.getShard(),
+            targetNode);
       }
+
       ZkNodeProps msg =
           sourceReplica
+              .toFullProps()
               .plus("parallel", String.valueOf(parallel))
               .plus(CoreAdminParams.NODE, targetNode);
-      if (async != null) msg.getProperties().put(ASYNC, async);
+      if (asyncId != null) msg.getProperties().put(ASYNC, asyncId);
       NamedList<Object> nl = new NamedList<>();
       final ZkNodeProps addedReplica =
           new AddReplicaCmd(ccc)
@@ -173,10 +127,10 @@ public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
                       String errorString =
                           String.format(
                               Locale.ROOT,
-                              "Failed to create replica for collection=%s shard=%s" + " on node=%s",
+                              "Failed to create replica for collection=%s shard=%s on node=%s",
                               sourceCollection,
-                              sourceReplica.getStr(SHARD_ID_PROP),
-                              target);
+                              sourceReplica.getShard(),
+                              targetNode);
                       log.warn(errorString);
                       // one replica creation failed. Make the best attempt to
                       // delete all the replicas created so far in the target
@@ -190,8 +144,8 @@ public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
                         log.debug(
                             "Successfully created replica for collection={} shard={} on node={}",
                             sourceCollection,
-                            sourceReplica.getStr(SHARD_ID_PROP),
-                            target);
+                            sourceReplica.getShard(),
+                            targetNode);
                       }
                     }
                   })
@@ -199,23 +153,22 @@ public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
 
       if (addedReplica != null) {
         createdReplicas.add(addedReplica);
-        if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) {
-          String shardName = sourceReplica.getStr(SHARD_ID_PROP);
-          String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
-          String collectionName = sourceCollection;
-          String key = collectionName + "_" + replicaName;
+        if (sourceReplica.isLeader() || waitForFinalState) {
+          String shardName = sourceReplica.getShard();
+          String replicaName = sourceReplica.getName();
+          String key = sourceCollection + "_" + replicaName;
           CollectionStateWatcher watcher;
           if (waitForFinalState) {
             watcher =
                 new ActiveReplicaWatcher(
-                    collectionName,
+                    sourceCollection,
                     null,
                     Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)),
                     replicasToRecover);
           } else {
             watcher =
                 new LeaderRecoveryWatcher(
-                    collectionName,
+                    sourceCollection,
                     shardName,
                     replicaName,
                     addedReplica.getStr(ZkStateReader.CORE_NAME_PROP),
@@ -223,7 +176,7 @@ public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
           }
           watchers.put(key, watcher);
           log.debug("--- adding {}, {}", key, watcher);
-          zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
+          ccc.getZkStateReader().registerCollectionStateWatcher(sourceCollection, watcher);
         } else {
           log.debug("--- not waiting for {}", addedReplica);
         }
@@ -251,10 +204,10 @@ public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
     }
     // remove the watchers, we're done either way
     for (Map.Entry<String, CollectionStateWatcher> e : watchers.entrySet()) {
-      zkStateReader.removeCollectionStateWatcher(e.getKey(), e.getValue());
+      ccc.getZkStateReader().removeCollectionStateWatcher(e.getKey(), e.getValue());
     }
     if (anyOneFailed.get()) {
-      log.info("Failed to create some replicas. Cleaning up all replicas on target node");
+      log.info("Failed to create some replicas. Cleaning up all newly created replicas.");
       SolrCloseableLatch cleanupLatch =
           new SolrCloseableLatch(createdReplicas.size(), ccc.getCloseableToLatchOn());
       for (ZkNodeProps createdReplica : createdReplicas) {
@@ -262,7 +215,7 @@ public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
         try {
           new DeleteReplicaCmd(ccc)
               .deleteReplica(
-                  zkStateReader.getClusterState(),
+                  ccc.getZkStateReader().getClusterState(),
                   createdReplica.plus("parallel", "true"),
                   deleteResult,
                   () -> {
@@ -285,40 +238,79 @@ public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
         }
       }
       cleanupLatch.await(5, TimeUnit.MINUTES);
-      return;
+      return false;
     }
 
-    // we have reached this far means all replicas could be recreated
-    // now cleanup the replicas in the source node
-    DeleteNodeCmd.cleanupReplicas(results, state, sourceReplicas, ccc, source, async);
-    results.add(
-        "success",
-        "REPLACENODE action completed successfully from  : " + source + " to : " + target);
+    // we have reached this far, meaning all replicas should have been recreated.
+    // now cleanup the original replicas
+    return cleanupReplicas(
+        results, ccc.getZkStateReader().getClusterState(), movements.keySet(), ccc, asyncId);
+  }
+
+  static boolean cleanupReplicas(
+      NamedList<Object> results,
+      ClusterState clusterState,
+      Collection<Replica> sourceReplicas,
+      CollectionCommandContext ccc,
+      String async)
+      throws IOException, InterruptedException {
+    SolrCloseableLatch cleanupLatch =
+        new SolrCloseableLatch(sourceReplicas.size(), ccc.getCloseableToLatchOn());
+    for (Replica sourceReplica : sourceReplicas) {
+      String coll = sourceReplica.getCollection();
+      String shard = sourceReplica.getShard();
+      String type = sourceReplica.getType().toString();
+      String node = sourceReplica.getNodeName();
+      log.info(
+          "Deleting replica type={} for collection={} shard={} on node={}",
+          type,
+          coll,
+          shard,
+          node);
+      NamedList<Object> deleteResult = new NamedList<>();
+      try {
+        ZkNodeProps cmdMessage = sourceReplica.toFullProps();
+        if (async != null) cmdMessage = cmdMessage.plus(ASYNC, async);
+        new DeleteReplicaCmd(ccc)
+            .deleteReplica(
+                clusterState,
+                cmdMessage.plus("parallel", "true"),
+                deleteResult,
+                () -> {
+                  cleanupLatch.countDown();
+                  if (deleteResult.get("failure") != null) {
+                    synchronized (results) {
+                      results.add(
+                          "failure",
+                          String.format(
+                              Locale.ROOT,
+                              "Failed to delete replica for collection=%s shard=%s on node=%s",
+                              coll,
+                              shard,
+                              node));
+                    }
+                  }
+                });
+      } catch (KeeperException e) {
+        log.warn("Error deleting ", e);
+        cleanupLatch.countDown();
+      } catch (Exception e) {
+        log.warn("Error deleting ", e);
+        cleanupLatch.countDown();
+        throw e;
+      }
+    }
+    log.debug("Waiting for replica deletions to complete");
+    return cleanupLatch.await(5, TimeUnit.MINUTES);
   }
 
-  static List<ZkNodeProps> getReplicasOfNode(String source, ClusterState state) {
-    List<ZkNodeProps> sourceReplicas = new ArrayList<>();
+  static List<Replica> getReplicasOfNode(String nodeName, ClusterState state) {
+    List<Replica> sourceReplicas = new ArrayList<>();
     for (Map.Entry<String, DocCollection> e : state.getCollectionsMap().entrySet()) {
       for (Slice slice : e.getValue().getSlices()) {
         for (Replica replica : slice.getReplicas()) {
-          if (source.equals(replica.getNodeName())) {
-            ZkNodeProps props =
-                new ZkNodeProps(
-                    COLLECTION_PROP,
-                    e.getKey(),
-                    SHARD_ID_PROP,
-                    slice.getName(),
-                    ZkStateReader.CORE_NAME_PROP,
-                    replica.getCoreName(),
-                    ZkStateReader.REPLICA_PROP,
-                    replica.getName(),
-                    ZkStateReader.REPLICA_TYPE,
-                    replica.getType().name(),
-                    ZkStateReader.LEADER_PROP,
-                    String.valueOf(replica.equals(slice.getLeader())),
-                    CoreAdminParams.NODE,
-                    source);
-            sourceReplicas.add(props);
+          if (nodeName.equals(replica.getNodeName())) {
+            sourceReplicas.add(replica);
           }
         }
       }
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/BalancePlan.java b/solr/core/src/java/org/apache/solr/cluster/placement/BalancePlan.java
new file mode 100644
index 00000000000..e543004fd9e
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/BalancePlan.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement;
+
+import java.util.Map;
+import org.apache.solr.cluster.Node;
+import org.apache.solr.cluster.Replica;
+
+/**
+ * A fully specified plan or instructions for replica balancing to be applied to the cluster.
+ *
+ * <p>Fully specified means the actual {@link Node}s on which to place replicas have been decided.
+ *
+ * <p>Instances are created by plugin code using {@link BalancePlanFactory}. This interface
+ * obviously doesn't expose much but the underlying Solr side implementation has all that is needed
+ * (and will do at least one cast in order to execute the plan, likely then using some type of
+ * visitor pattern).
+ */
+public interface BalancePlan {
+  /**
+   * @return the {@link BalanceRequest} at the origin of this {@link BalancePlan}, as passed to the
+   *     {@link BalancePlanFactory} method that created this instance.
+   */
+  BalanceRequest getRequest();
+
+  /**
+   * @return the map of each {@link Replica} to be moved to the {@link Node} on which that replica
+   *     should be placed.
+   */
+  Map<Replica, Node> getReplicaMovements();
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/BalancePlanFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/BalancePlanFactory.java
new file mode 100644
index 00000000000..d395df40870
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/BalancePlanFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement;
+
+import java.util.Map;
+import org.apache.solr.cluster.Node;
+import org.apache.solr.cluster.Replica;
+
+/**
+ * Allows plugins to create {@link BalancePlan}s telling the Solr layer how to balance replicas
+ * following the processing of a {@link BalanceRequest}. The Solr layer can (and will) check that
+ * the {@link BalancePlan} conforms to the {@link BalanceRequest} (and if it does not, the requested
+ * operation will fail).
+ */
+public interface BalancePlanFactory {
+  /**
+   * Creates a {@link BalancePlan} for balancing replicas across the given nodes.
+   *
+   * <p>This is in support (directly or indirectly) of {@link
+   * org.apache.solr.cloud.api.collections.BalanceReplicasCmd}.
+   */
+  BalancePlan createBalancePlan(BalanceRequest request, Map<Replica, Node> replicaMovements);
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/BalanceRequest.java b/solr/core/src/java/org/apache/solr/cluster/placement/BalanceRequest.java
new file mode 100644
index 00000000000..f967b7fa808
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/BalanceRequest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement;
+
+import java.util.Set;
+import org.apache.solr.cluster.Cluster;
+import org.apache.solr.cluster.Node;
+
+/**
+ * A cluster-related placement request that Solr asks a {@link PlacementPlugin} to resolve by
+ * computing a balancing plan for replicas that already exist across a set of nodes.
+ *
+ * <p>The set of {@link Node}s across which the replicas should be balanced is specified
+ * (defaults to being equal to the set returned by {@link Cluster#getLiveDataNodes()}).
+ */
+public interface BalanceRequest extends ModificationRequest {
+
+  /**
+   * Replicas should only be balanced on nodes in the set returned by this method.
+   *
+   * <p>When Collection API calls do not specify a specific set of nodes, replicas can be balanced
+   * on all live nodes in the cluster. In such cases, this set will be equal to the set of all live
+   * nodes. The plugin placement code does not need to worry (or care) if a set of nodes was
+   * explicitly specified or not.
+   *
+   * @return never {@code null} and never an empty set (if that set were to be empty for any
+   *     reason, no balance would be possible and the Solr infrastructure driving the plugin code
+   *     would detect the error itself rather than calling the plugin).
+   */
+  Set<Node> getNodes();
+
+  int getMaximumBalanceSkew();
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementContext.java b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementContext.java
index 4ad577ea0b5..be7b0282a79 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementContext.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementContext.java
@@ -39,4 +39,7 @@ public interface PlacementContext {
 
   /** Factory used to create instances of {@link PlacementPlan} to return computed decision. */
   PlacementPlanFactory getPlacementPlanFactory();
+
+  /** Factory used to create instances of {@link BalancePlan} to return computed decision. */
+  BalancePlanFactory getBalancePlanFactory();
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlugin.java b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlugin.java
index a2b7c3c01c8..0739996e118 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlugin.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlugin.java
@@ -74,6 +74,20 @@ public interface PlacementPlugin {
       Collection<PlacementRequest> placementRequests, PlacementContext placementContext)
       throws PlacementException, InterruptedException;
 
+  /**
+   * Request from plugin code to compute a balancing of replicas. Note this method must be reentrant
+   * as a plugin instance may (read will) get multiple such calls in parallel.
+   *
+   * <p>Configuration is passed upon creation of a new instance of this class by {@link
+   * PlacementPluginFactory#createPluginInstance}.
+   *
+   * @param balanceRequest request for selecting replicas that should be moved to aid in balancing
+   *     the replicas across the desired nodes.
+   * @return plan satisfying the balance request.
+   */
+  BalancePlan computeBalancing(BalanceRequest balanceRequest, PlacementContext placementContext)
+      throws PlacementException, InterruptedException;
+
   /**
    * Verify that a collection layout modification doesn't violate constraints on replica placements
    * required by this plugin. Default implementation is a no-op (any modifications are allowed).
@@ -85,5 +99,5 @@ public interface PlacementPlugin {
    */
   default void verifyAllowedModification(
       ModificationRequest modificationRequest, PlacementContext placementContext)
-      throws PlacementModificationException, InterruptedException {}
+      throws PlacementException, InterruptedException {}
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/BalancePlanFactoryImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/BalancePlanFactoryImpl.java
new file mode 100644
index 00000000000..05ba636e34d
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/BalancePlanFactoryImpl.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement.impl;
+
+import java.util.Map;
+import org.apache.solr.cluster.Node;
+import org.apache.solr.cluster.Replica;
+import org.apache.solr.cluster.placement.BalancePlan;
+import org.apache.solr.cluster.placement.BalancePlanFactory;
+import org.apache.solr.cluster.placement.BalanceRequest;
+
+/** Simple implementation of {@link BalancePlanFactory}. */
+public class BalancePlanFactoryImpl implements BalancePlanFactory {
+  @Override
+  public BalancePlan createBalancePlan(
+      BalanceRequest request, Map<Replica, Node> replicaMovements) {
+    return new BalancePlanImpl(request, replicaMovements);
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/BalancePlanImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/BalancePlanImpl.java
new file mode 100644
index 00000000000..7bf75a891dc
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/BalancePlanImpl.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement.impl;
+
+import java.util.Map;
+import org.apache.solr.cluster.Node;
+import org.apache.solr.cluster.Replica;
+import org.apache.solr.cluster.placement.BalancePlan;
+import org.apache.solr.cluster.placement.BalanceRequest;
+
+class BalancePlanImpl implements BalancePlan {
+
+  final BalanceRequest request;
+  final Map<Replica, Node> replicaMovements;
+
+  BalancePlanImpl(BalanceRequest request, Map<Replica, Node> replicaMovements) {
+    this.request = request;
+    this.replicaMovements = replicaMovements;
+  }
+
+  @Override
+  public BalanceRequest getRequest() {
+    return request;
+  }
+
+  @Override
+  public Map<Replica, Node> getReplicaMovements() {
+    return replicaMovements;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("BalancePlan{");
+    for (Map.Entry<Replica, Node> movement : replicaMovements.entrySet()) {
+      sb.append("\n")
+          .append(movement.getKey().getReplicaName())
+          .append(" : ")
+          .append(movement.getValue().getName());
+    }
+    sb.append("\n}");
+    return sb.toString();
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/BalanceRequestImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/BalanceRequestImpl.java
new file mode 100644
index 00000000000..95454f9dbbe
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/BalanceRequestImpl.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement.impl;
+
+import java.util.Set;
+import org.apache.solr.cloud.api.collections.Assign;
+import org.apache.solr.cluster.Cluster;
+import org.apache.solr.cluster.Node;
+import org.apache.solr.cluster.SolrCollection;
+import org.apache.solr.cluster.placement.BalanceRequest;
+
+public class BalanceRequestImpl implements BalanceRequest {
+
+  private final SolrCollection solrCollection;
+  private final Set<Node> nodes;
+  private final int maximumBalanceSkew;
+
+  public BalanceRequestImpl(Set<Node> nodes) {
+    this(nodes, -1);
+  }
+
+  public BalanceRequestImpl(Set<Node> nodes, int maximumBalanceSkew) {
+    this.nodes = nodes;
+    this.maximumBalanceSkew = maximumBalanceSkew;
+    this.solrCollection = null;
+  }
+
+  @Override
+  public Set<Node> getNodes() {
+    return nodes;
+  }
+
+  @Override
+  public int getMaximumBalanceSkew() {
+    return maximumBalanceSkew;
+  }
+
+  @Override
+  public SolrCollection getCollection() {
+    return solrCollection;
+  }
+
+  /** Returns a {@link BalanceRequest} that can be consumed by a plugin to balance replicas. */
+  static BalanceRequestImpl create(Cluster cluster, Set<String> nodeNames, int maximumBalanceSkew)
+      throws Assign.AssignmentException {
+    final Set<Node> nodes;
+    // If no nodes specified, use all live data nodes. If nodes are specified, use specified list.
+    if (nodeNames != null && !nodeNames.isEmpty()) {
+      if (nodeNames.size() == 1) {
+        throw new Assign.AssignmentException(
+            "Bad balance request: cannot balance across a single node: "
+                + nodeNames.stream().findAny().get());
+      }
+      nodes = SimpleClusterAbstractionsImpl.NodeImpl.getNodes(nodeNames);
+
+      for (Node n : nodes) {
+        if (!cluster.getLiveDataNodes().contains(n)) {
+          throw new Assign.AssignmentException(
+              "Bad balance request: specified node is a non-data hosting node: " + n.getName());
+        }
+      }
+    } else {
+      nodes = cluster.getLiveDataNodes();
+      if (nodes.isEmpty()) {
+        throw new Assign.AssignmentException("Impossible balance request: no live data nodes");
+      }
+    }
+
+    return new BalanceRequestImpl(nodes, maximumBalanceSkew);
+  }
+}
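
The node-selection rules in BalanceRequestImpl.create() above can be sketched in isolation. This is a minimal stand-in rather than the Solr code: the class BalanceNodeSelection and the method resolveNodes are hypothetical names, node names are plain strings instead of Node objects, and IllegalArgumentException stands in for Assign.AssignmentException.

```java
import java.util.Set;

/** Hypothetical stand-in for the node-selection rules; not part of Solr. */
final class BalanceNodeSelection {

  /**
   * Returns the node names to balance across. A null or empty request means "all live data
   * nodes"; otherwise every requested node must be a live data node.
   */
  static Set<String> resolveNodes(Set<String> liveDataNodes, Set<String> requested) {
    if (requested == null || requested.isEmpty()) {
      if (liveDataNodes.isEmpty()) {
        throw new IllegalArgumentException("Impossible balance request: no live data nodes");
      }
      return liveDataNodes;
    }
    // Balancing needs at least two nodes to move replicas between.
    if (requested.size() == 1) {
      throw new IllegalArgumentException(
          "Bad balance request: cannot balance across a single node: "
              + requested.iterator().next());
    }
    // Reject any requested node that is not currently a live data node.
    for (String name : requested) {
      if (!liveDataNodes.contains(name)) {
        throw new IllegalArgumentException(
            "Bad balance request: specified node is a non-data hosting node: " + name);
      }
    }
    return requested;
  }

  public static void main(String[] args) {
    Set<String> live = Set.of("node1", "node2", "node3");
    System.out.println(resolveNodes(live, Set.of()).size());
    System.out.println(resolveNodes(live, Set.of("node1", "node2")).size());
  }
}
```

Note that the single-node check only applies to an explicit request; when no nodes are named, the original code accepts whatever set of live data nodes the cluster reports, as long as it is not empty.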
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginAssignStrategy.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginAssignStrategy.java
index d7d94644461..5fab7b2fe9c 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginAssignStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginAssignStrategy.java
@@ -18,13 +18,16 @@
 package org.apache.solr.cluster.placement.impl;
 
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.cloud.api.collections.Assign;
 import org.apache.solr.cluster.Node;
+import org.apache.solr.cluster.placement.BalanceRequest;
 import org.apache.solr.cluster.placement.DeleteCollectionRequest;
 import org.apache.solr.cluster.placement.DeleteReplicasRequest;
 import org.apache.solr.cluster.placement.PlacementContext;
@@ -35,10 +38,16 @@ import org.apache.solr.cluster.placement.PlacementRequest;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ReplicaPosition;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.util.CollectionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** This assign strategy delegates placement computation to "plugin" code. */
 public class PlacementPluginAssignStrategy implements Assign.AssignStrategy {
 
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   private final PlacementPlugin plugin;
 
   public PlacementPluginAssignStrategy(PlacementPlugin plugin) {
@@ -88,6 +97,54 @@ public class PlacementPluginAssignStrategy implements Assign.AssignStrategy {
     return replicaPositions;
   }
 
+  @Override
+  public Map<Replica, String> computeReplicaBalancing(
+      SolrCloudManager solrCloudManager, Set<String> nodes, int maxBalanceSkew)
+      throws Assign.AssignmentException, IOException, InterruptedException {
+    PlacementContext placementContext = new SimplePlacementContextImpl(solrCloudManager);
+
+    BalanceRequest balanceRequest =
+        BalanceRequestImpl.create(placementContext.getCluster(), nodes, maxBalanceSkew);
+    try {
+      Map<org.apache.solr.cluster.Replica, Node> rawReplicaMovements =
+          plugin.computeBalancing(balanceRequest, placementContext).getReplicaMovements();
+      Map<Replica, String> replicaMovements = CollectionUtil.newHashMap(rawReplicaMovements.size());
+      for (Map.Entry<org.apache.solr.cluster.Replica, Node> movement :
+          rawReplicaMovements.entrySet()) {
+        Replica converted = findReplica(solrCloudManager, movement.getKey());
+        if (converted == null) {
+          throw new Assign.AssignmentException(
+              "Could not find replica when balancing: " + movement.getKey().toString());
+        }
+        replicaMovements.put(converted, movement.getValue().getName());
+      }
+      return replicaMovements;
+    } catch (PlacementException pe) {
+      throw new Assign.AssignmentException(pe);
+    }
+  }
+
+  private Replica findReplica(
+      SolrCloudManager solrCloudManager, org.apache.solr.cluster.Replica replica) {
+    DocCollection collection = null;
+    try {
+      collection =
+          solrCloudManager
+              .getClusterState()
+              .getCollection(replica.getShard().getCollection().getName());
+    } catch (IOException e) {
+      throw new Assign.AssignmentException(
+          "Could not load cluster state when balancing replicas", e);
+    }
+    if (collection != null) {
+      Slice slice = collection.getSlice(replica.getShard().getShardName());
+      if (slice != null) {
+        return slice.getReplica(replica.getReplicaName());
+      }
+    }
+    return null;
+  }
+
   @Override
   public void verifyDeleteCollection(SolrCloudManager solrCloudManager, DocCollection collection)
       throws Assign.AssignmentException, IOException, InterruptedException {
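
computeReplicaBalancing() above translates the plugin's movement map, keyed by the placement API's Replica, into a map keyed by the cluster-state Replica, and fails fast when a replica cannot be found. A minimal sketch of that pattern follows, assuming a generic lookup function in place of findReplica(); MovementConversion and convertKeys are hypothetical names, and IllegalStateException stands in for Assign.AssignmentException.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical illustration of the key-conversion step; not part of Solr. */
final class MovementConversion {

  /**
   * Re-keys a "replica to target node name" map using the given lookup. A lookup that returns
   * null is treated as a replica that no longer exists, which aborts the whole conversion.
   */
  static <A, B> Map<B, String> convertKeys(Map<A, String> rawMovements, Function<A, B> lookup) {
    Map<B, String> converted = new HashMap<>(rawMovements.size());
    for (Map.Entry<A, String> movement : rawMovements.entrySet()) {
      B key = lookup.apply(movement.getKey());
      if (key == null) {
        throw new IllegalStateException(
            "Could not find replica when balancing: " + movement.getKey());
      }
      converted.put(key, movement.getValue());
    }
    return converted;
  }

  public static void main(String[] args) {
    // Integers stand in for cluster-state replica objects in this sketch.
    Map<String, Integer> knownReplicas = Map.of("replica1", 1, "replica2", 2);
    Map<Integer, String> moves = convertKeys(Map.of("replica1", "nodeB"), knownReplicas::get);
    System.out.println(moves);
  }
}
```

Failing on the first missing replica, rather than skipping it, means no partial balance plan is returned when the cluster state has changed since the plan was computed.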
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/SimplePlacementContextImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/SimplePlacementContextImpl.java
index 0e138568492..6c1cae4bc67 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/impl/SimplePlacementContextImpl.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/SimplePlacementContextImpl.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.cluster.Cluster;
 import org.apache.solr.cluster.placement.AttributeFetcher;
+import org.apache.solr.cluster.placement.BalancePlanFactory;
 import org.apache.solr.cluster.placement.PlacementContext;
 import org.apache.solr.cluster.placement.PlacementPlanFactory;
 
@@ -32,6 +33,7 @@ public class SimplePlacementContextImpl implements PlacementContext {
   private final Cluster cluster;
   private final AttributeFetcher attributeFetcher;
   private final PlacementPlanFactory placementPlanFactory = new PlacementPlanFactoryImpl();
+  private final BalancePlanFactory balancePlanFactory = new BalancePlanFactoryImpl();
 
   public SimplePlacementContextImpl(SolrCloudManager solrCloudManager) throws IOException {
     cluster = new SimpleClusterAbstractionsImpl.ClusterImpl(solrCloudManager);
@@ -52,4 +54,9 @@ public class SimplePlacementContextImpl implements PlacementContext {
   public PlacementPlanFactory getPlacementPlanFactory() {
     return placementPlanFactory;
   }
+
+  @Override
+  public BalancePlanFactory getBalancePlanFactory() {
+    return balancePlanFactory;
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java
index 076fa4a02e6..80d2d43af96 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java
@@ -19,49 +19,37 @@ package org.apache.solr.cluster.placement.plugins;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
-import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Random;
 import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import org.apache.solr.cluster.Cluster;
 import org.apache.solr.cluster.Node;
 import org.apache.solr.cluster.Replica;
-import org.apache.solr.cluster.Shard;
 import org.apache.solr.cluster.SolrCollection;
 import org.apache.solr.cluster.placement.AttributeFetcher;
 import org.apache.solr.cluster.placement.AttributeValues;
+import org.apache.solr.cluster.placement.BalanceRequest;
 import org.apache.solr.cluster.placement.DeleteCollectionRequest;
-import org.apache.solr.cluster.placement.DeleteReplicasRequest;
-import org.apache.solr.cluster.placement.DeleteShardsRequest;
-import org.apache.solr.cluster.placement.ModificationRequest;
 import org.apache.solr.cluster.placement.PlacementContext;
 import org.apache.solr.cluster.placement.PlacementException;
 import org.apache.solr.cluster.placement.PlacementModificationException;
-import org.apache.solr.cluster.placement.PlacementPlan;
-import org.apache.solr.cluster.placement.PlacementPlanFactory;
 import org.apache.solr.cluster.placement.PlacementPlugin;
 import org.apache.solr.cluster.placement.PlacementPluginFactory;
-import org.apache.solr.cluster.placement.PlacementRequest;
-import org.apache.solr.cluster.placement.ReplicaPlacement;
+import org.apache.solr.cluster.placement.ReplicaMetric;
+import org.apache.solr.cluster.placement.ShardMetrics;
 import org.apache.solr.cluster.placement.impl.NodeMetricImpl;
+import org.apache.solr.cluster.placement.impl.ReplicaMetricImpl;
+import org.apache.solr.common.util.CollectionUtil;
 import org.apache.solr.common.util.StrUtils;
-import org.apache.solr.common.util.SuppressForbidden;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -109,42 +97,10 @@ import org.slf4j.LoggerFactory;
  * prop), and avoid having more than one replica per shard on the same node.<br>
  * Only after these constraints are satisfied do minimize cores per node or disk usage.</i>
  *
- * <p>Overall strategy of this plugin:
- *
- * <ul>
- *   <li>The set of nodes in the cluster is obtained and transformed into 3 independent sets (that
- *       can overlap) of nodes accepting each of the three replica types.
- *   <li>For each shard on which placing replicas is required and then for each replica type to
- *       place (starting with NRT, then TLOG then PULL):
- *       <ul>
- *         <li>The set of candidates nodes corresponding to the replica type is used and from that
- *             set are removed nodes that already have a replica (of any type) for that shard
- *         <li>If there are not enough nodes, an error is thrown (this is checked further down
- *             during processing).
- *         <li>The number of (already existing) replicas of the current type on each Availability
- *             Zone is collected.
- *         <li>Separate the set of available nodes to as many subsets (possibly some are empty) as
- *             there are Availability Zones defined for the candidate nodes
- *         <li>In each AZ nodes subset, sort the nodes by increasing total number of cores count,
- *             with possibly a condition that pushes nodes with low disk space to the end of the
- *             list? Or a weighted combination of the relative importance of these two factors? Some
- *             randomization? Marking as non available nodes with not enough disk space? These and
- *             other are likely aspects to be played with once the plugin is tested or observed to
- *             be running in prod, don't expect the initial code drop(s) to do all of that.
- *         <li>Iterate over the number of replicas to place (for the current replica type for the
- *             current shard):
- *             <ul>
- *               <li>Based on the number of replicas per AZ collected previously, pick the non empty
- *                   set of nodes having the lowest number of replicas. Then pick the first node in
- *                   that set. That's the node the replica is placed one. Remove the node from the
- *                   set of available nodes for the given AZ and increase the number of replicas
- *                   placed on that AZ.
- *             </ul>
- *         <li>During this process, the number of cores on the nodes in general is tracked to take
- *             into account placement decisions so that not all shards decide to put their replicas
- *             on the same nodes (they might though if these are the less loaded nodes).
- *       </ul>
- * </ul>
+ * <p>This plugin achieves this by creating an {@link AffinityPlacementPlugin.AffinityNode} that
+ * weights nodes very heavily if they are unbalanced with respect to AvailabilityZone and
+ * SpreadDomain. See {@link AffinityPlacementPlugin.AffinityNode} for more information on how this
+ * weighting helps the plugin correctly place and balance replicas.
  *
  * <p>This code is a realistic placement computation, based on a few assumptions. The code is
  * written in such a way to make it relatively easy to adapt it to (somewhat) different assumptions.
@@ -190,7 +146,7 @@ public class AffinityPlacementFactory implements PlacementPluginFactory<Affinity
    * See {@link AffinityPlacementFactory} for instructions on how to configure a cluster to use this
    * plugin and details on what the plugin does.
    */
-  static class AffinityPlacementPlugin implements PlacementPlugin {
+  static class AffinityPlacementPlugin extends OrderedNodePlacementPlugin {
 
     private final long minimalFreeDiskGB;
 
@@ -199,13 +155,10 @@ public class AffinityPlacementFactory implements PlacementPluginFactory<Affinity
     // primary to secondary (1:1)
     private final Map<String, String> withCollections;
     // secondary to primary (1:N)
-    private final Map<String, Set<String>> colocatedWith;
+    private final Map<String, Set<String>> collocatedWith;
 
     private final Map<String, Set<String>> nodeTypes;
 
-    private final Random replicaPlacementRandom =
-        new Random(); // ok even if random sequence is predictable.
-
     private final boolean spreadAcrossDomains;
 
     /**
@@ -225,12 +178,12 @@ public class AffinityPlacementFactory implements PlacementPluginFactory<Affinity
       this.spreadAcrossDomains = spreadAcrossDomains;
       this.withCollections = withCollections;
       if (withCollections.isEmpty()) {
-        colocatedWith = Map.of();
+        collocatedWith = Map.of();
       } else {
-        colocatedWith = new HashMap<>();
+        collocatedWith = new HashMap<>();
         withCollections.forEach(
             (primary, secondary) ->
-                colocatedWith.computeIfAbsent(secondary, s -> new HashSet<>()).add(primary));
+                collocatedWith.computeIfAbsent(secondary, s -> new HashSet<>()).add(primary));
       }
 
       if (collectionNodeTypes.isEmpty()) {
@@ -245,175 +198,21 @@ public class AffinityPlacementFactory implements PlacementPluginFactory<Affinity
               }
             });
       }
-
-      // We make things reproducible in tests by using test seed if any
-      String seed = System.getProperty("tests.seed");
-      if (seed != null) {
-        replicaPlacementRandom.setSeed(seed.hashCode());
-      }
-    }
-
-    @Override
-    @SuppressForbidden(
-        reason =
-            "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.")
-    public List<PlacementPlan> computePlacements(
-        Collection<PlacementRequest> requests, PlacementContext placementContext)
-        throws PlacementException {
-      List<PlacementPlan> placementPlans = new ArrayList<>(requests.size());
-      Set<Node> allNodes = new HashSet<>();
-      for (PlacementRequest request : requests) {
-        allNodes.addAll(request.getTargetNodes());
-      }
-
-      // Fetch attributes for a superset of all nodes requested amongst the placementRequests
-      AttributeFetcher attributeFetcher = placementContext.getAttributeFetcher();
-      attributeFetcher
-          .requestNodeSystemProperty(AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP)
-          .requestNodeSystemProperty(AffinityPlacementConfig.NODE_TYPE_SYSPROP)
-          .requestNodeSystemProperty(AffinityPlacementConfig.REPLICA_TYPE_SYSPROP)
-          .requestNodeSystemProperty(AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP);
-      attributeFetcher
-          .requestNodeMetric(NodeMetricImpl.NUM_CORES)
-          .requestNodeMetric(NodeMetricImpl.FREE_DISK_GB);
-      attributeFetcher.fetchFrom(allNodes);
-      final AttributeValues attrValues = attributeFetcher.fetchAttributes();
-      // Get the number of currently existing cores per node, so we can update as we place new cores
-      // to not end up always selecting the same node(s). This is used across placement requests
-      Map<Node, Integer> allCoresOnNodes = getCoreCountPerNode(allNodes, attrValues);
-
-      boolean doSpreadAcrossDomains = shouldSpreadAcrossDomains(allNodes, attrValues);
-
-      // Keep track with nodesWithReplicas across requests
-      Map<String, Map<String, Set<Node>>> allNodesWithReplicas = new HashMap<>();
-      for (PlacementRequest request : requests) {
-        Set<Node> nodes = request.getTargetNodes();
-        SolrCollection solrCollection = request.getCollection();
-
-        // filter out nodes that don't meet the `withCollection` constraint
-        nodes =
-            filterNodesWithCollection(placementContext.getCluster(), request, attrValues, nodes);
-        // filter out nodes that don't match the "node types" specified in the collection props
-        nodes = filterNodesByNodeType(placementContext.getCluster(), request, attrValues, nodes);
-
-        // Split the set of nodes into 3 sets of nodes accepting each replica type (sets can overlap
-        // if nodes accept multiple replica types). These subsets sets are actually maps, because we
-        // capture the number of cores (of any replica type) present on each node.
-        EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes =
-            getAvailableNodesForReplicaTypes(nodes, attrValues);
-
-        // All available zones of live nodes. Due to some nodes not being candidates for placement,
-        // and some existing replicas being one availability zones that might be offline (i.e. their
-        // nodes are not live), this set might contain zones on which it is impossible to place
-        // replicas. That's ok.
-        Set<String> availabilityZones = getZonesFromNodes(nodes, attrValues);
-
-        // Build the replica placement decisions here
-        Set<ReplicaPlacement> replicaPlacements = new HashSet<>();
-
-        // Let's now iterate on all shards to create replicas for and start finding home sweet homes
-        // for the replicas
-        for (String shardName : request.getShardNames()) {
-          // Inventory nodes (if any) that already have a replica of any type for the shard, because
-          // we can't be placing additional replicas on these. This data structure is updated after
-          // each replica to node assign and is used to make sure different replica types are not
-          // allocated to the same nodes (protecting same node assignments within a given replica
-          // type is done "by construction" in makePlacementDecisions()).
-          Set<Node> nodesWithReplicas =
-              allNodesWithReplicas
-                  .computeIfAbsent(solrCollection.getName(), col -> new HashMap<>())
-                  .computeIfAbsent(
-                      shardName,
-                      s -> {
-                        Set<Node> newNodeSet = new HashSet<>();
-                        Shard shard = solrCollection.getShard(s);
-                        if (shard != null) {
-                          // Prefill the set with the existing replicas
-                          for (Replica r : shard.replicas()) {
-                            newNodeSet.add(r.getNode());
-                          }
-                        }
-                        return newNodeSet;
-                      });
-
-          // Iterate on the replica types in the enum order. We place more strategic replicas first
-          // (NRT is more strategic than TLOG more strategic than PULL). This is in case we
-          // eventually decide that less strategic replica placement impossibility is not a problem
-          // that should lead to replica placement computation failure. Current code does fail if
-          // placement is impossible (constraint is at most one replica of a shard on any node).
-          for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) {
-            makePlacementDecisions(
-                solrCollection,
-                shardName,
-                availabilityZones,
-                replicaType,
-                request.getCountReplicasToCreate(replicaType),
-                attrValues,
-                replicaTypeToNodes,
-                nodesWithReplicas,
-                allCoresOnNodes,
-                placementContext.getPlacementPlanFactory(),
-                replicaPlacements,
-                doSpreadAcrossDomains);
-          }
-        }
-        placementPlans.add(
-            placementContext
-                .getPlacementPlanFactory()
-                .createPlacementPlan(request, replicaPlacements));
-      }
-
-      return placementPlans;
-    }
-
-    private boolean shouldSpreadAcrossDomains(Set<Node> allNodes, AttributeValues attrValues) {
-      boolean doSpreadAcrossDomains =
-          spreadAcrossDomains && spreadDomainPropPresent(allNodes, attrValues);
-      if (spreadAcrossDomains && !doSpreadAcrossDomains) {
-        log.warn(
-            "AffinityPlacementPlugin configured to spread across domains, but there are nodes in the cluster without the {} system property. Ignoring spreadAcrossDomains.",
-            AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP);
-      }
-      return doSpreadAcrossDomains;
-    }
-
-    private boolean spreadDomainPropPresent(Set<Node> allNodes, AttributeValues attrValues) {
-      // We can only use spread domains if all nodes have the system property
-      return allNodes.stream()
-          .noneMatch(
-              n ->
-                  attrValues
-                      .getSystemProperty(n, AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP)
-                      .isEmpty());
     }
 
     @Override
-    public void verifyAllowedModification(
-        ModificationRequest modificationRequest, PlacementContext placementContext)
-        throws PlacementModificationException, InterruptedException {
-      if (modificationRequest instanceof DeleteShardsRequest) {
-        log.warn("DeleteShardsRequest not implemented yet, skipping: {}", modificationRequest);
-      } else if (modificationRequest instanceof DeleteCollectionRequest) {
-        verifyDeleteCollection((DeleteCollectionRequest) modificationRequest, placementContext);
-      } else if (modificationRequest instanceof DeleteReplicasRequest) {
-        verifyDeleteReplicas((DeleteReplicasRequest) modificationRequest, placementContext);
-      } else {
-        log.warn("unsupported request type, skipping: {}", modificationRequest);
-      }
-    }
-
-    private void verifyDeleteCollection(
+    protected void verifyDeleteCollection(
         DeleteCollectionRequest deleteCollectionRequest, PlacementContext placementContext)
-        throws PlacementModificationException, InterruptedException {
+        throws PlacementModificationException {
       Cluster cluster = placementContext.getCluster();
-      Set<String> colocatedCollections =
-          colocatedWith.getOrDefault(deleteCollectionRequest.getCollection().getName(), Set.of());
-      for (String primaryName : colocatedCollections) {
+      Set<String> collocatedCollections =
+          collocatedWith.getOrDefault(deleteCollectionRequest.getCollection().getName(), Set.of());
+      for (String primaryName : collocatedCollections) {
         try {
           if (cluster.getCollection(primaryName) != null) {
             // still exists
             throw new PlacementModificationException(
-                "colocated collection "
+                "collocated collection "
                     + primaryName
                     + " of "
                     + deleteCollectionRequest.getCollection().getName()
@@ -421,788 +220,577 @@ public class AffinityPlacementFactory implements PlacementPluginFactory<Affinity
           }
         } catch (IOException e) {
           throw new PlacementModificationException(
-              "failed to retrieve colocated collection information", e);
+              "failed to retrieve collocated collection information", e);
         }
       }
     }
 
-    private void verifyDeleteReplicas(
-        DeleteReplicasRequest deleteReplicasRequest, PlacementContext placementContext)
-        throws PlacementModificationException, InterruptedException {
-      Cluster cluster = placementContext.getCluster();
-      SolrCollection secondaryCollection = deleteReplicasRequest.getCollection();
-      Set<String> colocatedCollections = colocatedWith.get(secondaryCollection.getName());
-      if (colocatedCollections == null) {
-        return;
-      }
-      Map<Node, Map<String, AtomicInteger>> secondaryNodeShardReplicas = new HashMap<>();
-      secondaryCollection
-          .shards()
-          .forEach(
-              shard ->
-                  shard
-                      .replicas()
-                      .forEach(
-                          replica -> {
-                            secondaryNodeShardReplicas
-                                .computeIfAbsent(replica.getNode(), n -> new HashMap<>())
-                                .computeIfAbsent(
-                                    replica.getShard().getShardName(), s -> new AtomicInteger())
-                                .incrementAndGet();
-                          }));
-
-      // find the colocated-with collections
-      Map<Node, Set<String>> colocatingNodes = new HashMap<>();
-      try {
-        for (String colocatedCollection : colocatedCollections) {
-          SolrCollection coll = cluster.getCollection(colocatedCollection);
-          coll.shards()
-              .forEach(
-                  shard ->
-                      shard
-                          .replicas()
-                          .forEach(
-                              replica -> {
-                                colocatingNodes
-                                    .computeIfAbsent(replica.getNode(), n -> new HashSet<>())
-                                    .add(coll.getName());
-                              }));
-        }
-      } catch (IOException ioe) {
-        throw new PlacementModificationException(
-            "failed to retrieve colocated collection information", ioe);
-      }
-      PlacementModificationException exception = null;
-      for (Replica replica : deleteReplicasRequest.getReplicas()) {
-        if (!colocatingNodes.containsKey(replica.getNode())) {
-          continue;
-        }
-        // check that there will be at least one replica remaining
-        AtomicInteger secondaryCount =
-            secondaryNodeShardReplicas
-                .getOrDefault(replica.getNode(), Map.of())
-                .getOrDefault(replica.getShard().getShardName(), new AtomicInteger());
-        if (secondaryCount.get() > 1) {
-          // we can delete it - record the deletion
-          secondaryCount.decrementAndGet();
-          continue;
-        }
-        // fail - this replica cannot be removed
-        if (exception == null) {
-          exception = new PlacementModificationException("delete replica(s) rejected");
-        }
-        exception.addRejectedModification(
-            replica.toString(),
-            "co-located with replicas of " + colocatingNodes.get(replica.getNode()));
-      }
-      if (exception != null) {
-        throw exception;
-      }
-    }
-
-    private Set<String> getZonesFromNodes(Set<Node> nodes, final AttributeValues attrValues) {
-      Set<String> azs = new HashSet<>();
-
-      for (Node n : nodes) {
-        azs.add(getNodeAZ(n, attrValues));
-      }
-
-      return Collections.unmodifiableSet(azs);
-    }
-
     /**
-     * Resolves the AZ of a node and takes care of nodes that have no defined AZ in system property
-     * {@link AffinityPlacementConfig#AVAILABILITY_ZONE_SYSPROP} to then return {@link
-     * AffinityPlacementConfig#UNDEFINED_AVAILABILITY_ZONE} as the AZ name.
+     * AffinityPlacementContext is used to share information across {@link AffinityNode} instances.
+     *
+     * <p>For instance, with SpreadDomains and AvailabilityZones, the weighting of a Node requires
+     * information on the contents of other Nodes. This class is how that information is shared.
+     *
+     * <p>One AffinityPlacementContext is used for each call to {@link
+     * #computePlacements(Collection, PlacementContext)} or {@link #computeBalancing(BalanceRequest,
+     * PlacementContext)}. The state of the context will be altered throughout the computation.
      */
-    private String getNodeAZ(Node n, final AttributeValues attrValues) {
-      Optional<String> nodeAz =
-          attrValues.getSystemProperty(n, AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP);
-      // All nodes with undefined AZ will be considered part of the same AZ. This also works for
-      // deployments that do not care about AZ's
-      return nodeAz.orElse(AffinityPlacementConfig.UNDEFINED_AVAILABILITY_ZONE);
+    private static final class AffinityPlacementContext {
+      private final Set<String> allSpreadDomains = new HashSet<>();
+      private final Map<String, Map<String, ReplicaSpread>> spreadDomainUsage = new HashMap<>();
+      private final Set<String> allAvailabilityZones = new HashSet<>();
+      private final Map<String, Map<String, Map<Replica.ReplicaType, ReplicaSpread>>>
+          availabilityZoneUsage = new HashMap<>();
+      private boolean doSpreadAcrossDomains;
     }
 
-    /**
-     * This class captures an availability zone and the nodes that are legitimate targets for
-     * replica placement in that Availability Zone. Instances are used as values in a {@link
-     * java.util.TreeMap} in which the total number of already existing replicas in the AZ is the
-     * key. This allows easily picking the set of nodes from which to select a node for placement in
-     * order to balance the number of replicas per AZ. Picking one of the nodes from the set is done
-     * using different criteria unrelated to the Availability Zone (picking the node is based on the
-     * {@link CoresAndDiskComparator} ordering).
-     */
-    private static class AzWithNodes {
-      final String azName;
-      private final boolean useSpreadDomains;
-      private boolean listIsSorted = false;
-      private final Comparator<Node> nodeComparator;
-      private final Random random;
-      private final List<Node> availableNodesForPlacement;
-      private final AttributeValues attributeValues;
-      private TreeSet<SpreadDomainWithNodes> sortedSpreadDomains;
-      private final Map<String, Integer> currentSpreadDomainUsageUsage;
-      private int numNodesForPlacement;
-
-      AzWithNodes(
-          String azName,
-          List<Node> availableNodesForPlacement,
-          boolean useSpreadDomains,
-          Comparator<Node> nodeComparator,
-          Random random,
-          AttributeValues attributeValues,
-          Map<String, Integer> currentSpreadDomainUsageUsage) {
-        this.azName = azName;
-        this.availableNodesForPlacement = availableNodesForPlacement;
-        this.useSpreadDomains = useSpreadDomains;
-        this.nodeComparator = nodeComparator;
-        this.random = random;
-        this.attributeValues = attributeValues;
-        this.currentSpreadDomainUsageUsage = currentSpreadDomainUsageUsage;
-        this.numNodesForPlacement = availableNodesForPlacement.size();
+    @Override
+    protected Map<Node, WeightedNode> getBaseWeightedNodes(
+        PlacementContext placementContext,
+        Set<Node> nodes,
+        Iterable<SolrCollection> relevantCollections,
+        boolean skipNodesWithErrors)
+        throws PlacementException {
+      // Fetch attributes for a superset of all nodes requested amongst the placementRequests
+      AttributeFetcher attributeFetcher = placementContext.getAttributeFetcher();
+      attributeFetcher
+          .requestNodeSystemProperty(AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP)
+          .requestNodeSystemProperty(AffinityPlacementConfig.NODE_TYPE_SYSPROP)
+          .requestNodeSystemProperty(AffinityPlacementConfig.REPLICA_TYPE_SYSPROP)
+          .requestNodeSystemProperty(AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP);
+      attributeFetcher
+          .requestNodeMetric(NodeMetricImpl.NUM_CORES)
+          .requestNodeMetric(NodeMetricImpl.FREE_DISK_GB);
+      Set<ReplicaMetric<?>> replicaMetrics = Set.of(ReplicaMetricImpl.INDEX_SIZE_GB);
+      Set<String> requestedCollections = new HashSet<>();
+      for (SolrCollection collection : relevantCollections) {
+        if (requestedCollections.add(collection.getName())) {
+          attributeFetcher.requestCollectionMetrics(collection, replicaMetrics);
+        }
       }
+      attributeFetcher.fetchFrom(nodes);
+      final AttributeValues attrValues = attributeFetcher.fetchAttributes();
 
-      private boolean hasBeenSorted() {
-        return (useSpreadDomains && sortedSpreadDomains != null)
-            || (!useSpreadDomains && listIsSorted);
-      }
+      AffinityPlacementContext affinityPlacementContext = new AffinityPlacementContext();
+      affinityPlacementContext.doSpreadAcrossDomains = spreadAcrossDomains;
 
-      void ensureSorted() {
-        if (!hasBeenSorted()) {
-          sort();
+      Map<Node, WeightedNode> affinityNodeMap = CollectionUtil.newHashMap(nodes.size());
+      for (Node node : nodes) {
+        AffinityNode affinityNode =
+            newNodeFromMetrics(node, attrValues, affinityPlacementContext, skipNodesWithErrors);
+        if (affinityNode != null) {
+          affinityNodeMap.put(node, affinityNode);
         }
       }
 
-      private void sort() {
-        assert !listIsSorted && sortedSpreadDomains == null
-            : "We shouldn't be sorting this list again";
+      // If there are not multiple spreadDomains, then there is nothing to spread across
+      if (affinityPlacementContext.allSpreadDomains.size() < 2) {
+        affinityPlacementContext.doSpreadAcrossDomains = false;
+      }
 
-        // Make sure we do not tend to use always the same nodes (within an AZ) if all
-        // conditions are identical (well, this likely is not the case since after having added
-        // a replica to a node its number of cores increases for the next placement decision,
-        // but let's be defensive here, given that multiple concurrent placement decisions might
-        // see the same initial cluster state, and we want placement to be reasonable even in
-        // that case without creating an unnecessary imbalance). For example, if all nodes have
-        // 0 cores and same amount of free disk space, ideally we want to pick a random node for
-        // placement, not always the same one due to some internal ordering.
-        Collections.shuffle(availableNodesForPlacement, random);
+      return affinityNodeMap;
+    }
 
-        if (useSpreadDomains) {
-          // When we use spread domains, we don't just sort the list of nodes, instead we generate a
-          // TreeSet of SpreadDomainWithNodes,
-          // sorted by the number of times the domain has been used. Each
-          // SpreadDomainWithNodes internally contains the list of nodes that belong to that
-          // particular domain,
-          // and it's sorted internally by the comparator passed to this
-          // class (which is the same that's used when not using spread domains).
-          // Whenever a node from a particular SpreadDomainWithNodes is selected as the best
-          // candidate, the call to "removeBestNode" will:
-          // 1. Remove the SpreadDomainWithNodes instance from the TreeSet
-          // 2. Remove the best node from the list within the SpreadDomainWithNodes
-          // 3. Increment the count of times the domain has been used
-          // 4. Re-add the SpreadDomainWithNodes instance to the TreeSet if there are still nodes
-          // available
-          HashMap<String, List<Node>> spreadDomainToListOfNodesMap = new HashMap<>();
-          for (Node node : availableNodesForPlacement) {
-            spreadDomainToListOfNodesMap
-                .computeIfAbsent(
-                    attributeValues
-                        .getSystemProperty(node, AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP)
-                        .get(),
-                    k -> new ArrayList<>())
-                .add(node);
-          }
-          sortedSpreadDomains =
-              new TreeSet<>(new SpreadDomainComparator(currentSpreadDomainUsageUsage));
+    AffinityNode newNodeFromMetrics(
+        Node node,
+        AttributeValues attrValues,
+        AffinityPlacementContext affinityPlacementContext,
+        boolean skipNodesWithErrors)
+        throws PlacementException {
+      Set<Replica.ReplicaType> supportedReplicaTypes =
+          attrValues.getSystemProperty(node, AffinityPlacementConfig.REPLICA_TYPE_SYSPROP).stream()
+              .flatMap(s -> Arrays.stream(s.split(",")))
+              .map(String::trim)
+              .map(s -> s.toUpperCase(Locale.ROOT))
+              .map(
+                  s -> {
+                    try {
+                      return Replica.ReplicaType.valueOf(s);
+                    } catch (IllegalArgumentException e) {
+                      log.warn(
+                          "Node {} has an invalid value for the {} systemProperty: {}",
+                          node.getName(),
+                          AffinityPlacementConfig.REPLICA_TYPE_SYSPROP,
+                          s);
+                      return null;
+                    }
+                  })
+              .collect(Collectors.toSet());
+      if (supportedReplicaTypes.isEmpty()) {
+        // If the property is not defined or is only whitespace on a node, assume the node can take
+        // any replica type
+        supportedReplicaTypes = Set.of(Replica.ReplicaType.values());
+      }
 
-          int i = 0;
-          for (Map.Entry<String, List<Node>> entry : spreadDomainToListOfNodesMap.entrySet()) {
-            // Sort the nodes within the spread domain by the provided comparator
-            entry.getValue().sort(nodeComparator);
-            sortedSpreadDomains.add(
-                new SpreadDomainWithNodes(entry.getKey(), entry.getValue(), i++, nodeComparator));
+      Set<String> nodeType;
+      Optional<String> nodePropOpt =
+          attrValues.getSystemProperty(node, AffinityPlacementConfig.NODE_TYPE_SYSPROP);
+      if (nodePropOpt.isEmpty()) {
+        nodeType = Collections.emptySet();
+      } else {
+        nodeType = new HashSet<>(StrUtils.splitSmart(nodePropOpt.get(), ','));
+      }
+
+      Optional<Double> nodeFreeDiskGB = attrValues.getNodeMetric(node, NodeMetricImpl.FREE_DISK_GB);
+      Optional<Integer> nodeNumCores = attrValues.getNodeMetric(node, NodeMetricImpl.NUM_CORES);
+      String az =
+          attrValues
+              .getSystemProperty(node, AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP)
+              .orElse(AffinityPlacementConfig.UNDEFINED_AVAILABILITY_ZONE);
+      affinityPlacementContext.allAvailabilityZones.add(az);
+      String spreadDomain;
+      if (affinityPlacementContext.doSpreadAcrossDomains) {
+        spreadDomain =
+            attrValues
+                .getSystemProperty(node, AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP)
+                .orElse(null);
+        if (spreadDomain == null) {
+          if (log.isWarnEnabled()) {
+            log.warn(
+                "AffinityPlacementPlugin configured to spread across domains, but node {} does not have the {} system property. Ignoring spreadAcrossDomains.",
+                node.getName(),
+                AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP);
           }
+          // Stop using spreadDomains in the context, because we have a node without a spread
+          // domain.
+          affinityPlacementContext.doSpreadAcrossDomains = false;
+          affinityPlacementContext.allSpreadDomains.clear();
         } else {
-          availableNodesForPlacement.sort(nodeComparator);
-          listIsSorted = true;
+          affinityPlacementContext.allSpreadDomains.add(spreadDomain);
         }
+      } else {
+        spreadDomain = null;
       }
-
-      Node getBestNode() {
-        assert hasBeenSorted();
-        if (useSpreadDomains) {
-          return sortedSpreadDomains.first().sortedNodesForPlacement.get(0);
-        } else {
-          return availableNodesForPlacement.get(0);
+      if (nodeFreeDiskGB.isEmpty() && skipNodesWithErrors) {
+        if (log.isWarnEnabled()) {
+          log.warn(
+              "Unknown free disk on node {}, excluding it from placement decisions.",
+              node.getName());
         }
-      }
-
-      public Node removeBestNode() {
-        assert hasBeenSorted();
-        this.numNodesForPlacement--;
-        if (useSpreadDomains) {
-          // Since this SpreadDomainWithNodes needs to be re-sorted in the sortedSpreadDomains, we
-          // remove it and then re-add it, once the best node has been removed.
-          SpreadDomainWithNodes group = sortedSpreadDomains.pollFirst();
-          Node n = group.sortedNodesForPlacement.remove(0);
-          this.currentSpreadDomainUsageUsage.merge(group.spreadDomainName, 1, Integer::sum);
-          if (!group.sortedNodesForPlacement.isEmpty()) {
-            sortedSpreadDomains.add(group);
-          }
-          return n;
-        } else {
-          return availableNodesForPlacement.remove(0);
+        return null;
+      } else if (nodeNumCores.isEmpty() && skipNodesWithErrors) {
+        if (log.isWarnEnabled()) {
+          log.warn(
+              "Unknown number of cores on node {}, excluding it from placement decisions.",
+              node.getName());
         }
-      }
-
-      public int numNodes() {
-        return this.numNodesForPlacement;
+        return null;
+      } else {
+        return new AffinityNode(
+            node,
+            attrValues,
+            affinityPlacementContext,
+            supportedReplicaTypes,
+            nodeType,
+            nodeNumCores.orElse(0),
+            nodeFreeDiskGB.orElse(0D),
+            az,
+            spreadDomain);
       }
     }
 
     /**
-     * This class represents group of nodes with the same {@link
-     * AffinityPlacementConfig#SPREAD_DOMAIN_SYSPROP} label.
+     * This implementation weights nodes in order to achieve balancing across AvailabilityZones and
+     * SpreadDomains, while trying to minimize the number of replicas on a node and ensure a given
+     * disk space per node. This implementation also supports limiting the placement of certain
+     * replica types per node and co-locating collections.
+     *
+     * <p>The total weight of the AffinityNode is the sum of:
+     *
+     * <ul>
+     *   <li>The number of replicas on the node
+     *   <li>100 if the free disk space on the node < prioritizedFreeDiskGB, otherwise 0
+     *   <li>If SpreadDomains are used:<br>
+     *       10,000 * the sum over each collection/shard:
+     *       <ul>
+     *         <li>(# of replicas in this node's spread domain - the minimum spreadDomain's
+     *             replicaCount)^2 <br>
+     *             <i>These are individually squared to penalize higher values when summing up all
+     *             values</i>
+     *       </ul>
+     *   <li>If AvailabilityZones are used:<br>
+     *       1,000,000 * the sum over each collection/shard/replicaType:
+     *       <ul>
+     *         <li>(# of replicas in this node's AZ - the minimum AZ's replicaCount)^2 <br>
+     *             <i>These are individually squared to penalize higher values when summing up all
+     *             values</i>
+     *       </ul>
+     * </ul>
+     *
+     * <p>The weighting here ensures that the order of importance for nodes is:
+     *
+     * <ol>
+     *   <li>Spread replicas of the same shard/replicaType across availabilityZones
+     *   <li>Spread replicas of the same shard across spreadDomains
+     *   <li>Make sure that replicas are not placed on nodes that have < prioritizedFreeDiskGB disk
+     *       space available
+     *   <li>Minimize the number of replicas on the node
+     * </ol>
+     *
+     * <p>The "relevant" weight with a replica is the sum of:
+     *
+     * <ul>
+     *   <li>The number of replicas on the node
+     *   <li>100 if the projected free disk space on the node < prioritizedFreeDiskGB, otherwise 0
+     *   <li>If SpreadDomains are used:<br>
+     *       10,000 * ( # of replicas for the replica's shard in this node's spread domain - the
+     *       minimum spreadDomain's replicaCount )
+     *   <li>If AvailabilityZones are used:<br>
+     *       1,000,000 * ( # of replicas for the replica's shard & replicaType in this node's AZ -
+     *       the minimum AZ's replicaCount )
+     * </ul>
+     *
+     * <p>Multiple replicas of the same shard are not permitted to live on the same Node.
+     *
+     * <p>Users can specify withCollection to ensure that replicas are co-placed when computing new
+     * replica placements or replica balancing.
      */
-    static class SpreadDomainWithNodes implements Comparable<SpreadDomainWithNodes> {
+    private class AffinityNode extends WeightedNode {
 
-      /**
-       * This is the label that all nodes in this group have in {@link
-       * AffinityPlacementConfig#SPREAD_DOMAIN_SYSPROP} label.
-       */
-      final String spreadDomainName;
+      private final AttributeValues attrValues;
 
-      /**
-       * The list of all nodes that contain the same {@link
-       * AffinityPlacementConfig#SPREAD_DOMAIN_SYSPROP} label. They must be sorted before creating
-       * this class.
-       */
-      private final List<Node> sortedNodesForPlacement;
+      private final AffinityPlacementContext affinityPlacementContext;
 
-      /**
-       * This is used for tie breaking the sort of {@link SpreadDomainWithNodes}, when the
-       * nodeComparator between the top nodes of each group return 0.
-       */
-      private final int tieBreaker;
+      private final Set<Replica.ReplicaType> supportedReplicaTypes;
+      private final Set<String> nodeType;
 
-      /**
-       * This is the comparator that is used to compare the top nodes in the {@link
-       * #sortedNodesForPlacement} lists. Must be the same that was used to sort {@link
-       * #sortedNodesForPlacement}.
-       */
-      private final Comparator<Node> nodeComparator;
+      private int coresOnNode;
+      private double nodeFreeDiskGB;
 
-      public SpreadDomainWithNodes(
-          String spreadDomainName,
-          List<Node> sortedNodesForPlacement,
-          int tieBreaker,
-          Comparator<Node> nodeComparator) {
-        this.spreadDomainName = spreadDomainName;
-        this.sortedNodesForPlacement = sortedNodesForPlacement;
-        this.tieBreaker = tieBreaker;
-        this.nodeComparator = nodeComparator;
+      private final String availabilityZone;
+      private final String spreadDomain;
+
+      AffinityNode(
+          Node node,
+          AttributeValues attrValues,
+          AffinityPlacementContext affinityPlacementContext,
+          Set<Replica.ReplicaType> supportedReplicaTypes,
+          Set<String> nodeType,
+          int coresOnNode,
+          double nodeFreeDiskGB,
+          String az,
+          String spreadDomain) {
+        super(node);
+        this.attrValues = attrValues;
+        this.affinityPlacementContext = affinityPlacementContext;
+        this.supportedReplicaTypes = supportedReplicaTypes;
+        this.nodeType = nodeType;
+        this.coresOnNode = coresOnNode;
+        this.nodeFreeDiskGB = nodeFreeDiskGB;
+        this.availabilityZone = az;
+        this.spreadDomain = spreadDomain;
       }
 
       @Override
-      public int compareTo(SpreadDomainWithNodes o) {
-        if (o == this) {
-          return 0;
-        }
-        int result =
-            nodeComparator.compare(
-                this.sortedNodesForPlacement.get(0), o.sortedNodesForPlacement.get(0));
-        if (result == 0) {
-          return Integer.compare(this.tieBreaker, o.tieBreaker);
-        }
-        return result;
+      public int calcWeight() {
+        return coresOnNode
+            // Only add 100 if prioritizedFreeDiskGB was provided and the node's freeDisk is lower
+            // than it
+            + 100 * (prioritizedFreeDiskGB > 0 && nodeFreeDiskGB < prioritizedFreeDiskGB ? 1 : 0)
+            + 10000 * getSpreadDomainWeight()
+            + 1000000 * getAZWeight();
       }
-    }
 
-    /**
-     * Builds the number of existing cores on each node returned in the attrValues. Nodes for which
-     * the number of cores is not available for whatever reason are excluded from acceptable
-     * candidate nodes as it would not be possible to make any meaningful placement decisions.
-     *
-     * @param nodes all nodes on which this plugin should compute placement
-     * @param attrValues attributes fetched for the nodes. This method uses system property {@link
-     *     AffinityPlacementConfig#REPLICA_TYPE_SYSPROP} as well as the number of cores on each
-     *     node.
-     */
-    private Map<Node, Integer> getCoreCountPerNode(
-        Set<Node> nodes, final AttributeValues attrValues) {
-      Map<Node, Integer> coresOnNodes = new HashMap<>();
-
-      for (Node node : nodes) {
-        attrValues
-            .getNodeMetric(node, NodeMetricImpl.NUM_CORES)
-            .ifPresent(count -> coresOnNodes.put(node, count));
+      @Override
+      public int calcRelevantWeightWithReplica(Replica replica) {
+        return coresOnNode
+            // Only add 100 if prioritizedFreeDiskGB was provided and the node's projected freeDisk
+            // is lower than it
+            + 100
+                * (prioritizedFreeDiskGB > 0
+                        && nodeFreeDiskGB - getProjectedSizeOfReplica(replica)
+                            < prioritizedFreeDiskGB
+                    ? 1
+                    : 0)
+            + 10000 * projectReplicaSpreadWeight(replica)
+            + 1000000 * projectAZWeight(replica);
       }
 
-      return coresOnNodes;
-    }
-
-    /**
-     * Given the set of all nodes on which to do placement and fetched attributes, builds the sets
-     * representing candidate nodes for placement of replicas of each replica type. These sets are
-     * packaged and returned in an EnumMap keyed by replica type. Nodes for which the number of
-     * cores is not available for whatever reason are excluded from acceptable candidate nodes as it
-     * would not be possible to make any meaningful placement decisions.
-     *
-     * @param nodes all nodes on which this plugin should compute placement
-     * @param attrValues attributes fetched for the nodes. This method uses system property {@link
-     *     AffinityPlacementConfig#REPLICA_TYPE_SYSPROP} as well as the number of cores on each
-     *     node.
-     */
-    private EnumMap<Replica.ReplicaType, Set<Node>> getAvailableNodesForReplicaTypes(
-        Set<Node> nodes, final AttributeValues attrValues) {
-      EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes =
-          new EnumMap<>(Replica.ReplicaType.class);
-
-      for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) {
-        replicaTypeToNodes.put(replicaType, new HashSet<>());
+      @Override
+      public boolean canAddReplica(Replica replica) {
+        String collection = replica.getShard().getCollection().getName();
+        // By default, do not allow two replicas of the same shard on a node
+        return super.canAddReplica(replica)
+            // Filter out unsupported replica types
+            && supportedReplicaTypes.contains(replica.getType())
+            // Filter out unsupported node types
+            && Optional.ofNullable(nodeTypes.get(collection))
+                .map(s -> s.stream().anyMatch(nodeType::contains))
+                .orElse(true)
+            // Ensure any co-located collections already exist on the Node
+            && Optional.ofNullable(withCollections.get(collection))
+                .map(this::hasCollectionOnNode)
+                .orElse(true)
+            // Ensure the disk space will not go below the minimum if the replica is added
+            && (minimalFreeDiskGB <= 0
+                || nodeFreeDiskGB - getProjectedSizeOfReplica(replica) > minimalFreeDiskGB);
       }
 
-      for (Node node : nodes) {
-        // Exclude nodes with unknown or too small disk free space
-        if (attrValues.getNodeMetric(node, NodeMetricImpl.FREE_DISK_GB).isEmpty()) {
-          if (log.isWarnEnabled()) {
-            log.warn(
-                "Unknown free disk on node {}, excluding it from placement decisions.",
-                node.getName());
-          }
-          // We rely later on the fact that the free disk optional is present (see
-          // CoresAndDiskComparator), be careful it you change anything here.
-          continue;
-        }
-        if (attrValues.getNodeMetric(node, NodeMetricImpl.FREE_DISK_GB).get() < minimalFreeDiskGB) {
-          if (log.isWarnEnabled()) {
-            log.warn(
-                "Node {} free disk ({}GB) lower than configured minimum {}GB, excluding it from placement decisions.",
-                node.getName(),
-                attrValues.getNodeMetric(node, NodeMetricImpl.FREE_DISK_GB).get(),
-                minimalFreeDiskGB);
-          }
-          continue;
-        }
-
-        if (attrValues.getNodeMetric(node, NodeMetricImpl.NUM_CORES).isEmpty()) {
-          if (log.isWarnEnabled()) {
-            log.warn(
-                "Unknown number of cores on node {}, excluding it from placement decisions.",
-                node.getName());
+      /**
+       * Return any replicas that cannot be removed because there are collocated collections that
+       * require the replica to exist.
+       *
+       * @param replicas the replicas to remove
+       * @return any errors for replicas that cannot be removed
+       */
+      @Override
+      public Map<Replica, String> canRemoveReplicas(Collection<Replica> replicas) {
+        Map<Replica, String> replicaRemovalExceptions = new HashMap<>();
+        Map<String, Map<String, Set<Replica>>> removals = new HashMap<>();
+        for (Replica replica : replicas) {
+          SolrCollection collection = replica.getShard().getCollection();
+          Set<String> collocatedCollections = new HashSet<>();
+          Optional.ofNullable(collocatedWith.get(collection.getName()))
+              .ifPresent(collocatedCollections::addAll);
+          collocatedCollections.retainAll(getCollectionsOnNode());
+          if (collocatedCollections.isEmpty()) {
+            continue;
           }
-          // We rely later on the fact that the number of cores optional is present (see
-          // CoresAndDiskComparator), be careful it you change anything here.
-          continue;
-        }
 
-        String supportedReplicaTypes =
-            attrValues
-                    .getSystemProperty(node, AffinityPlacementConfig.REPLICA_TYPE_SYSPROP)
-                    .isPresent()
-                ? attrValues
-                    .getSystemProperty(node, AffinityPlacementConfig.REPLICA_TYPE_SYSPROP)
-                    .get()
-                : null;
-        // If property not defined or is only whitespace on a node, assuming node can take any
-        // replica type
-        if (supportedReplicaTypes == null || supportedReplicaTypes.isBlank()) {
-          for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
-            replicaTypeToNodes.get(rt).add(node);
-          }
-        } else {
-          Set<String> acceptedTypes =
-              Arrays.stream(supportedReplicaTypes.split(","))
-                  .map(String::trim)
-                  .map(s -> s.toLowerCase(Locale.ROOT))
-                  .collect(Collectors.toSet());
-          for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
-            if (acceptedTypes.contains(rt.name().toLowerCase(Locale.ROOT))) {
-              replicaTypeToNodes.get(rt).add(node);
-            }
+          // There are collocatedCollections for this shard, so make sure there is a replica of this
+          // shard left on the node after it is removed
+          Set<Replica> replicasRemovedForShard =
+              removals
+                  .computeIfAbsent(
+                      replica.getShard().getCollection().getName(), k -> new HashMap<>())
+                  .computeIfAbsent(replica.getShard().getShardName(), k -> new HashSet<>());
+          replicasRemovedForShard.add(replica);
+
+          if (replicasRemovedForShard.size()
+              >= getReplicasForShardOnNode(replica.getShard()).size()) {
+            replicaRemovalExceptions.put(
+                replica, "co-located with replicas of " + collocatedCollections);
           }
         }
+        return replicaRemovalExceptions;
       }
-      return replicaTypeToNodes;
-    }
 
-    /**
-     * Picks nodes from {@code targetNodes} for placing {@code numReplicas} replicas.
-     *
-     * <p>The criteria used in this method are, in this order:
-     *
-     * <ol>
-     *   <li>No more than one replica of a given shard on a given node (strictly enforced)
-     *   <li>Balance as much as possible replicas of a given {@link
-     *       org.apache.solr.cluster.Replica.ReplicaType} over available AZ's. This balancing takes
-     *       into account existing replicas <b>of the corresponding replica type</b>, if any.
-     *   <li>Place replicas if possible on nodes having more than a certain amount of free disk
-     *       space (note that nodes with a too small amount of free disk space were eliminated as
-     *       placement targets earlier, in {@link #getAvailableNodesForReplicaTypes(Set,
-     *       AttributeValues)}). There's a threshold here rather than sorting on the amount of free
-     *       disk space, because sorting on that value would in practice lead to never considering
-     *       the number of cores on a node.
-     *   <li>Place replicas on nodes having a smaller number of cores (the number of cores
-     *       considered for this decision includes previous placement decisions made during the
-     *       processing of the placement request)
-     * </ol>
-     */
-    @SuppressForbidden(
-        reason =
-            "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.")
-    private void makePlacementDecisions(
-        SolrCollection solrCollection,
-        String shardName,
-        Set<String> availabilityZones,
-        Replica.ReplicaType replicaType,
-        int numReplicas,
-        final AttributeValues attrValues,
-        EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes,
-        Set<Node> nodesWithReplicas,
-        Map<Node, Integer> coresOnNodes,
-        PlacementPlanFactory placementPlanFactory,
-        Set<ReplicaPlacement> replicaPlacements,
-        boolean doSpreadAcrossDomains)
-        throws PlacementException {
-      // Count existing replicas per AZ. We count only instances of the type of replica for which we
-      // need to do placement. If we ever want to balance replicas of any type across AZ's (and not
-      // each replica type balanced independently), we'd have to move this data structure to the
-      // caller of this method so it can be reused across different replica type placements for a
-      // given shard. Note then that this change would be risky. For example all NRT's and PULL
-      // replicas for a shard my be correctly balanced over three AZ's, but then all NRT can end up
-      // in the same AZ...
-      Map<String, Integer> azToNumReplicas = new HashMap<>();
-      for (String az : availabilityZones) {
-        azToNumReplicas.put(az, 0);
+      @Override
+      protected boolean addProjectedReplicaWeights(Replica replica) {
+        nodeFreeDiskGB -= getProjectedSizeOfReplica(replica);
+        coresOnNode += 1;
+        return addReplicaToAzAndSpread(replica);
       }
 
-      // Build the set of candidate nodes for the placement, i.e. nodes that can accept the replica
-      // type
-      Set<Node> candidateNodes = new HashSet<>(replicaTypeToNodes.get(replicaType));
-      // Remove nodes that already have a replica for the shard (no two replicas of same shard can
-      // be put on same node)
-      candidateNodes.removeAll(nodesWithReplicas);
+      @Override
+      protected void initReplicaWeights(Replica replica) {
+        addReplicaToAzAndSpread(replica);
+      }
 
-      // This Map will include the affinity labels for the nodes that are currently hosting replicas
-      // of this shard. It will be modified with new placement decisions.
-      Map<String, Integer> spreadDomainsInUse = new HashMap<>();
-      Shard shard = solrCollection.getShard(shardName);
-      if (shard != null) {
-        // shard is non null if we're adding replicas to an already existing collection.
-        // If we're creating the collection, the shards do not exist yet.
-        for (Replica replica : shard.replicas()) {
-          // The node's AZ is counted as having a replica if it has a replica of the same type as
-          // the one we need to place here.
-          if (replica.getType() == replicaType) {
-            final String az = getNodeAZ(replica.getNode(), attrValues);
-            if (azToNumReplicas.containsKey(az)) {
-              // We do not count replicas on AZ's for which we don't have any node to place on
-              // because it would not help the placement decision. If we did want to do that, note
-              // the dereferencing below can't be assumed as the entry will not exist in the map.
-              azToNumReplicas.put(az, azToNumReplicas.get(az) + 1);
-            }
-            if (doSpreadAcrossDomains) {
-              attrValues
-                  .getSystemProperty(
-                      replica.getNode(), AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP)
-                  .ifPresent(nodeDomain -> spreadDomainsInUse.merge(nodeDomain, 1, Integer::sum));
-            }
-          }
+      private boolean addReplicaToAzAndSpread(Replica replica) {
+        boolean needsResort = false;
+        // Only use AvailabilityZones if there are more than 1
+        if (affinityPlacementContext.allAvailabilityZones.size() > 1) {
+          needsResort |=
+              affinityPlacementContext
+                  .availabilityZoneUsage
+                  .computeIfAbsent(
+                      replica.getShard().getCollection().getName(), k -> new HashMap<>())
+                  .computeIfAbsent(replica.getShard().getShardName(), k -> new HashMap<>())
+                  .computeIfAbsent(
+                      replica.getType(),
+                      k -> new ReplicaSpread(affinityPlacementContext.allAvailabilityZones))
+                  .addReplica(availabilityZone);
         }
+        // Only use SpreadDomains if they have been provided to all nodes and there are more than 1
+        if (affinityPlacementContext.doSpreadAcrossDomains) {
+          needsResort |=
+              affinityPlacementContext
+                  .spreadDomainUsage
+                  .computeIfAbsent(
+                      replica.getShard().getCollection().getName(), k -> new HashMap<>())
+                  .computeIfAbsent(
+                      replica.getShard().getShardName(),
+                      k -> new ReplicaSpread(affinityPlacementContext.allSpreadDomains))
+                  .addReplica(spreadDomain);
+        }
+        return needsResort;
       }
 
-      // We now have the set of real candidate nodes, we've enforced "No more than one replica of a
-      // given shard on a given node". We also counted for the shard and replica type under
-      // consideration how many replicas were per AZ, so we can place (or try to place) replicas on
-      // AZ's that have fewer replicas
-
-      Map<String, List<Node>> nodesPerAz = new HashMap<>();
-      if (availabilityZones.size() == 1) {
-        // If AZs are not being used (all undefined for example) or a single AZ exists, we add all
-        // nodes
-        // to the same entry
-        nodesPerAz.put(availabilityZones.iterator().next(), new ArrayList<>(candidateNodes));
-      } else {
-        // Get the candidate nodes per AZ in order to build (further down) a mapping of AZ to
-        // placement candidates.
-        for (Node node : candidateNodes) {
-          String nodeAz = getNodeAZ(node, attrValues);
-          List<Node> nodesForAz = nodesPerAz.computeIfAbsent(nodeAz, k -> new ArrayList<>());
-          nodesForAz.add(node);
+      @Override
+      protected void removeProjectedReplicaWeights(Replica replica) {
+        nodeFreeDiskGB += getProjectedSizeOfReplica(replica);
+        coresOnNode -= 1;
+        // Only use AvailabilityZones if there are more than 1
+        if (affinityPlacementContext.allAvailabilityZones.size() > 1) {
+          Optional.ofNullable(
+                  affinityPlacementContext.availabilityZoneUsage.get(
+                      replica.getShard().getCollection().getName()))
+              .map(m -> m.get(replica.getShard().getShardName()))
+              .map(m -> m.get(replica.getType()))
+              .ifPresent(m -> m.removeReplica(availabilityZone));
+        }
+        // Only use SpreadDomains if they have been provided to all nodes and there are more than 1
+        if (affinityPlacementContext.doSpreadAcrossDomains) {
+          Optional.ofNullable(
+                  affinityPlacementContext.spreadDomainUsage.get(
+                      replica.getShard().getCollection().getName()))
+              .map(m -> m.get(replica.getShard().getShardName()))
+              .ifPresent(m -> m.removeReplica(spreadDomain));
         }
       }
 
-      Comparator<Node> interGroupNodeComparator =
-          new CoresAndDiskComparator(attrValues, coresOnNodes, prioritizedFreeDiskGB);
-
-      // Build a treeMap sorted by the number of replicas per AZ and including candidates nodes
-      // suitable for placement on the AZ, so we can easily select the next AZ to get a replica
-      // assignment and quickly (constant time) decide if placement on this AZ is possible or not.
-      Map<Integer, Set<AzWithNodes>> azByExistingReplicas =
-          new TreeMap<>(Comparator.naturalOrder());
-      for (Map.Entry<String, List<Node>> e : nodesPerAz.entrySet()) {
-        azByExistingReplicas
-            .computeIfAbsent(azToNumReplicas.get(e.getKey()), k -> new HashSet<>())
-            .add(
-                new AzWithNodes(
-                    e.getKey(),
-                    e.getValue(),
-                    doSpreadAcrossDomains,
-                    interGroupNodeComparator,
-                    replicaPlacementRandom,
-                    attrValues,
-                    spreadDomainsInUse));
+      private double getProjectedSizeOfReplica(Replica replica) {
+        return attrValues
+            .getCollectionMetrics(replica.getShard().getCollection().getName())
+            .flatMap(colMetrics -> colMetrics.getShardMetrics(replica.getShard().getShardName()))
+            .flatMap(ShardMetrics::getLeaderMetrics)
+            .flatMap(lrm -> lrm.getReplicaMetric(ReplicaMetricImpl.INDEX_SIZE_GB))
+            .orElse(0D);
       }
 
-      for (int i = 0; i < numReplicas; i++) {
-        // We have for each AZ on which we might have a chance of placing a replica, the list of
-        // candidate nodes for replicas (candidate: does not already have a replica of this shard
-        // and is in the corresponding AZ). Among the AZ's with the minimal number of replicas of
-        // the given replica type for the shard, we must pick the AZ that offers the best placement
-        // (based on number of cores and free disk space). In order to do so, for these "minimal"
-        // AZ's we sort the nodes from best to worst placement candidate (based on the number of
-        // cores and free disk space) then pick the AZ that has the best best node. We don't sort
-        // all AZ's because that will not necessarily be needed.
-        int minNumberOfReplicasPerAz = 0; // This value never observed but compiler can't tell
-        Set<Map.Entry<Integer, AzWithNodes>> candidateAzEntries = null;
-        // Iterate over AZ's (in the order of increasing number of replicas on that AZ) and do two
-        // things: 1. remove those AZ's that have no nodes, no use iterating over these again and
-        // again (as we compute placement for more replicas), and 2. collect all those AZ with a
-        // minimal number of replicas.
-        for (Map.Entry<Integer, Set<AzWithNodes>> mapEntry : azByExistingReplicas.entrySet()) {
-          Iterator<AzWithNodes> it = mapEntry.getValue().iterator();
-          while (it.hasNext()) {
-            Map.Entry<Integer, AzWithNodes> entry = Map.entry(mapEntry.getKey(), it.next());
-            int numberOfNodes = entry.getValue().numNodes();
-            if (numberOfNodes == 0) {
-              it.remove();
-            } else { // AZ does have node(s) for placement
-              if (candidateAzEntries == null) {
-                // First AZ with nodes that can take the replica. Initialize tracking structures
-                minNumberOfReplicasPerAz = numberOfNodes;
-                candidateAzEntries = new HashSet<>();
-              }
-              if (minNumberOfReplicasPerAz != numberOfNodes) {
-                // AZ's with more replicas than the minimum number seen are not placement candidates
-                break;
-              }
-              candidateAzEntries.add(entry);
-              // We remove all entries that are candidates: the "winner" will be modified, all
-              // entries
-              // might also be sorted, so we'll insert back the updated versions later.
-              it.remove();
-            }
-          }
+      /**
+       * If there is more than one SpreadDomain given in the cluster, then return a weight for
+       * this node, given the number of replicas in its SpreadDomain.
+       *
+       * <p>For each Collection & Shard, sum up the number of replicas this node's SpreadDomain has
+       * over the minimum SpreadDomain. Square each value before summing, to ensure that a small
+       * number of high values is penalized more than a large number of small values.
+       *
+       * @return the weight
+       */
+      private int getSpreadDomainWeight() {
+        if (affinityPlacementContext.doSpreadAcrossDomains) {
+          return affinityPlacementContext.spreadDomainUsage.values().stream()
+              .flatMap(m -> m.values().stream())
+              .mapToInt(rs -> rs.overMinimum(spreadDomain))
+              .map(i -> i * i)
+              .sum();
+        } else {
+          return 0;
         }
+      }
 
-        if (candidateAzEntries == null) {
-          // This can happen because not enough nodes for the placement request or already too many
-          // nodes with replicas of the shard that can't accept new replicas or not enough nodes
-          // with enough free disk space.
-          throw new PlacementException(
-              "Not enough eligible nodes to place "
-                  + numReplicas
-                  + " replica(s) of type "
-                  + replicaType
-                  + " for shard "
-                  + shardName
-                  + " of collection "
-                  + solrCollection.getName());
+      /**
+       * If there is more than one SpreadDomain given in the cluster, then return a projected
+       * SpreadDomain weight for this node and this replica.
+       *
+       * <p>For the new replica's Collection & Shard, project the number of replicas this node's
+       * SpreadDomain has over the minimum SpreadDomain.
+       *
+       * @return the weight
+       */
+      private int projectReplicaSpreadWeight(Replica replica) {
+        if (replica != null && affinityPlacementContext.doSpreadAcrossDomains) {
+          return Optional.ofNullable(
+                  affinityPlacementContext.spreadDomainUsage.get(
+                      replica.getShard().getCollection().getName()))
+              .map(m -> m.get(replica.getShard().getShardName()))
+              .map(rs -> rs.projectOverMinimum(spreadDomain, 1))
+              .orElse(0);
+        } else {
+          return 0;
         }
+      }
 
-        // Iterate over all candidate AZ's, sort them if needed and find the best one to use for
-        // this placement
-        Map.Entry<Integer, AzWithNodes> selectedAz = null;
-        Node selectedAzBestNode = null;
-        for (Map.Entry<Integer, AzWithNodes> candidateAzEntry : candidateAzEntries) {
-          AzWithNodes azWithNodes = candidateAzEntry.getValue();
-          azWithNodes.ensureSorted();
-
-          // Which one is better, the new one or the previous best?
-          if (selectedAz == null
-              || interGroupNodeComparator.compare(azWithNodes.getBestNode(), selectedAzBestNode)
-                  < 0) {
-            selectedAz = candidateAzEntry;
-            selectedAzBestNode = azWithNodes.getBestNode();
-          }
+      /**
+       * If there is more than one AvailabilityZone given in the cluster, then return a weight for
+       * this node, given the number of replicas in its AvailabilityZone.
+       *
+       * <p>For each Collection, Shard & ReplicaType, sum up the number of replicas this node's
+       * AvailabilityZone has over the minimum AvailabilityZone. Square each value before summing,
+       * to ensure that a small number of high values is penalized more than a large number of
+       * small values.
+       *
+       * @return the weight
+       */
+      private int getAZWeight() {
+        if (affinityPlacementContext.allAvailabilityZones.size() < 2) {
+          return 0;
+        } else {
+          return affinityPlacementContext.availabilityZoneUsage.values().stream()
+              .flatMap(m -> m.values().stream())
+              .flatMap(m -> m.values().stream())
+              .mapToInt(rs -> rs.overMinimum(availabilityZone))
+              .map(i -> i * i)
+              .sum();
         }
+      }
 
-        // Now actually remove the selected node from the winning AZ
-        AzWithNodes azWithNodes = selectedAz.getValue();
-        Node assignTarget = azWithNodes.removeBestNode();
-
-        // Insert back all the qualifying but non winning AZ's removed while searching for the one
-        for (Map.Entry<Integer, AzWithNodes> removedAzs : candidateAzEntries) {
-          if (removedAzs != selectedAz) {
-            azByExistingReplicas
-                .computeIfAbsent(removedAzs.getKey(), k -> new HashSet<>())
-                .add(removedAzs.getValue());
-          }
+      /**
+       * If there is more than one AvailabilityZone given in the cluster, then return a projected
+       * AvailabilityZone weight for this node and this replica.
+       *
+       * <p>For the new replica's Collection, Shard & ReplicaType, project the number of replicas
+       * this node's AvailabilityZone has over the minimum AvailabilityZone.
+       *
+       * @return the weight
+       */
+      private int projectAZWeight(Replica replica) {
+        if (replica == null || affinityPlacementContext.allAvailabilityZones.size() < 2) {
+          return 0;
+        } else {
+          return Optional.ofNullable(
+                  affinityPlacementContext.availabilityZoneUsage.get(
+                      replica.getShard().getCollection().getName()))
+              .map(m -> m.get(replica.getShard().getShardName()))
+              .map(m -> m.get(replica.getType()))
+              .map(rs -> rs.projectOverMinimum(availabilityZone, 1))
+              .orElse(0);
         }
-
-        // Insert back a corrected entry for the winning AZ: one more replica living there and one
-        // less node that can accept new replicas (the remaining candidate node list might be empty,
-        // in which case it will be cleaned up on the next iteration).
-        azByExistingReplicas
-            .computeIfAbsent(selectedAz.getKey() + 1, k -> new HashSet<>())
-            .add(azWithNodes);
-
-        // Do not assign that node again for replicas of other replica type for this shard (this
-        // update of the set is not useful in the current execution of this method but for following
-        // ones only)
-        nodesWithReplicas.add(assignTarget);
-
-        // Track that the node has one more core. These values are only used during the current run
-        // of the plugin.
-        coresOnNodes.merge(assignTarget, 1, Integer::sum);
-
-        // Register the replica assignment just decided
-        replicaPlacements.add(
-            placementPlanFactory.createReplicaPlacement(
-                solrCollection, shardName, assignTarget, replicaType));
       }
     }
 
-    private Set<Node> filterNodesWithCollection(
-        Cluster cluster,
-        PlacementRequest request,
-        AttributeValues attributeValues,
-        Set<Node> initialNodes)
-        throws PlacementException {
-      // if there's a `withCollection` constraint for this collection then remove nodes
-      // that are not eligible
-      String withCollectionName = withCollections.get(request.getCollection().getName());
-      if (withCollectionName == null) {
-        return initialNodes;
-      }
-      SolrCollection withCollection;
-      try {
-        withCollection = cluster.getCollection(withCollectionName);
-      } catch (Exception e) {
-        throw new PlacementException(
-            "Error getting info of withCollection=" + withCollectionName, e);
-      }
-      Set<Node> withCollectionNodes = new HashSet<>();
-      withCollection
-          .shards()
-          .forEach(s -> s.replicas().forEach(r -> withCollectionNodes.add(r.getNode())));
-      if (withCollectionNodes.isEmpty()) {
-        throw new PlacementException(
-            "Collection "
-                + withCollection
-                + " defined in `withCollection` has no replicas on eligible nodes.");
-      }
-      HashSet<Node> filteredNodes = new HashSet<>(initialNodes);
-      filteredNodes.retainAll(withCollectionNodes);
-      if (filteredNodes.isEmpty()) {
-        throw new PlacementException(
-            "Collection "
-                + withCollection
-                + " defined in `withCollection` has no replicas on eligible nodes.");
-      }
-      return filteredNodes;
-    }
+    private static class ReplicaSpread {
+      private final Set<String> allKeys;
+      private final Map<String, Integer> spread;
+      private int minReplicasLocated;
 
-    private Set<Node> filterNodesByNodeType(
-        Cluster cluster,
-        PlacementRequest request,
-        AttributeValues attributeValues,
-        Set<Node> initialNodes)
-        throws PlacementException {
-      Set<String> collNodeTypes = nodeTypes.get(request.getCollection().getName());
-      if (collNodeTypes == null) {
-        // no filtering by node type
-        return initialNodes;
+      private ReplicaSpread(Set<String> allKeys) {
+        this.allKeys = allKeys;
+        this.spread = new HashMap<>();
+        this.minReplicasLocated = 0;
       }
-      Set<Node> filteredNodes =
-          initialNodes.stream()
-              .filter(
-                  n -> {
-                    Optional<String> nodePropOpt =
-                        attributeValues.getSystemProperty(
-                            n, AffinityPlacementConfig.NODE_TYPE_SYSPROP);
-                    if (!nodePropOpt.isPresent()) {
-                      return false;
-                    }
-                    Set<String> nodeTypes =
-                        new HashSet<>(StrUtils.splitSmart(nodePropOpt.get(), ','));
-                    nodeTypes.retainAll(collNodeTypes);
-                    return !nodeTypes.isEmpty();
-                  })
-              .collect(Collectors.toSet());
-      if (filteredNodes.isEmpty()) {
-        throw new PlacementException(
-            "There are no nodes with types: "
-                + collNodeTypes
-                + " expected by collection "
-                + request.getCollection().getName());
+
+      int overMinimum(String key) {
+        return spread.getOrDefault(key, 0) - minReplicasLocated;
       }
-      return filteredNodes;
-    }
-    /**
-     * Comparator implementing the placement strategy based on free space and number of cores: we
-     * want to place new replicas on nodes with the less number of cores, but only if they do have
-     * enough disk space (expressed as a threshold value).
-     */
-    static class CoresAndDiskComparator implements Comparator<Node> {
-      private final AttributeValues attrValues;
-      private final Map<Node, Integer> coresOnNodes;
-      private final long prioritizedFreeDiskGB;
 
       /**
-       * The data we sort on is not part of the {@link Node} instances but has to be retrieved from
-       * the attributes and configuration. The number of cores per node is passed in a map whereas
-       * the free disk is fetched from the attributes due to the fact that we update the number of
-       * cores per node as we do allocations, but we do not update the free disk. The attrValues
-       * corresponding to the number of cores per node are the initial values, but we want to
-       * compare the actual value taking into account placement decisions already made during the
-       * current execution of the placement plugin.
+       * Try adding a replica for the given spread key, and return the {@link
+       * #overMinimum(String)} with it added. The replica is then removed, so that the state is
+       * unchanged from when the method was called.
        */
-      CoresAndDiskComparator(
-          AttributeValues attrValues, Map<Node, Integer> coresOnNodes, long prioritizedFreeDiskGB) {
-        this.attrValues = attrValues;
-        this.coresOnNodes = coresOnNodes;
-        this.prioritizedFreeDiskGB = prioritizedFreeDiskGB;
-      }
-
-      @Override
-      public int compare(Node a, Node b) {
-        // Note all nodes do have free disk defined. This has been verified earlier.
-        boolean aHasLowFreeSpace =
-            attrValues.getNodeMetric(a, NodeMetricImpl.FREE_DISK_GB).get() < prioritizedFreeDiskGB;
-        boolean bHasLowFreeSpace =
-            attrValues.getNodeMetric(b, NodeMetricImpl.FREE_DISK_GB).get() < prioritizedFreeDiskGB;
-        if (aHasLowFreeSpace != bHasLowFreeSpace) {
-          // A node with low free space should be considered > node with high free space since it
-          // needs to come later in sort order
-          return Boolean.compare(aHasLowFreeSpace, bHasLowFreeSpace);
+      int projectOverMinimum(String key, int replicaDelta) {
+        int overMinimum = overMinimum(key);
+        if (overMinimum == 0 && replicaDelta > 0) {
+          addReplica(key);
+          int projected = overMinimum(key);
+          removeReplica(key);
+          return projected;
+        } else {
+          return Integer.max(0, overMinimum + replicaDelta);
         }
-        // The ordering on the number of cores is the natural order.
-        return Integer.compare(coresOnNodes.get(a), coresOnNodes.get(b));
       }
-    }
-
-    static class SpreadDomainComparator implements Comparator<SpreadDomainWithNodes> {
-      private final Map<String, Integer> spreadDomainUsage;
 
-      SpreadDomainComparator(Map<String, Integer> spreadDomainUsage) {
-        this.spreadDomainUsage = spreadDomainUsage;
+      /**
+       * Add a replica for the given spread key, returning whether a full resorting is needed for
+       * AffinityNodes. Resorting is only needed if other nodes could possibly have a lower weight
+       * than before.
+       *
+       * @param key the spread key for the replica that should be added
+       * @return whether a re-sort is required
+       */
+      boolean addReplica(String key) {
+        int previous = spread.getOrDefault(key, 0);
+        spread.put(key, previous + 1);
+        if (allKeys.size() > 0
+            && spread.size() == allKeys.size()
+            && previous == minReplicasLocated) {
+          minReplicasLocated = spread.values().stream().mapToInt(Integer::intValue).min().orElse(0);
+          return true;
+        }
+        return false;
       }
 
-      @Override
-      public int compare(SpreadDomainWithNodes group1, SpreadDomainWithNodes group2) {
-        // This comparator will compare groups by:
-        // 1. The number of usages for the domain they represent: We want groups that are
-        // less used to be the best ones
-        // 2. On equal number of usages, by the internal comparator (which uses core count and disk
-        // space) on the best node for each group (which, since the list is sorted, it's always the
-        // one in the position 0)
-        Integer usagesLabel1 = spreadDomainUsage.getOrDefault(group1.spreadDomainName, 0);
-        Integer usagesLabel2 = spreadDomainUsage.getOrDefault(group2.spreadDomainName, 0);
-        if (usagesLabel1.equals(usagesLabel2)) {
-          return group1.compareTo(group2);
+      void removeReplica(String key) {
+        Integer replicasLocated = spread.computeIfPresent(key, (k, v) -> v - 1 == 0 ? null : v - 1);
+        if (replicasLocated == null) {
+          replicasLocated = 0;
+        }
+        if (replicasLocated < minReplicasLocated) {
+          minReplicasLocated = replicasLocated;
         }
-        return usagesLabel1.compareTo(usagesLabel2);
       }
     }
   }
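
Editorial illustration, not part of the commit: the ReplicaSpread bookkeeping and the squared "over minimum" weight described in the Javadoc above can be tried in isolation. The sketch below is a simplified standalone copy; the class name ReplicaSpreadSketch, the squaredWeight helper and the demo key names are assumptions made for this example, and removeReplica/projectOverMinimum are left out.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Standalone sketch of the spread bookkeeping; names are hypothetical, not Solr API. */
public class ReplicaSpreadSketch {
  private final Set<String> allKeys;
  private final Map<String, Integer> spread = new HashMap<>();
  private int minReplicasLocated = 0;

  ReplicaSpreadSketch(Set<String> allKeys) {
    this.allKeys = allKeys;
  }

  /** Number of replicas the given key holds above the least-used key. */
  int overMinimum(String key) {
    return spread.getOrDefault(key, 0) - minReplicasLocated;
  }

  /** Adds a replica; returns true when the minimum had to be recomputed. */
  boolean addReplica(String key) {
    int previous = spread.getOrDefault(key, 0);
    spread.put(key, previous + 1);
    // The minimum can only change once every key holds at least one replica
    // and the key that was just incremented was sitting at the old minimum.
    if (!allKeys.isEmpty() && spread.size() == allKeys.size() && previous == minReplicasLocated) {
      minReplicasLocated = spread.values().stream().mapToInt(Integer::intValue).min().orElse(0);
      return true;
    }
    return false;
  }

  /** Hypothetical helper: sum of squared overMinimum values across several spreads. */
  static int squaredWeight(List<ReplicaSpreadSketch> spreads, String key) {
    return spreads.stream().mapToInt(rs -> rs.overMinimum(key)).map(i -> i * i).sum();
  }

  public static void main(String[] args) {
    Set<String> zones = Set.of("az1", "az2");

    // One shard with two replicas in az1 and none in az2.
    ReplicaSpreadSketch concentrated = new ReplicaSpreadSketch(zones);
    concentrated.addReplica("az1");
    concentrated.addReplica("az1");

    // Two shards, each with a single replica in az1.
    ReplicaSpreadSketch shardA = new ReplicaSpreadSketch(zones);
    shardA.addReplica("az1");
    ReplicaSpreadSketch shardB = new ReplicaSpreadSketch(zones);
    shardB.addReplica("az1");

    System.out.println("concentrated: " + squaredWeight(List.of(concentrated), "az1"));
    System.out.println("spread out:   " + squaredWeight(List.of(shardA, shardB), "az1"));
  }
}
```

In this sketch the single shard that is two replicas over the minimum contributes 2 * 2 = 4, while two shards that are each one replica over contribute 1 + 1 = 2, which is the effect the "square each value before summing" comment describes.
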
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/MinimizeCoresPlacementFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/MinimizeCoresPlacementFactory.java
index 298c0d3687e..e00129094c5 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/MinimizeCoresPlacementFactory.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/MinimizeCoresPlacementFactory.java
@@ -17,16 +17,9 @@
 
 package org.apache.solr.cluster.placement.plugins;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
-import java.util.stream.Collectors;
 import org.apache.solr.cluster.Node;
 import org.apache.solr.cluster.Replica;
 import org.apache.solr.cluster.SolrCollection;
@@ -34,14 +27,9 @@ import org.apache.solr.cluster.placement.AttributeFetcher;
 import org.apache.solr.cluster.placement.AttributeValues;
 import org.apache.solr.cluster.placement.PlacementContext;
 import org.apache.solr.cluster.placement.PlacementException;
-import org.apache.solr.cluster.placement.PlacementPlan;
-import org.apache.solr.cluster.placement.PlacementPlanFactory;
 import org.apache.solr.cluster.placement.PlacementPlugin;
 import org.apache.solr.cluster.placement.PlacementPluginFactory;
-import org.apache.solr.cluster.placement.PlacementRequest;
-import org.apache.solr.cluster.placement.ReplicaPlacement;
 import org.apache.solr.cluster.placement.impl.NodeMetricImpl;
-import org.apache.solr.common.util.SuppressForbidden;
 
 /**
  * Factory for creating {@link MinimizeCoresPlacementPlugin}, a Placement plugin implementing
@@ -49,6 +37,8 @@ import org.apache.solr.common.util.SuppressForbidden;
  * the same shard on the same node. This code is meant as an educational example of a placement
  * plugin.
  *
+ * <p>See {@link NodeWithCoreCount} for information on how this PlacementFactory weights nodes.
+ *
  * <p>See {@link AffinityPlacementFactory} for a more realistic example and documentation.
  */
 public class MinimizeCoresPlacementFactory
@@ -59,122 +49,69 @@ public class MinimizeCoresPlacementFactory
     return new MinimizeCoresPlacementPlugin();
   }
 
-  private static class MinimizeCoresPlacementPlugin implements PlacementPlugin {
+  private static class MinimizeCoresPlacementPlugin extends OrderedNodePlacementPlugin {
 
     @Override
-    @SuppressForbidden(
-        reason =
-            "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.")
-    public List<PlacementPlan> computePlacements(
-        Collection<PlacementRequest> requests, PlacementContext placementContext)
+    protected Map<Node, WeightedNode> getBaseWeightedNodes(
+        PlacementContext placementContext,
+        Set<Node> nodes,
+        Iterable<SolrCollection> relevantCollections,
+        boolean skipNodesWithErrors)
         throws PlacementException {
-      List<PlacementPlan> placementPlans = new ArrayList<>(requests.size());
-      Set<Node> allNodes = new HashSet<>();
-      for (PlacementRequest request : requests) {
-        allNodes.addAll(request.getTargetNodes());
-      }
-
       // Fetch attributes for a superset of all nodes requested amongst the placementRequests
       AttributeFetcher attributeFetcher = placementContext.getAttributeFetcher();
       attributeFetcher.requestNodeMetric(NodeMetricImpl.NUM_CORES);
-      attributeFetcher.fetchFrom(allNodes);
+      attributeFetcher.fetchFrom(nodes);
       AttributeValues attrValues = attributeFetcher.fetchAttributes();
-      Map<String, Integer> coresPerNodeTotal = new HashMap<>();
-      for (Node node : allNodes) {
-        if (attrValues.getNodeMetric(node, NodeMetricImpl.NUM_CORES).isEmpty()) {
+      HashMap<Node, WeightedNode> nodeMap = new HashMap<>();
+      for (Node node : nodes) {
+        if (skipNodesWithErrors
+            && attrValues.getNodeMetric(node, NodeMetricImpl.NUM_CORES).isEmpty()) {
           throw new PlacementException("Can't get number of cores in " + node);
         }
-        coresPerNodeTotal.put(
-            node.getName(), attrValues.getNodeMetric(node, NodeMetricImpl.NUM_CORES).get());
+        nodeMap.put(
+            node,
+            new NodeWithCoreCount(
+                node, attrValues.getNodeMetric(node, NodeMetricImpl.NUM_CORES).orElse(0)));
       }
 
-      for (PlacementRequest request : requests) {
-        int totalReplicasPerShard = 0;
-        for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
-          totalReplicasPerShard += request.getCountReplicasToCreate(rt);
-        }
-
-        if (request.getTargetNodes().size() < totalReplicasPerShard) {
-          throw new PlacementException("Cluster size too small for number of replicas per shard");
-        }
-
-        // Get number of cores on each Node
-        Map<Integer, Set<Node>> nodesByCores = new TreeMap<>(Comparator.naturalOrder());
-
-        Set<Node> nodes = request.getTargetNodes();
-
-        // Get the number of cores on each node and sort the nodes by increasing number of cores
-        for (Node node : nodes) {
-          nodesByCores
-              .computeIfAbsent(coresPerNodeTotal.get(node.getName()), k -> new HashSet<>())
-              .add(node);
-        }
-
-        Set<ReplicaPlacement> replicaPlacements =
-            new HashSet<>(totalReplicasPerShard * request.getShardNames().size());
-
-        // Now place all replicas of all shards on nodes, by placing on nodes with the smallest
-        // number of cores and taking into account replicas placed during this computation. Note
-        // that for each shard we must place replicas on different nodes, when moving to the next
-        // shard we use the nodes sorted by their updated number of cores (due to replica placements
-        // for previous shards).
-        for (String shardName : request.getShardNames()) {
-          // Assign replicas based on the sort order of the nodesByCores tree multimap to put
-          // replicas on nodes with fewer cores first. We only need totalReplicasPerShard nodes
-          // given that's the number of replicas to place. We assign based on the passed
-          // nodeEntriesToAssign list so the right nodes get replicas.
-          List<Map.Entry<Integer, Node>> nodeEntriesToAssign =
-              nodesByCores.entrySet().stream()
-                  .flatMap(e -> e.getValue().stream().map(n -> Map.entry(e.getKey(), n)))
-                  .limit(totalReplicasPerShard)
-                  .collect(Collectors.toList());
+      return nodeMap;
+    }
+  }
 
-          // Update the number of cores each node will have once the assignments below got
-          // executed so the next shard picks the lowest loaded nodes for its replicas.
-          for (Map.Entry<Integer, Node> e : nodeEntriesToAssign) {
-            int coreCount = e.getKey();
-            Node node = e.getValue();
-            nodesByCores.getOrDefault(coreCount, new HashSet<>()).remove(node);
-            nodesByCores.computeIfAbsent(coreCount + 1, k -> new HashSet<>()).add(node);
-            coresPerNodeTotal.put(node.getName(), coreCount + 1);
-          }
+  /**
+   * This implementation weights nodes according to how many cores they contain. The weight of a
+   * node is just the count of cores on that node.
+   *
+   * <p>Multiple replicas of the same shard are not permitted to live on the same Node.
+   */
+  private static class NodeWithCoreCount extends OrderedNodePlacementPlugin.WeightedNode {
+    private int coreCount;
+
+    public NodeWithCoreCount(Node node, int coreCount) {
+      super(node);
+      this.coreCount = coreCount;
+    }
 
-          for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) {
-            placeReplicas(
-                request.getCollection(),
-                nodeEntriesToAssign,
-                placementContext.getPlacementPlanFactory(),
-                replicaPlacements,
-                shardName,
-                request,
-                replicaType);
-          }
-        }
+    @Override
+    public int calcWeight() {
+      return coreCount;
+    }
 
-        placementPlans.add(
-            placementContext
-                .getPlacementPlanFactory()
-                .createPlacementPlan(request, replicaPlacements));
-      }
-      return placementPlans;
+    @Override
+    public int calcRelevantWeightWithReplica(Replica replica) {
+      return coreCount + 1;
     }
 
-    private void placeReplicas(
-        SolrCollection solrCollection,
-        List<Map.Entry<Integer, Node>> nodeEntriesToAssign,
-        PlacementPlanFactory placementPlanFactory,
-        Set<ReplicaPlacement> replicaPlacements,
-        String shardName,
-        PlacementRequest request,
-        Replica.ReplicaType replicaType) {
-      for (int replica = 0; replica < request.getCountReplicasToCreate(replicaType); replica++) {
-        final Map.Entry<Integer, Node> entry = nodeEntriesToAssign.remove(0);
-        final Node node = entry.getValue();
+    @Override
+    public boolean addProjectedReplicaWeights(Replica replica) {
+      coreCount += 1;
+      return false;
+    }
 
-        replicaPlacements.add(
-            placementPlanFactory.createReplicaPlacement(
-                solrCollection, shardName, node, replicaType));
-      }
+    @Override
+    public void removeProjectedReplicaWeights(Replica replica) {
+      coreCount -= 1;
     }
   }
 }
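
Reviewer note: the core-count weighting introduced above (node weight equals the number of cores, and a shard never gets two replicas on one node) can be illustrated with a small standalone sketch. This is not the Solr API. The class name CoreCountPlacementSketch, the method placeShard, and the use of plain node-name strings are assumptions made only for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for core-count based placement.
public class CoreCountPlacementSketch {

  /**
   * Chooses `replicas` distinct nodes for one shard, preferring nodes with the fewest cores
   * (ties broken by node name), and records the new cores in `coreCounts`.
   */
  static List<String> placeShard(Map<String, Integer> coreCounts, int replicas) {
    if (replicas > coreCounts.size()) {
      throw new IllegalArgumentException("Not enough nodes for " + replicas + " replicas");
    }
    List<String> nodes = new ArrayList<>(coreCounts.keySet());
    // Sort by current weight (core count), then by name for a deterministic order.
    nodes.sort(
        Comparator.comparing((String n) -> coreCounts.get(n))
            .thenComparing(Comparator.<String>naturalOrder()));
    List<String> chosen = new ArrayList<>(nodes.subList(0, replicas));
    // Project the new replicas onto the chosen nodes so the next shard sees updated weights.
    for (String node : chosen) {
      coreCounts.merge(node, 1, Integer::sum);
    }
    return chosen;
  }

  public static void main(String[] args) {
    Map<String, Integer> coreCounts = new HashMap<>();
    coreCounts.put("a", 3);
    coreCounts.put("b", 1);
    coreCounts.put("c", 2);
    System.out.println(placeShard(coreCounts, 2)); // first shard
    System.out.println(placeShard(coreCounts, 2)); // second shard sees updated counts
  }
}
```

Because each call updates the counts, later shards are steered toward nodes that earlier shards left lightly loaded, which is the same effect NodeWithCoreCount gets by incrementing coreCount in addProjectedReplicaWeights().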
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java
new file mode 100644
index 00000000000..a9d1f4ea048
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java
@@ -0,0 +1,702 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement.plugins;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntSupplier;
+import java.util.stream.Collectors;
+import org.apache.solr.cluster.Node;
+import org.apache.solr.cluster.Replica;
+import org.apache.solr.cluster.Shard;
+import org.apache.solr.cluster.SolrCollection;
+import org.apache.solr.cluster.placement.BalancePlan;
+import org.apache.solr.cluster.placement.BalanceRequest;
+import org.apache.solr.cluster.placement.DeleteCollectionRequest;
+import org.apache.solr.cluster.placement.DeleteReplicasRequest;
+import org.apache.solr.cluster.placement.DeleteShardsRequest;
+import org.apache.solr.cluster.placement.ModificationRequest;
+import org.apache.solr.cluster.placement.PlacementContext;
+import org.apache.solr.cluster.placement.PlacementException;
+import org.apache.solr.cluster.placement.PlacementModificationException;
+import org.apache.solr.cluster.placement.PlacementPlan;
+import org.apache.solr.cluster.placement.PlacementPlugin;
+import org.apache.solr.cluster.placement.PlacementRequest;
+import org.apache.solr.cluster.placement.ReplicaPlacement;
+import org.apache.solr.common.util.CollectionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class OrderedNodePlacementPlugin implements PlacementPlugin {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  @Override
+  public List<PlacementPlan> computePlacements(
+      Collection<PlacementRequest> requests, PlacementContext placementContext)
+      throws PlacementException {
+    List<PlacementPlan> placementPlans = new ArrayList<>(requests.size());
+    Set<Node> allNodes = new HashSet<>();
+    Set<SolrCollection> allCollections = new HashSet<>();
+    for (PlacementRequest request : requests) {
+      allNodes.addAll(request.getTargetNodes());
+      allCollections.add(request.getCollection());
+    }
+    Collection<WeightedNode> weightedNodes =
+        getWeightedNodes(placementContext, allNodes, allCollections, true).values();
+    for (PlacementRequest request : requests) {
+      int totalReplicasPerShard = 0;
+      for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
+        totalReplicasPerShard += request.getCountReplicasToCreate(rt);
+      }
+
+      List<WeightedNode> nodesForRequest =
+          weightedNodes.stream()
+              .filter(wn -> request.getTargetNodes().contains(wn.getNode()))
+              .collect(Collectors.toList());
+
+      Set<ReplicaPlacement> replicaPlacements =
+          CollectionUtil.newHashSet(totalReplicasPerShard * request.getShardNames().size());
+
+      SolrCollection solrCollection = request.getCollection();
+      // Now place all replicas of all shards on the eligible nodes, lowest-weighted nodes first
+      for (String shardName : request.getShardNames()) {
+        for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) {
+          int replicaCount = request.getCountReplicasToCreate(replicaType);
+          if (replicaCount == 0) {
+            continue;
+          }
+          if (log.isDebugEnabled()) {
+            log.debug(
+                "Placing {} replicas for Collection: {}, Shard: {}, ReplicaType: {}",
+                replicaCount,
+                solrCollection.getName(),
+                shardName,
+                replicaType);
+          }
+          Replica pr = createProjectedReplica(solrCollection, shardName, replicaType, null);
+          PriorityQueue<WeightedNode> nodesForReplicaType = new PriorityQueue<>();
+          nodesForRequest.stream()
+              .filter(n -> n.canAddReplica(pr))
+              .forEach(
+                  n -> {
+                    n.sortByRelevantWeightWithReplica(pr);
+                    n.addToSortedCollection(nodesForReplicaType);
+                  });
+
+          int replicasPlaced = 0;
+          while (!nodesForReplicaType.isEmpty() && replicasPlaced < replicaCount) {
+            WeightedNode node = nodesForReplicaType.poll();
+            if (!node.canAddReplica(pr)) {
+              if (log.isDebugEnabled()) {
+                log.debug(
+                    "Node can no longer accept replica, removing from selection list: {}",
+                    node.getNode());
+              }
+              continue;
+            }
+            if (node.hasWeightChangedSinceSort()) {
+              if (log.isDebugEnabled()) {
+                log.debug(
+                    "Node's sort is out-of-date, adding back to selection list: {}",
+                    node.getNode());
+              }
+              node.addToSortedCollection(nodesForReplicaType);
+              // The node will be re-sorted,
+              // so go back to the top of the loop to get the new lowest-sorted node
+              continue;
+            }
+            if (log.isDebugEnabled()) {
+              log.debug("Node chosen to host replica: {}", node.getNode());
+            }
+
+            boolean needsToResortAll =
+                node.addReplica(
+                    createProjectedReplica(solrCollection, shardName, replicaType, node.getNode()));
+            replicasPlaced += 1;
+            replicaPlacements.add(
+                placementContext
+                    .getPlacementPlanFactory()
+                    .createReplicaPlacement(
+                        solrCollection, shardName, node.getNode(), replicaType));
+            // Only update the priorityQueue if there are still replicas to be placed
+            if (replicasPlaced < replicaCount) {
+              if (needsToResortAll) {
+                if (log.isDebugEnabled()) {
+                  log.debug("Replica addition requires re-sorting of entire selection list");
+                }
+                List<WeightedNode> nodeList = new ArrayList<>(nodesForReplicaType);
+                nodesForReplicaType.clear();
+                nodeList.forEach(n -> n.addToSortedCollection(nodesForReplicaType));
+              }
+              // Add the chosen node back to the list if it can accept another replica of the
+              // shard/replicaType.
+              // The default implementation of "canAddReplica()" returns false for replicas
+              // of shards that the node already contains, so this will usually be false.
+              if (node.canAddReplica(pr)) {
+                nodesForReplicaType.add(node);
+              }
+            }
+          }
+
+          if (replicasPlaced < replicaCount) {
+            throw new PlacementException(
+                String.format(
+                    Locale.ROOT,
+                    "Not enough eligible nodes to place %d replica(s) of type %s for shard %s of collection %s. Only able to place %d replicas.",
+                    replicaCount,
+                    replicaType,
+                    shardName,
+                    solrCollection.getName(),
+                    replicasPlaced));
+          }
+        }
+      }
+
+      placementPlans.add(
+          placementContext
+              .getPlacementPlanFactory()
+              .createPlacementPlan(request, replicaPlacements));
+    }
+    return placementPlans;
+  }
+
+  @Override
+  public BalancePlan computeBalancing(
+      BalanceRequest balanceRequest, PlacementContext placementContext) throws PlacementException {
+    Map<Replica, Node> replicaMovements = new HashMap<>();
+    TreeSet<WeightedNode> orderedNodes = new TreeSet<>();
+    Collection<WeightedNode> weightedNodes =
+        getWeightedNodes(
+                placementContext,
+                balanceRequest.getNodes(),
+                placementContext.getCluster().collections(),
+                true)
+            .values();
+    // It is critical to stash each node's last sort weight before adding it to the sorted set
+    weightedNodes.forEach(
+        node -> {
+          node.sortWithoutChanges();
+          node.addToSortedCollection(orderedNodes);
+        });
+
+    // While the node with the lowest weight still has room to take a replica from the node with the
+    // highest weight, loop
+    Map<Replica, Node> newReplicaMovements = new HashMap<>();
+    ArrayList<WeightedNode> traversedHighNodes = new ArrayList<>(orderedNodes.size() - 1);
+    while (orderedNodes.size() > 1
+        && orderedNodes.first().calcWeight() < orderedNodes.last().calcWeight()) {
+      WeightedNode lowestWeight = orderedNodes.pollFirst();
+      if (lowestWeight == null) {
+        break;
+      }
+      if (lowestWeight.hasWeightChangedSinceSort()) {
+        if (log.isDebugEnabled()) {
+          log.debug(
+              "Re-sorting lowest weighted node: {}, sorting weight is out-of-date.",
+              lowestWeight.getNode().getName());
+        }
+        // Re-sort this node and go back to find the lowest weight
+        lowestWeight.addToSortedCollection(orderedNodes);
+        continue;
+      }
+      if (log.isDebugEnabled()) {
+        log.debug(
+            "Lowest weighted node: {}, weight: {}",
+            lowestWeight.getNode().getName(),
+            lowestWeight.calcWeight());
+      }
+
+      newReplicaMovements.clear();
+      // If a compatible node was found to move replicas, break and find the lowest weighted node
+      // again
+      while (newReplicaMovements.isEmpty()
+          && !orderedNodes.isEmpty()
+          && orderedNodes.last().calcWeight() > lowestWeight.calcWeight() + 1) {
+        WeightedNode highestWeight = orderedNodes.pollLast();
+        if (highestWeight == null) {
+          break;
+        }
+        if (highestWeight.hasWeightChangedSinceSort()) {
+          if (log.isDebugEnabled()) {
+            log.debug(
+                "Re-sorting highest weighted node: {}, sorting weight is out-of-date.",
+                highestWeight.getNode().getName());
+          }
+          // Re-sort this node and go back to find the highest weight
+          highestWeight.addToSortedCollection(orderedNodes);
+          continue;
+        }
+        if (log.isDebugEnabled()) {
+          log.debug(
+              "Highest weighted node: {}, weight: {}",
+              highestWeight.getNode().getName(),
+              highestWeight.calcWeight());
+        }
+
+        traversedHighNodes.add(highestWeight);
+        // Select a replica to move from the highest-weighted node to the lowest-weighted
+        // node.
+        List<Replica> availableReplicasToMove =
+            highestWeight.getAllReplicasOnNode().stream()
+                .sorted(Comparator.comparing(Replica::getReplicaName))
+                .collect(Collectors.toList());
+        int combinedNodeWeights = highestWeight.calcWeight() + lowestWeight.calcWeight();
+        for (Replica r : availableReplicasToMove) {
+          // Only continue if the replica can be removed from the old node and moved to the new node
+          if (!highestWeight.canRemoveReplicas(Set.of(r)).isEmpty()
+              || !lowestWeight.canAddReplica(r)) {
+            continue;
+          }
+          lowestWeight.addReplica(r);
+          highestWeight.removeReplica(r);
+          int lowestWeightWithReplica = lowestWeight.calcWeight();
+          int highestWeightWithoutReplica = highestWeight.calcWeight();
+          if (log.isDebugEnabled()) {
+            log.debug(
+                "Replica: {}, toNode weight with replica: {}, fromNode weight without replica: {}",
+                r.getReplicaName(),
+                lowestWeightWithReplica,
+                highestWeightWithoutReplica);
+          }
+
+          // If the combined weight of both nodes is lower after the move, make the move.
+          // Otherwise, make the move if it doesn't cause the weight of the higher node to
+          // go below the weight of the lower node, because that is over-correction.
+          if (highestWeightWithoutReplica + lowestWeightWithReplica >= combinedNodeWeights
+              && highestWeightWithoutReplica < lowestWeightWithReplica) {
+            // Undo the move
+            lowestWeight.removeReplica(r);
+            highestWeight.addReplica(r);
+            continue;
+          }
+          if (log.isDebugEnabled()) {
+            log.debug(
+                "Replica Movement chosen. From: {}, To: {}, Replica: {}",
+                highestWeight.getNode().getName(),
+                lowestWeight.getNode().getName(),
+                r);
+          }
+          newReplicaMovements.put(r, lowestWeight.getNode());
+
+          // Do not go beyond here, do another loop and see if other nodes can move replicas.
+          // It might end up being the same nodes in the next loop that end up moving another
+          // replica, but that's ok.
+          break;
+        }
+      }
+      // For now we do not have any way to see if there are out-of-date nodes in the middle of the
+      // TreeSet. Therefore, we need to re-sort this list after every selection. In the future, we
+      // should find a way to re-sort the out-of-date nodes without having to sort all nodes.
+      traversedHighNodes.addAll(orderedNodes);
+      orderedNodes.clear();
+
+      // Add back in the traversed highNodes that we did not select replicas from,
+      // they might have replicas to move to the next lowestWeighted node
+      traversedHighNodes.forEach(n -> n.addToSortedCollection(orderedNodes));
+      traversedHighNodes.clear();
+      if (newReplicaMovements.size() > 0) {
+        replicaMovements.putAll(newReplicaMovements);
+        // A replica was moved to lowestWeight, so add it back; otherwise it leaves the loop
+        lowestWeight.addToSortedCollection(orderedNodes);
+      }
+    }
+
+    return placementContext
+        .getBalancePlanFactory()
+        .createBalancePlan(balanceRequest, replicaMovements);
+  }
+
+  protected Map<Node, WeightedNode> getWeightedNodes(
+      PlacementContext placementContext,
+      Set<Node> nodes,
+      Iterable<SolrCollection> relevantCollections,
+      boolean skipNodesWithErrors)
+      throws PlacementException {
+    Map<Node, WeightedNode> weightedNodes =
+        getBaseWeightedNodes(placementContext, nodes, relevantCollections, skipNodesWithErrors);
+
+    for (SolrCollection collection : placementContext.getCluster().collections()) {
+      for (Shard shard : collection.shards()) {
+        for (Replica replica : shard.replicas()) {
+          WeightedNode weightedNode = weightedNodes.get(replica.getNode());
+          if (weightedNode != null) {
+            weightedNode.initReplica(replica);
+          }
+        }
+      }
+    }
+
+    return weightedNodes;
+  }
+
+  protected abstract Map<Node, WeightedNode> getBaseWeightedNodes(
+      PlacementContext placementContext,
+      Set<Node> nodes,
+      Iterable<SolrCollection> relevantCollections,
+      boolean skipNodesWithErrors)
+      throws PlacementException;
+
+  @Override
+  public void verifyAllowedModification(
+      ModificationRequest modificationRequest, PlacementContext placementContext)
+      throws PlacementException {
+    if (modificationRequest instanceof DeleteShardsRequest) {
+      log.warn("DeleteShardsRequest not implemented yet, skipping: {}", modificationRequest);
+    } else if (modificationRequest instanceof DeleteCollectionRequest) {
+      verifyDeleteCollection((DeleteCollectionRequest) modificationRequest, placementContext);
+    } else if (modificationRequest instanceof DeleteReplicasRequest) {
+      verifyDeleteReplicas((DeleteReplicasRequest) modificationRequest, placementContext);
+    } else {
+      log.warn("unsupported request type, skipping: {}", modificationRequest);
+    }
+  }
+
+  protected void verifyDeleteCollection(
+      DeleteCollectionRequest deleteCollectionRequest, PlacementContext placementContext)
+      throws PlacementException {
+    // NO-OP
+  }
+
+  protected void verifyDeleteReplicas(
+      DeleteReplicasRequest deleteReplicasRequest, PlacementContext placementContext)
+      throws PlacementException {
+    Map<Node, List<Replica>> nodesRepresented =
+        deleteReplicasRequest.getReplicas().stream()
+            .collect(Collectors.groupingBy(Replica::getNode));
+
+    Map<Node, WeightedNode> weightedNodes =
+        getWeightedNodes(
+            placementContext,
+            nodesRepresented.keySet(),
+            placementContext.getCluster().collections(),
+            false);
+
+    PlacementModificationException placementModificationException =
+        new PlacementModificationException("delete replica(s) rejected");
+    for (Map.Entry<Node, List<Replica>> entry : nodesRepresented.entrySet()) {
+      WeightedNode node = weightedNodes.get(entry.getKey());
+      if (node == null) {
+        entry
+            .getValue()
+            .forEach(
+                replica ->
+                    placementModificationException.addRejectedModification(
+                        replica.toString(),
+                        "could not load information for node: " + entry.getKey().getName()));
+      } else {
+        node.canRemoveReplicas(entry.getValue())
+            .forEach(
+                (replica, reason) ->
+                    placementModificationException.addRejectedModification(
+                        replica.toString(), reason));
+      }
+    }
+    if (!placementModificationException.getRejectedModifications().isEmpty()) {
+      throw placementModificationException;
+    }
+  }
+
+  /**
+   * A class that determines the weight of a given node and the replicas that reside on it.
+   *
+   * <p>The {@link OrderedNodePlacementPlugin} uses the weights determined here to place and balance
+   * replicas across the cluster. Replicas will be placed onto WeightedNodes with lower weights, and
+   * be taken off of WeightedNodes with higher weights.
+   *
+   * @lucene.experimental
+   */
+  public abstract static class WeightedNode implements Comparable<WeightedNode> {
+    private final Node node;
+    private final Map<String, Map<String, Set<Replica>>> replicas;
+    private IntSupplier sortWeightCalculator;
+    private int lastSortedWeight;
+
+    public WeightedNode(Node node) {
+      this.node = node;
+      this.replicas = new HashMap<>();
+      this.lastSortedWeight = 0;
+      this.sortWeightCalculator = this::calcWeight;
+    }
+
+    public void sortByRelevantWeightWithReplica(Replica replica) {
+      sortWeightCalculator = () -> calcRelevantWeightWithReplica(replica);
+    }
+
+    public void sortWithoutChanges() {
+      sortWeightCalculator = this::calcWeight;
+    }
+
+    public Node getNode() {
+      return node;
+    }
+
+    public Set<Replica> getAllReplicasOnNode() {
+      return replicas.values().stream()
+          .flatMap(shard -> shard.values().stream())
+          .flatMap(Collection::stream)
+          .collect(Collectors.toSet());
+    }
+
+    public Set<String> getCollectionsOnNode() {
+      return replicas.keySet();
+    }
+
+    public boolean hasCollectionOnNode(String collection) {
+      return replicas.containsKey(collection);
+    }
+
+    public Set<String> getShardsOnNode(String collection) {
+      return replicas.getOrDefault(collection, Collections.emptyMap()).keySet();
+    }
+
+    public boolean hasShardOnNode(Shard shard) {
+      return replicas
+          .getOrDefault(shard.getCollection().getName(), Collections.emptyMap())
+          .containsKey(shard.getShardName());
+    }
+
+    public Set<Replica> getReplicasForShardOnNode(Shard shard) {
+      return Optional.ofNullable(replicas.get(shard.getCollection().getName()))
+          .map(m -> m.get(shard.getShardName()))
+          .orElseGet(Collections::emptySet);
+    }
+
+    public void addToSortedCollection(Collection<WeightedNode> collection) {
+      stashSortedWeight();
+      collection.add(this);
+    }
+
+    public abstract int calcWeight();
+
+    public abstract int calcRelevantWeightWithReplica(Replica replica);
+
+    public boolean canAddReplica(Replica replica) {
+      // By default, do not allow two replicas of the same shard on a node
+      return getReplicasForShardOnNode(replica.getShard()).isEmpty();
+    }
+
+    private boolean addReplicaToInternalState(Replica replica) {
+      return replicas
+          .computeIfAbsent(replica.getShard().getCollection().getName(), k -> new HashMap<>())
+          .computeIfAbsent(replica.getShard().getShardName(), k -> CollectionUtil.newHashSet(1))
+          .add(replica);
+    }
+
+    public final void initReplica(Replica replica) {
+      if (addReplicaToInternalState(replica)) {
+        initReplicaWeights(replica);
+      }
+    }
+
+    protected void initReplicaWeights(Replica replica) {
+      // Defaults to a NO-OP
+    }
+
+    public final boolean addReplica(Replica replica) {
+      if (addReplicaToInternalState(replica)) {
+        return addProjectedReplicaWeights(replica);
+      } else {
+        return false;
+      }
+    }
+
+    /**
+     * Add the weights for the given replica to this node
+     *
+     * @param replica the replica to add weights for
+     * @return whether the ordered list of nodes needs a re-sort afterwards.
+     */
+    protected abstract boolean addProjectedReplicaWeights(Replica replica);
+
+    /**
+     * Determine if the given replicas can be removed from the node.
+     *
+     * @param replicas the replicas to remove
+     * @return a mapping from replicas that cannot be removed to the reason why they can't be
+     *     removed.
+     */
+    public Map<Replica, String> canRemoveReplicas(Collection<Replica> replicas) {
+      return Collections.emptyMap();
+    }
+
+    public final void removeReplica(Replica replica) {
+      // Only remove the projected replicaWeight if the node has this replica
+      AtomicBoolean hasReplica = new AtomicBoolean(false);
+      replicas.computeIfPresent(
+          replica.getShard().getCollection().getName(),
+          (col, shardReps) -> {
+            shardReps.computeIfPresent(
+                replica.getShard().getShardName(),
+                (shard, reps) -> {
+                  if (reps.remove(replica)) {
+                    hasReplica.set(true);
+                  }
+                  return reps.isEmpty() ? null : reps;
+                });
+            return shardReps.isEmpty() ? null : shardReps;
+          });
+      if (hasReplica.get()) {
+        removeProjectedReplicaWeights(replica);
+      }
+    }
+
+    protected abstract void removeProjectedReplicaWeights(Replica replica);
+
+    private void stashSortedWeight() {
+      lastSortedWeight = sortWeightCalculator.getAsInt();
+    }
+
+    protected boolean hasWeightChangedSinceSort() {
+      return lastSortedWeight != sortWeightCalculator.getAsInt();
+    }
+
+    @SuppressWarnings({"rawtypes"})
+    protected Comparable getTiebreaker() {
+      return node.getName();
+    }
+
+    @Override
+    @SuppressWarnings({"unchecked"})
+    public int compareTo(WeightedNode o) {
+      int comp = Integer.compare(this.lastSortedWeight, o.lastSortedWeight);
+      if (comp == 0 && !equals(o)) {
+        // TreeSets do not like a 0 comp for non-equal members.
+        comp = getTiebreaker().compareTo(o.getTiebreaker());
+      }
+      return comp;
+    }
+
+    @Override
+    public int hashCode() {
+      return node.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof WeightedNode)) {
+        return false;
+      } else {
+        WeightedNode on = (WeightedNode) o;
+        if (this.node == null) {
+          return on.node == null;
+        } else {
+          return this.node.equals(on.node);
+        }
+      }
+    }
+  }
+
+  /**
+   * Create a fake replica to be used when computing placements for new Replicas. The new replicas
+   * need to be added to the projected state, even though they don't exist.
+   *
+   * @param collection the existing collection that the replica will be created for
+   * @param shardName the name of the new replica's shard
+   * @param type the ReplicaType for the new replica
+   * @param node the Solr node on which the replica will be placed
+   * @return a fake replica to use until the new replica is created
+   */
+  static Replica createProjectedReplica(
+      final SolrCollection collection,
+      final String shardName,
+      final Replica.ReplicaType type,
+      final Node node) {
+    final Shard shard =
+        new Shard() {
+          @Override
+          public String getShardName() {
+            return shardName;
+          }
+
+          @Override
+          public SolrCollection getCollection() {
+            return collection;
+          }
+
+          @Override
+          public Replica getReplica(String name) {
+            return null;
+          }
+
+          @Override
+          public Iterator<Replica> iterator() {
+            return null;
+          }
+
+          @Override
+          public Iterable<Replica> replicas() {
+            return null;
+          }
+
+          @Override
+          public Replica getLeader() {
+            return null;
+          }
+
+          @Override
+          public ShardState getState() {
+            return null;
+          }
+        };
+    return new Replica() {
+      @Override
+      public Shard getShard() {
+        return shard;
+      }
+
+      @Override
+      public ReplicaType getType() {
+        return type;
+      }
+
+      @Override
+      public ReplicaState getState() {
+        return ReplicaState.DOWN;
+      }
+
+      @Override
+      public String getReplicaName() {
+        return "";
+      }
+
+      @Override
+      public String getCoreName() {
+        return "";
+      }
+
+      @Override
+      public Node getNode() {
+        return node;
+      }
+    };
+  }
+}
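
Reviewer note: the balancing loop in computeBalancing() above can be pictured with a much smaller standalone sketch. It keeps only the core idea: move one replica at a time from the heaviest node to the lightest node while their weights differ by more than one. It deliberately omits the canAddReplica()/canRemoveReplicas() checks, the out-of-date sort handling, and the undo step. The class name BalanceSketch, the method balance, and the use of plain integer weights are assumptions for illustration, not part of the commit.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for weight-based balancing.
public class BalanceSketch {

  /**
   * Moves one unit of weight at a time from the heaviest node to the lightest node until no pair
   * of nodes differs by more than one. Mutates `weights` and returns the number of moves made.
   */
  static int balance(Map<String, Integer> weights) {
    if (weights.size() < 2) {
      return 0; // nothing to balance
    }
    // Order by weight, then by node name so ties are resolved deterministically.
    Comparator<Map.Entry<String, Integer>> byWeight =
        Map.Entry.<String, Integer>comparingByValue()
            .thenComparing(Map.Entry.<String, Integer>comparingByKey());
    int moves = 0;
    while (true) {
      Map.Entry<String, Integer> lowest = Collections.min(weights.entrySet(), byWeight);
      Map.Entry<String, Integer> highest = Collections.max(weights.entrySet(), byWeight);
      // A move only helps when the gap is larger than one; otherwise it would over-correct.
      if (highest.getValue() - lowest.getValue() <= 1) {
        break;
      }
      highest.setValue(highest.getValue() - 1);
      lowest.setValue(lowest.getValue() + 1);
      moves++;
    }
    return moves;
  }

  public static void main(String[] args) {
    Map<String, Integer> weights = new HashMap<>();
    weights.put("n1", 6);
    weights.put("n2", 1);
    weights.put("n3", 2);
    int moves = balance(weights);
    System.out.println(moves + " moves, result: " + weights);
  }
}
```

The stopping condition mirrors the inner loop guard in the diff, which only considers a high node whose weight exceeds the lowest weight by more than one.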
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/RandomPlacementFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/RandomPlacementFactory.java
index 47f76a03a65..1e0f6a2f5ba 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/RandomPlacementFactory.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/RandomPlacementFactory.java
@@ -17,30 +17,25 @@
 
 package org.apache.solr.cluster.placement.plugins;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import org.apache.solr.cluster.Node;
 import org.apache.solr.cluster.Replica;
 import org.apache.solr.cluster.SolrCollection;
 import org.apache.solr.cluster.placement.PlacementContext;
-import org.apache.solr.cluster.placement.PlacementException;
-import org.apache.solr.cluster.placement.PlacementPlan;
-import org.apache.solr.cluster.placement.PlacementPlanFactory;
 import org.apache.solr.cluster.placement.PlacementPlugin;
 import org.apache.solr.cluster.placement.PlacementPluginFactory;
-import org.apache.solr.cluster.placement.PlacementRequest;
-import org.apache.solr.cluster.placement.ReplicaPlacement;
-import org.apache.solr.common.util.CollectionUtil;
 
 /**
  * Factory for creating {@link RandomPlacementPlugin}, a placement plugin implementing random
  * placement for new collection creation while preventing two replicas of same shard from being
  * placed on same node..
  *
+ * <p>See {@link RandomNode} for information on how this PlacementFactory weights nodes.
+ *
  * <p>See {@link AffinityPlacementFactory} for a more realistic example and documentation.
  */
 public class RandomPlacementFactory
@@ -51,7 +46,7 @@ public class RandomPlacementFactory
     return new RandomPlacementPlugin();
   }
 
-  public static class RandomPlacementPlugin implements PlacementPlugin {
+  public static class RandomPlacementPlugin extends OrderedNodePlacementPlugin {
     private final Random replicaPlacementRandom =
         new Random(); // ok even if random sequence is predictable.
 
@@ -64,65 +59,69 @@ public class RandomPlacementFactory
     }
 
     @Override
-    public List<PlacementPlan> computePlacements(
-        Collection<PlacementRequest> requests, PlacementContext placementContext)
-        throws PlacementException {
-      List<PlacementPlan> placementPlans = new ArrayList<>(requests.size());
-      for (PlacementRequest request : requests) {
-        int totalReplicasPerShard = 0;
-        for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
-          totalReplicasPerShard += request.getCountReplicasToCreate(rt);
-        }
-
-        if (request.getTargetNodes().size() < totalReplicasPerShard) {
-          throw new PlacementException("Cluster size too small for number of replicas per shard");
-        }
-
-        Set<ReplicaPlacement> replicaPlacements =
-            CollectionUtil.newHashSet(totalReplicasPerShard * request.getShardNames().size());
-
-        // Now place randomly all replicas of all shards on available nodes
-        for (String shardName : request.getShardNames()) {
-          // Shuffle the nodes for each shard so that replicas for a shard are placed on distinct
-          // yet random nodes
-          ArrayList<Node> nodesToAssign = new ArrayList<>(request.getTargetNodes());
-          Collections.shuffle(nodesToAssign, replicaPlacementRandom);
-
-          for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
-            placeForReplicaType(
-                request.getCollection(),
-                nodesToAssign,
-                placementContext.getPlacementPlanFactory(),
-                replicaPlacements,
-                shardName,
-                request,
-                rt);
-          }
-        }
-
-        placementPlans.add(
-            placementContext
-                .getPlacementPlanFactory()
-                .createPlacementPlan(request, replicaPlacements));
+    protected Map<Node, WeightedNode> getBaseWeightedNodes(
+        PlacementContext placementContext,
+        Set<Node> nodes,
+        Iterable<SolrCollection> relevantCollections,
+        boolean skipNodesWithErrors) {
+      HashMap<Node, WeightedNode> nodeMap = new HashMap<>();
+
+      for (Node node : nodes) {
+        nodeMap.put(node, new RandomNode(node, replicaPlacementRandom));
       }
-      return placementPlans;
+
+      return nodeMap;
     }
+  }
 
-    private void placeForReplicaType(
-        SolrCollection solrCollection,
-        ArrayList<Node> nodesToAssign,
-        PlacementPlanFactory placementPlanFactory,
-        Set<ReplicaPlacement> replicaPlacements,
-        String shardName,
-        PlacementRequest request,
-        Replica.ReplicaType replicaType) {
-      for (int replica = 0; replica < request.getCountReplicasToCreate(replicaType); replica++) {
-        Node node = nodesToAssign.remove(0);
-
-        replicaPlacements.add(
-            placementPlanFactory.createReplicaPlacement(
-                solrCollection, shardName, node, replicaType));
-      }
+  /**
+   * This implementation weights nodes equally. When trying to determine which nodes should be
+   * chosen to host replicas, a random sorting is used.
+   *
+   * <p>Multiple replicas of the same shard are not permitted to live on the same Node.
+   */
+  private static class RandomNode extends OrderedNodePlacementPlugin.WeightedNode {
+    private final Random random;
+    private int randomTiebreaker;
+
+    public RandomNode(Node node, Random random) {
+      super(node);
+      this.random = random;
+      this.randomTiebreaker = random.nextInt();
+    }
+
+    @Override
+    public int calcWeight() {
+      return 0;
+    }
+
+    @Override
+    @SuppressWarnings({"rawtypes"})
+    public Comparable getTiebreaker() {
+      return randomTiebreaker;
+    }
+
+    @Override
+    public int calcRelevantWeightWithReplica(Replica replica) {
+      return calcWeight();
+    }
+
+    @Override
+    protected boolean addProjectedReplicaWeights(Replica replica) {
+      // NO-OP
+      return false;
+    }
+
+    @Override
+    protected void removeProjectedReplicaWeights(Replica replica) {
+      // NO-OP
+    }
+
+    @Override
+    public void addToSortedCollection(
+        Collection<OrderedNodePlacementPlugin.WeightedNode> collection) {
+      randomTiebreaker = random.nextInt();
+      super.addToSortedCollection(collection);
     }
   }
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SimplePlacementFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SimplePlacementFactory.java
index 3242d25aa6c..52f30c367b0 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SimplePlacementFactory.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SimplePlacementFactory.java
@@ -17,32 +17,24 @@
 
 package org.apache.solr.cluster.placement.plugins;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
-import java.util.stream.Collectors;
 import org.apache.solr.cluster.Node;
 import org.apache.solr.cluster.Replica;
-import org.apache.solr.cluster.Shard;
 import org.apache.solr.cluster.SolrCollection;
 import org.apache.solr.cluster.placement.PlacementContext;
-import org.apache.solr.cluster.placement.PlacementException;
-import org.apache.solr.cluster.placement.PlacementPlan;
 import org.apache.solr.cluster.placement.PlacementPlugin;
 import org.apache.solr.cluster.placement.PlacementPluginFactory;
-import org.apache.solr.cluster.placement.PlacementRequest;
-import org.apache.solr.cluster.placement.ReplicaPlacement;
-import org.apache.solr.common.util.CollectionUtil;
 
 /**
  * Factory for creating {@link SimplePlacementPlugin}, a placement plugin implementing the logic
  * from the old <code>LegacyAssignStrategy</code>. This chooses nodes with the fewest cores
  * (especially cores of the same collection).
  *
+ * <p>See {@link SameCollWeightedNode} for information on how this PlacementFactory weights nodes.
+ *
  * <p>See {@link AffinityPlacementFactory} for a more realistic example and documentation.
  */
 public class SimplePlacementFactory
@@ -53,118 +45,140 @@ public class SimplePlacementFactory
     return new SimplePlacementPlugin();
   }
 
-  public static class SimplePlacementPlugin implements PlacementPlugin {
-    @Override
-    public List<PlacementPlan> computePlacements(
-        Collection<PlacementRequest> requests, PlacementContext placementContext)
-        throws PlacementException {
-      List<PlacementPlan> placementPlans = new ArrayList<>(requests.size());
-      Map<Node, ReplicaCount> nodeVsShardCount = getNodeVsShardCount(placementContext);
-      for (PlacementRequest request : requests) {
-        int totalReplicasPerShard = 0;
-        for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
-          totalReplicasPerShard += request.getCountReplicasToCreate(rt);
-        }
-
-        Set<ReplicaPlacement> replicaPlacements =
-            CollectionUtil.newHashSet(totalReplicasPerShard * request.getShardNames().size());
-
-        Collection<ReplicaCount> replicaCounts = nodeVsShardCount.values();
-
-        if (request.getTargetNodes().size() < replicaCounts.size()) {
-          replicaCounts =
-              replicaCounts.stream()
-                  .filter(rc -> request.getTargetNodes().contains(rc.node()))
-                  .collect(Collectors.toList());
-        }
-
-        for (String shard : request.getShardNames()) {
-          // Reset the ordering of the nodes for each shard, using the replicas added in the
-          // previous shards and assign requests
-          List<Node> nodeList =
-              replicaCounts.stream()
-                  .sorted(
-                      Comparator.<ReplicaCount>comparingInt(
-                              rc -> rc.weight(request.getCollection().getName()))
-                          .thenComparing(ReplicaCount::nodeName))
-                  .map(ReplicaCount::node)
-                  .collect(Collectors.toList());
-          int replicaNumOfShard = 0;
-          for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) {
-            for (int i = 0; i < request.getCountReplicasToCreate(replicaType); i++) {
-              Node assignedNode = nodeList.get(replicaNumOfShard++ % nodeList.size());
-
-              replicaPlacements.add(
-                  placementContext
-                      .getPlacementPlanFactory()
-                      .createReplicaPlacement(
-                          request.getCollection(), shard, assignedNode, replicaType));
-
-              ReplicaCount replicaCount =
-                  nodeVsShardCount.computeIfAbsent(assignedNode, ReplicaCount::new);
-              replicaCount.totalReplicas++;
-              replicaCount.collectionReplicas.merge(
-                  request.getCollection().getName(), 1, Integer::sum);
-            }
-          }
-        }
-
-        placementPlans.add(
-            placementContext
-                .getPlacementPlanFactory()
-                .createPlacementPlan(request, replicaPlacements));
-      }
-      return placementPlans;
-    }
-
-    private Map<Node, ReplicaCount> getNodeVsShardCount(PlacementContext placementContext) {
-      HashMap<Node, ReplicaCount> nodeVsShardCount = new HashMap<>();
+  public static class SimplePlacementPlugin extends OrderedNodePlacementPlugin {
 
-      for (Node s : placementContext.getCluster().getLiveDataNodes()) {
-        nodeVsShardCount.computeIfAbsent(s, ReplicaCount::new);
+    @Override
+    protected Map<Node, WeightedNode> getBaseWeightedNodes(
+        PlacementContext placementContext,
+        Set<Node> nodes,
+        Iterable<SolrCollection> relevantCollections,
+        boolean skipNodesWithErrors) {
+      HashMap<Node, WeightedNode> nodeVsShardCount = new HashMap<>();
+
+      for (Node n : nodes) {
+        nodeVsShardCount.computeIfAbsent(n, SameCollWeightedNode::new);
       }
 
-      // if we get here we were not given a createNodeList, build a map with real counts.
-      for (SolrCollection collection : placementContext.getCluster().collections()) {
-        // identify suitable nodes  by checking the no:of cores in each of them
-        for (Shard shard : collection.shards()) {
-          for (Replica replica : shard.replicas()) {
-            ReplicaCount count = nodeVsShardCount.get(replica.getNode());
-            if (count != null) {
-              count.addReplica(collection.getName(), shard.getShardName());
-            }
-          }
-        }
-      }
       return nodeVsShardCount;
     }
   }
 
-  static class ReplicaCount {
-    public final Node node;
+  /**
+   * This implementation weights nodes according to how many replicas of the same collection and
+   * shard reside on the node. The implementation tries to spread replicas of the same
+   * collection/shard across different nodes, so nodes that contain more of the same
+   * collection/shard will be weighted higher than nodes that only contain replicas for unique
+   * collections/shards.
+   *
+   * <p>The total weight of the SameCollWeightedNode is the sum of:
+   *
+   * <ul>
+   *   <li>The number of replicas on the node
+   *   <li>5 * for each collection, the sum of:
+   *       <ul>
+   *         <li>(the number of replicas for that collection - 1)^2
+   *       </ul>
+   *   <li>1000 * for each shard, the sum of:
+   *       <ul>
+   *         <li>(the number of replicas for that shard - 1)^2
+   *       </ul>
+   * </ul>
+   *
+   * <p>The count of overlapping replicas for collections/shards must be squared, since we want
+   * higher values to be penalized more than lower values. If a node has 2 collections with 3
+   * replicas each, it should be weighted less than a node with 1 collection that has 5 replicas
+   * placed there. Without squaring (and with every replica in a different shard), the weight of
+   * the first node would be 26 and the weight of the second node would be 25, so node #2 would
+   * be weighted lower even though it violates the constraints more. When we square the
+   * overlapping replica counts, the weight of the first node is 46 and the weight of the second
+   * node is 85. This is the preferred order.
+   *
+   * <p>The "relevant" weight with a replica is the sum of:
+   *
+   * <ul>
+   *   <li>The number of replicas on the node
+   *   <li>5 * (the number of replicas on the node for that replica's collection - 1)
+   *   <li>1000 * (the number of replicas on the node for that replica's shard - 1)
+   * </ul>
+   *
+   * <p>Multiple replicas of the same shard are permitted to live on the same Node, but as shown
+   * above, the weight penalty for such is very high.
+   */
+  private static class SameCollWeightedNode extends OrderedNodePlacementPlugin.WeightedNode {
+    private static final int SAME_COL_MULT = 5;
+    private static final int SAME_SHARD_MULT = 1000;
     public Map<String, Integer> collectionReplicas;
-    public int totalReplicas = 0;
+    public int totalWeight = 0;
 
-    ReplicaCount(Node node) {
-      this.node = node;
+    SameCollWeightedNode(Node node) {
+      super(node);
       this.collectionReplicas = new HashMap<>();
     }
 
-    public int weight(String collection) {
-      return (collectionReplicas.getOrDefault(collection, 0) * 5) + totalReplicas;
+    @Override
+    public int calcWeight() {
+      return totalWeight;
     }
 
-    public void addReplica(String collection, String shard) {
-      // Used to "weigh" whether this node should be used later.
-      collectionReplicas.merge(collection, 1, Integer::sum);
+    @Override
+    public int calcRelevantWeightWithReplica(Replica replica) {
+      // Don't add 1 to the individual replica Counts, because 1 is subtracted from each when
+      // calculating weights.
+      // So since 1 would be added to each for the new replica, we can just use the original number
+      // to calculate the weights.
+      int colReplicaCount =
+          collectionReplicas.getOrDefault(replica.getShard().getCollection().getName(), 0);
+      int shardReplicaCount = getReplicasForShardOnNode(replica.getShard()).size();
+      return getAllReplicasOnNode().size()
+          + 1
+          + colReplicaCount * SAME_COL_MULT
+          + shardReplicaCount * SAME_SHARD_MULT;
     }
 
-    public Node node() {
-      return node;
+    @Override
+    public boolean canAddReplica(Replica replica) {
+      return true;
     }
 
-    public String nodeName() {
-      return node.getName();
+    @Override
+    protected boolean addProjectedReplicaWeights(Replica replica) {
+      int colReplicaCountWith =
+          collectionReplicas.merge(replica.getShard().getCollection().getName(), 1, Integer::sum);
+      int shardReplicaCountWith = getReplicasForShardOnNode(replica.getShard()).size();
+      totalWeight +=
+          addedWeightOfAdditionalReplica(colReplicaCountWith - 1, shardReplicaCountWith - 1);
+      return false;
+    }
+
+    @Override
+    protected void initReplicaWeights(Replica replica) {
+      addProjectedReplicaWeights(replica);
+    }
+
+    @Override
+    protected void removeProjectedReplicaWeights(Replica replica) {
+      Integer colReplicaCountWithout =
+          Optional.ofNullable(
+                  collectionReplicas.computeIfPresent(
+                      replica.getShard().getCollection().getName(), (k, v) -> v - 1))
+              .orElse(0);
+      int shardReplicaCountWithout = getReplicasForShardOnNode(replica.getShard()).size();
+      totalWeight -=
+          addedWeightOfAdditionalReplica(colReplicaCountWithout, shardReplicaCountWithout);
+    }
+
+    private int addedWeightOfAdditionalReplica(
+        int colReplicaCountWithout, int shardReplicaCountWithout) {
+      int additionalWeight = 1;
+      if (colReplicaCountWithout > 0) {
+        // x * 2 - 1 === x^2 - (x - 1)^2
+        additionalWeight += SAME_COL_MULT * (colReplicaCountWithout * 2 - 1);
+      }
+      if (shardReplicaCountWithout > 0) {
+        // x * 2 - 1 === x^2 - (x - 1)^2
+        additionalWeight += SAME_SHARD_MULT * (shardReplicaCountWithout * 2 - 1);
+      }
+      return additionalWeight;
     }
   }
 }
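
Editor's note on the SameCollWeightedNode weighting: the Javadoc describes a total weight built from squared overlap counts, while addedWeightOfAdditionalReplica() applies it incrementally via the identity x^2 - (x - 1)^2 = 2x - 1. The sketch below is a standalone illustration and not part of this commit. The class SameCollWeightSketch and its method names are hypothetical, and it assumes the weight is fully determined by the per-collection and per-shard replica counts on a node, with the shard penalty driven by the shard count.

```java
import java.util.List;

/** Hypothetical helper, not part of Solr: recomputes the documented weight from raw counts. */
public final class SameCollWeightSketch {
  static final int SAME_COL_MULT = 5;
  static final int SAME_SHARD_MULT = 1000;

  /**
   * Total weight as described in the Javadoc: the number of replicas on the node, plus
   * 5 * (count - 1)^2 for each collection and 1000 * (count - 1)^2 for each shard.
   * Every count is assumed to be at least 1.
   */
  static int totalWeight(List<Integer> replicasPerCollection, List<Integer> replicasPerShard) {
    int weight = 0;
    for (int c : replicasPerCollection) {
      weight += c; // every replica on the node counts once
      weight += SAME_COL_MULT * (c - 1) * (c - 1);
    }
    for (int s : replicasPerShard) {
      weight += SAME_SHARD_MULT * (s - 1) * (s - 1);
    }
    return weight;
  }

  /** Weight added by one more replica, given the counts on the node before it is added. */
  static int addedWeight(int colCountBefore, int shardCountBefore) {
    int added = 1;
    if (colCountBefore > 0) {
      // x * 2 - 1 == x^2 - (x - 1)^2
      added += SAME_COL_MULT * (colCountBefore * 2 - 1);
    }
    if (shardCountBefore > 0) {
      added += SAME_SHARD_MULT * (shardCountBefore * 2 - 1);
    }
    return added;
  }

  public static void main(String[] args) {
    // The two nodes from the Javadoc example, with every replica in a different shard.
    System.out.println(totalWeight(List.of(3, 3), List.of(1, 1, 1, 1, 1, 1)));
    System.out.println(totalWeight(List.of(5), List.of(1, 1, 1, 1, 1)));

    // Adding a third replica of a collection onto a shard that already has one replica here.
    int before = totalWeight(List.of(2), List.of(1, 1));
    int after = totalWeight(List.of(3), List.of(2, 1));
    System.out.println((after - before) + " == " + addedWeight(2, 1));
  }
}
```

Under these assumptions the incremental delta matches the difference between two full recomputations, which is the property the add/remove bookkeeping relies on.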
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index c1ca2a1df69..a83895e0e36 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -179,6 +179,7 @@ import org.apache.solr.handler.admin.api.AddReplicaAPI;
 import org.apache.solr.handler.admin.api.AddReplicaPropertyAPI;
 import org.apache.solr.handler.admin.api.AdminAPIBase;
 import org.apache.solr.handler.admin.api.AliasPropertyAPI;
+import org.apache.solr.handler.admin.api.BalanceReplicasAPI;
 import org.apache.solr.handler.admin.api.BalanceShardUniqueAPI;
 import org.apache.solr.handler.admin.api.CollectionPropertyAPI;
 import org.apache.solr.handler.admin.api.CollectionStatusAPI;
@@ -1534,6 +1535,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
         ListCollectionsAPI.class,
         ListCollectionBackupsAPI.class,
         ReplaceNodeAPI.class,
+        BalanceReplicasAPI.class,
         RestoreCollectionAPI.class,
         CollectionPropertyAPI.class,
         DeleteNodeAPI.class,
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/BalanceReplicasAPI.java b/solr/core/src/java/org/apache/solr/handler/admin/api/BalanceReplicasAPI.java
new file mode 100644
index 00000000000..0a92fcea962
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/admin/api/BalanceReplicasAPI.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.admin.api;
+
+import static org.apache.solr.client.solrj.impl.BinaryResponseParser.BINARY_CONTENT_TYPE_V2;
+import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
+import static org.apache.solr.common.params.CollectionParams.NODES;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
+import static org.apache.solr.handler.admin.CollectionsHandler.DEFAULT_COLLECTION_OP_TIMEOUT;
+import static org.apache.solr.security.PermissionNameProvider.Name.COLL_EDIT_PERM;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.media.Schema;
+import io.swagger.v3.oas.annotations.parameters.RequestBody;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import javax.inject.Inject;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.params.CollectionParams.CollectionAction;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.handler.admin.CollectionsHandler;
+import org.apache.solr.jersey.JacksonReflectMapWriter;
+import org.apache.solr.jersey.PermissionName;
+import org.apache.solr.jersey.SolrJerseyResponse;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+
+/** V2 API for balancing the replicas that already exist across a set of nodes. */
+@Path("cluster/replicas/balance")
+public class BalanceReplicasAPI extends AdminAPIBase {
+
+  @Inject
+  public BalanceReplicasAPI(
+      CoreContainer coreContainer,
+      SolrQueryRequest solrQueryRequest,
+      SolrQueryResponse solrQueryResponse) {
+    super(coreContainer, solrQueryRequest, solrQueryResponse);
+  }
+
+  @POST
+  @Produces({"application/json", "application/xml", BINARY_CONTENT_TYPE_V2})
+  @PermissionName(COLL_EDIT_PERM)
+  @Operation(summary = "Balance Replicas across the given set of Nodes.")
+  public SolrJerseyResponse balanceReplicas(
+      @RequestBody(description = "Contains user provided parameters")
+          BalanceReplicasRequestBody requestBody)
+      throws Exception {
+    final SolrJerseyResponse response = instantiateJerseyResponse(SolrJerseyResponse.class);
+    final CoreContainer coreContainer = fetchAndValidateZooKeeperAwareCoreContainer();
+    // TODO Record node for log and tracing
+    final ZkNodeProps remoteMessage = createRemoteMessage(requestBody);
+    final SolrResponse remoteResponse =
+        CollectionsHandler.submitCollectionApiCommand(
+            coreContainer,
+            coreContainer.getDistributedCollectionCommandRunner(),
+            remoteMessage,
+            CollectionAction.BALANCE_REPLICAS,
+            DEFAULT_COLLECTION_OP_TIMEOUT);
+    if (remoteResponse.getException() != null) {
+      throw remoteResponse.getException();
+    }
+
+    disableResponseCaching();
+    return response;
+  }
+
+  public ZkNodeProps createRemoteMessage(BalanceReplicasRequestBody requestBody) {
+    final Map<String, Object> remoteMessage = new HashMap<>();
+    if (requestBody != null) {
+      insertIfNotNull(remoteMessage, NODES, requestBody.nodes);
+      insertIfNotNull(remoteMessage, WAIT_FOR_FINAL_STATE, requestBody.waitForFinalState);
+      insertIfNotNull(remoteMessage, ASYNC, requestBody.async);
+    }
+    remoteMessage.put(QUEUE_OPERATION, CollectionAction.BALANCE_REPLICAS.toLower());
+
+    return new ZkNodeProps(remoteMessage);
+  }
+
+  public static class BalanceReplicasRequestBody implements JacksonReflectMapWriter {
+
+    public BalanceReplicasRequestBody() {}
+
+    public BalanceReplicasRequestBody(Set<String> nodes, Boolean waitForFinalState, String async) {
+      this.nodes = nodes;
+      this.waitForFinalState = waitForFinalState;
+      this.async = async;
+    }
+
+    @Schema(
+        description =
+            "The set of nodes across which replicas will be balanced. Defaults to all live data nodes.")
+    @JsonProperty(value = "nodes")
+    public Set<String> nodes;
+
+    @Schema(
+        description =
+            "If true, the request will complete only when all affected replicas become active. "
+                + "If false, the API will return the status of the single action, which may be "
+                + "before the new replica is online and active.")
+    @JsonProperty("waitForFinalState")
+    public Boolean waitForFinalState = false;
+
+    @Schema(description = "Request ID to track this action which will be processed asynchronously.")
+    @JsonProperty("async")
+    public String async;
+  }
+}
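
Editor's note on calling the new endpoint: the request body fields (nodes, waitForFinalState, async) and the path /api/cluster/replicas/balance come from the code above and from BalanceReplicasTest. The sketch below only builds the request with the JDK's java.net.http API and does not send it. The class BalanceRequestSketch, the base URL, and the node names are hypothetical placeholders, and the hand-rolled JSON assumes node names need no escaping.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical client-side helper, not part of Solr. */
public final class BalanceRequestSketch {

  /** Builds the JSON body by hand; assumes node names contain no characters needing escapes. */
  static String jsonBody(List<String> nodes, boolean waitForFinalState) {
    String nodeArray =
        nodes.stream().map(n -> "\"" + n + "\"").collect(Collectors.joining(", ", "[", "]"));
    return "{\"nodes\": " + nodeArray + ", \"waitForFinalState\": " + waitForFinalState + "}";
  }

  /** Builds (but does not send) a POST to the V2 balance endpoint under the given base URL. */
  static HttpRequest build(String baseUrl, List<String> nodes, boolean waitForFinalState) {
    return HttpRequest.newBuilder(URI.create(baseUrl + "/api/cluster/replicas/balance"))
        .header("Content-Type", "application/json")
        .header("Accept", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(jsonBody(nodes, waitForFinalState)))
        .build();
  }

  public static void main(String[] args) {
    // Placeholder host and node names; substitute the values of a real cluster.
    List<String> nodes = List.of("localhost:8983_solr", "localhost:7574_solr");
    HttpRequest request = build("http://localhost:8983", nodes, true);
    System.out.println(request.method() + " " + request.uri());
    System.out.println(jsonBody(nodes, true));
  }
}
```

Per the schema description above, omitting "nodes" should balance across all live data nodes; sending the request would additionally need an HttpClient and a running cluster.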
diff --git a/solr/core/src/test/org/apache/solr/cloud/BalanceReplicasTest.java b/solr/core/src/test/org/apache/solr/cloud/BalanceReplicasTest.java
new file mode 100644
index 00000000000..82a75027e68
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/BalanceReplicasTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.util.EntityUtils;
+import org.apache.solr.client.solrj.impl.CloudLegacySolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.admin.api.BalanceReplicasAPI;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.noggit.JSONParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BalanceReplicasTest extends SolrCloudTestCase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  @BeforeClass
+  public static void setupCluster() {
+    System.setProperty("metricsEnabled", "true");
+  }
+
+  @Before
+  public void clearPreviousCluster() throws Exception {
+    // Clear the previous cluster before each test, since they use different numbers of nodes.
+    shutdownCluster();
+  }
+
+  @Test
+  public void testAllNodes() throws Exception {
+    configureCluster(6)
+        .addConfig(
+            "conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
+        .configure();
+    String coll = "balancereplicastest_allnodes_coll";
+    if (log.isInfoEnabled()) {
+      log.info("total_jettys: {}", cluster.getJettySolrRunners().size());
+    }
+
+    CloudSolrClient cloudClient = cluster.getSolrClient();
+    Set<String> liveNodes = cloudClient.getClusterState().getLiveNodes();
+    ArrayList<String> l = new ArrayList<>(liveNodes);
+    Collections.shuffle(l, random());
+    CollectionAdminRequest.Create create;
+
+    create =
+        pickRandom(
+            CollectionAdminRequest.createCollection(coll, "conf1", 3, 2, 0, 0),
+            // check also replicationFactor 1
+            CollectionAdminRequest.createCollection(coll, "conf1", 6, 1, 0, 0));
+    create.setCreateNodeSet(StrUtils.join(l.subList(0, 2), ','));
+    cloudClient.request(create);
+
+    cluster.waitForActiveCollection(
+        coll,
+        create.getNumShards(),
+        create.getNumShards()
+            * (create.getNumNrtReplicas()
+                + create.getNumPullReplicas()
+                + create.getNumTlogReplicas()));
+
+    DocCollection collection = cloudClient.getClusterState().getCollection(coll);
+    log.debug("### Before balancing: {}", collection);
+
+    postDataAndGetResponse(
+        cluster.getSolrClient(),
+        "/api/cluster/replicas/balance",
+        new BalanceReplicasAPI.BalanceReplicasRequestBody());
+
+    collection = cloudClient.getClusterState().getCollectionOrNull(coll, false);
+    log.debug("### After balancing: {}", collection);
+    Set<String> replicaNodes =
+        collection.getReplicas().stream().map(Replica::getNodeName).collect(Collectors.toSet());
+    assertEquals("Incorrect nodes for replicas after balancing", liveNodes, replicaNodes);
+  }
+
+  @Test
+  public void testSomeNodes() throws Exception {
+    configureCluster(5)
+        .addConfig(
+            "conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
+        .configure();
+    String coll = "balancereplicastest_somenodes_coll";
+    if (log.isInfoEnabled()) {
+      log.info("total_jettys: {}", cluster.getJettySolrRunners().size());
+    }
+
+    CloudSolrClient cloudClient = cluster.getSolrClient();
+    Set<String> liveNodes = cloudClient.getClusterState().getLiveNodes();
+    ArrayList<String> l = new ArrayList<>(liveNodes);
+    Collections.shuffle(l, random());
+    CollectionAdminRequest.Create create;
+
+    create =
+        pickRandom(
+            CollectionAdminRequest.createCollection(coll, "conf1", 3, 2, 0, 0),
+            // check also replicationFactor 1
+            CollectionAdminRequest.createCollection(coll, "conf1", 6, 1, 0, 0));
+    create.setCreateNodeSet(StrUtils.join(l.subList(0, 2), ','));
+    cloudClient.request(create);
+
+    cluster.waitForActiveCollection(
+        coll,
+        create.getNumShards(),
+        create.getNumShards()
+            * (create.getNumNrtReplicas()
+                + create.getNumPullReplicas()
+                + create.getNumTlogReplicas()));
+
+    DocCollection collection = cloudClient.getClusterState().getCollection(coll);
+    log.debug("### Before balancing: {}", collection);
+
+    postDataAndGetResponse(
+        cluster.getSolrClient(),
+        "/api/cluster/replicas/balance",
+        new BalanceReplicasAPI.BalanceReplicasRequestBody(
+            new HashSet<>(l.subList(1, 4)), true, null));
+
+    collection = cloudClient.getClusterState().getCollectionOrNull(coll, false);
+    log.debug("### After balancing: {}", collection);
+    Set<String> replicaNodes =
+        collection.getReplicas().stream().map(Replica::getNodeName).collect(Collectors.toSet());
+    assertEquals("Incorrect nodes for replicas after balancing", 4, replicaNodes.size());
+    assertTrue(
+        "A non-balanced node lost replicas during balancing", replicaNodes.contains(l.get(0)));
+    assertFalse(
+        "A non-balanced node gained replicas during balancing", replicaNodes.contains(l.get(4)));
+  }
+
+  public Map<?, ?> postDataAndGetResponse(CloudSolrClient cloudClient, String uri, Object body)
+      throws IOException {
+    HttpEntityEnclosingRequestBase httpRequest = null;
+    HttpEntity entity;
+    String response = null;
+    Map<?, ?> m = null;
+
+    uri = cluster.getJettySolrRunners().get(0).getBaseUrl().toString().replace("/solr", "") + uri;
+    try {
+      httpRequest = new HttpPost(uri);
+
+      httpRequest.setEntity(new ByteArrayEntity(Utils.toJSON(body), ContentType.APPLICATION_JSON));
+      httpRequest.setHeader("Accept", "application/json");
+      entity =
+          ((CloudLegacySolrClient) cloudClient).getHttpClient().execute(httpRequest).getEntity();
+      try {
+        response = EntityUtils.toString(entity, UTF_8);
+        m = (Map<?, ?>) Utils.fromJSONString(response);
+      } catch (JSONParser.ParseException e) {
+        log.error("err response: {}", response);
+        throw new AssertionError(e);
+      }
+    } finally {
+      httpRequest.releaseConnection();
+    }
+    return m;
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/cluster/placement/impl/PlacementPluginIntegrationTest.java b/solr/core/src/test/org/apache/solr/cluster/placement/impl/PlacementPluginIntegrationTest.java
index fb7d769d1c9..ecbf3fb5de4 100644
--- a/solr/core/src/test/org/apache/solr/cluster/placement/impl/PlacementPluginIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cluster/placement/impl/PlacementPluginIntegrationTest.java
@@ -335,7 +335,7 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
               .process(cluster.getSolrClient());
       fail("should have failed: " + rsp);
     } catch (Exception e) {
-      assertTrue(e.toString(), e.toString().contains("colocated collection"));
+      assertTrue(e.toString(), e.toString().contains("collocated collection"));
     }
   }
 
@@ -383,8 +383,8 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
       fail("should have failed due to no nodes with the types: " + rsp);
     } catch (Exception e) {
       assertTrue(
-          "should contain 'no nodes with types':" + e,
-          e.toString().contains("no nodes with types"));
+          "should contain 'Not enough eligible nodes to place':" + e,
+          e.toString().contains("Not enough eligible nodes to place"));
     }
     System.setProperty(AffinityPlacementConfig.NODE_TYPE_SYSPROP, "type_0");
     CollectionAdminResponse rsp =
diff --git a/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AbstractPlacementFactoryTest.java b/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AbstractPlacementFactoryTest.java
new file mode 100644
index 00000000000..7e03b52aad1
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AbstractPlacementFactoryTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement.plugins;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.cluster.Node;
+import org.apache.solr.cluster.Replica;
+import org.apache.solr.cluster.placement.BalancePlan;
+import org.apache.solr.cluster.placement.Builders;
+import org.apache.solr.cluster.placement.PlacementPlan;
+import org.apache.solr.cluster.placement.ReplicaPlacement;
+
+/** Base class for placement plugin factory tests, with shared verification helpers. */
+public abstract class AbstractPlacementFactoryTest extends SolrTestCaseJ4 {
+
+  /**
+   * Verifies that a computed set of placements does match the expected placement on nodes.
+   *
+   * @param expectedPlacements a set of strings of the form {@code "1 NRT 3"} where 1 would be the
+   *     shard index, NRT the replica type and 3 the node on which the replica is placed. Shards are
+   *     1-based. Nodes 0-based.
+   *     <p>Read carefully: <b>shard index</b> and not shard name. Index in the <b>order</b> of
+   *     shards as defined for the collection in the call to {@code
+   *     Builders.CollectionBuilder#customCollectionSetup(List, List)}
+   * @param shardBuilders the shard builders are passed here to get the shard names by index
+   *     (1-based) rather than by parsing the shard names (which would break if we change the shard
+   *     naming scheme).
+   */
+  public static void verifyPlacements(
+      Set<String> expectedPlacements,
+      PlacementPlan placementPlan,
+      List<Builders.ShardBuilder> shardBuilders,
+      List<Node> liveNodes) {
+    Set<ReplicaPlacement> computedPlacements = placementPlan.getReplicaPlacements();
+
+    // Prepare structures for looking up shard name index and node index
+    Map<String, Integer> shardNumbering = new HashMap<>();
+    int index = 1; // first shard is 1 not 0
+    for (Builders.ShardBuilder sb : shardBuilders) {
+      shardNumbering.put(sb.getShardName(), index++);
+    }
+    Map<Node, Integer> nodeNumbering = new HashMap<>();
+    index = 0;
+    for (Node n : liveNodes) {
+      nodeNumbering.put(n, index++);
+    }
+
+    if (expectedPlacements.size() != computedPlacements.size()) {
+      fail(
+          "Wrong number of placements, expected "
+              + expectedPlacements.size()
+              + " computed "
+              + computedPlacements.size()
+              + ". "
+              + getExpectedVsComputedPlacement(
+                  expectedPlacements, computedPlacements, shardNumbering, nodeNumbering));
+    }
+
+    Set<String> expected = new HashSet<>(expectedPlacements);
+    for (ReplicaPlacement p : computedPlacements) {
+      String lookUpPlacementResult =
+          shardNumbering.get(p.getShardName())
+              + " "
+              + p.getReplicaType().name()
+              + " "
+              + nodeNumbering.get(p.getNode());
+      if (!expected.remove(lookUpPlacementResult)) {
+        fail(
+            "Computed placement ["
+                + lookUpPlacementResult
+                + "] not expected. "
+                + getExpectedVsComputedPlacement(
+                    expectedPlacements, computedPlacements, shardNumbering, nodeNumbering));
+      }
+    }
+  }
+
+  private static String getExpectedVsComputedPlacement(
+      Set<String> expectedPlacements,
+      Set<ReplicaPlacement> computedPlacements,
+      Map<String, Integer> shardNumbering,
+      Map<Node, Integer> nodeNumbering) {
+
+    StringBuilder sb = new StringBuilder("Expected placement: ");
+    for (String placement : expectedPlacements) {
+      sb.append("[").append(placement).append("] ");
+    }
+
+    sb.append("Computed placement: ");
+    for (ReplicaPlacement placement : computedPlacements) {
+      String lookUpPlacementResult =
+          shardNumbering.get(placement.getShardName())
+              + " "
+              + placement.getReplicaType().name()
+              + " "
+              + nodeNumbering.get(placement.getNode());
+
+      sb.append("[").append(lookUpPlacementResult).append("] ");
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * Verifies that a computed set of replica movements matches the expected movements.
+   *
+   * @param expectedMovements a set of strings of the form {@code "COL 1 NRT 3 -> 4"} where COL is
+   *     the name of the collection, 1 would be the shard index, NRT the replica type and {@code 3
+   *     -> 4} represents the replica moving from node 3 to node 4. Shards are 1-based.
+   *     Nodes 0-based.
+   *     <p>Read carefully: <b>shard index</b> and not shard name. Index in the <b>order</b> of
+   *     shards as defined for the collection in the call to {@code
+   *     Builders.CollectionBuilder#customCollectionSetup(List, List)}
+   * @param shardBuilders the shard builders are passed here to get the shard names by index
+   *     (1-based) rather than by parsing the shard names (which would break if we change the shard
+   *     naming scheme).
+   */
+  public static void verifyBalancing(
+      Set<String> expectedMovements,
+      BalancePlan balancePlan,
+      List<Builders.ShardBuilder> shardBuilders,
+      List<Node> liveNodes) {
+    Map<Replica, Node> computedMovements = balancePlan.getReplicaMovements();
+
+    assertNotNull(
+        "Replica movements returned from balancePlan should not be null", computedMovements);
+
+    // Prepare structures for looking up shard name index and node index
+    Map<String, Integer> shardNumbering = new HashMap<>();
+    int index = 1; // first shard is 1 not 0
+    for (Builders.ShardBuilder sb : shardBuilders) {
+      shardNumbering.put(sb.getShardName(), index++);
+    }
+    Map<Node, Integer> nodeNumbering = new HashMap<>();
+    index = 0;
+    for (Node n : liveNodes) {
+      nodeNumbering.put(n, index++);
+    }
+
+    if (expectedMovements.size() != computedMovements.size()) {
+      fail(
+          "Wrong number of replica movements, expected "
+              + expectedMovements.size()
+              + " computed "
+              + computedMovements.size()
+              + ". "
+              + getExpectedVsComputedMovement(
+                  expectedMovements, computedMovements, shardNumbering, nodeNumbering));
+    }
+
+    Set<String> expected = new HashSet<>(expectedMovements);
+    for (Map.Entry<Replica, Node> movement : computedMovements.entrySet()) {
+      Replica replica = movement.getKey();
+      String lookUpMovementResult =
+          replica.getShard().getCollection().getName()
+              + " "
+              + shardNumbering.get(replica.getShard().getShardName())
+              + " "
+              + replica.getType().name()
+              + " "
+              + nodeNumbering.get(replica.getNode())
+              + " -> "
+              + nodeNumbering.get(movement.getValue());
+      if (!expected.remove(lookUpMovementResult)) {
+        fail(
+            "Computed movement ["
+                + lookUpMovementResult
+                + "] not expected. "
+                + getExpectedVsComputedMovement(
+                    expectedMovements, computedMovements, shardNumbering, nodeNumbering));
+      }
+    }
+  }
+
+  private static String getExpectedVsComputedMovement(
+      Set<String> expectedMovements,
+      Map<Replica, Node> computedMovements,
+      Map<String, Integer> shardNumbering,
+      Map<Node, Integer> nodeNumbering) {
+
+    StringBuilder sb = new StringBuilder("Expected movement: ");
+    for (String movement : expectedMovements) {
+      sb.append("[").append(movement).append("] ");
+    }
+
+    sb.append("Computed movement: ");
+    for (Map.Entry<Replica, Node> movement : computedMovements.entrySet()) {
+      Replica replica = movement.getKey();
+      String lookUpMovementResult =
+          replica.getShard().getCollection().getName()
+              + " "
+              + shardNumbering.get(replica.getShard().getShardName())
+              + " "
+              + replica.getType().name()
+              + " "
+              + nodeNumbering.get(replica.getNode())
+              + " -> "
+              + nodeNumbering.get(movement.getValue());
+
+      sb.append("[").append(lookUpMovementResult).append("] ");
+    }
+
+    return sb.toString();
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java b/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java
index 7cbf0448571..c71c823e59b 100644
--- a/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java
+++ b/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java
@@ -18,8 +18,7 @@
 package org.apache.solr.cluster.placement.plugins;
 
 import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Comparator;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -31,20 +30,22 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
-import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.cluster.Cluster;
 import org.apache.solr.cluster.Node;
 import org.apache.solr.cluster.Replica;
 import org.apache.solr.cluster.Shard;
 import org.apache.solr.cluster.SolrCollection;
 import org.apache.solr.cluster.placement.AttributeValues;
+import org.apache.solr.cluster.placement.BalancePlan;
 import org.apache.solr.cluster.placement.Builders;
 import org.apache.solr.cluster.placement.DeleteReplicasRequest;
 import org.apache.solr.cluster.placement.PlacementContext;
 import org.apache.solr.cluster.placement.PlacementException;
 import org.apache.solr.cluster.placement.PlacementPlan;
 import org.apache.solr.cluster.placement.PlacementPlugin;
+import org.apache.solr.cluster.placement.PlacementRequest;
 import org.apache.solr.cluster.placement.ReplicaPlacement;
+import org.apache.solr.cluster.placement.impl.BalanceRequestImpl;
 import org.apache.solr.cluster.placement.impl.ModificationRequestImpl;
 import org.apache.solr.cluster.placement.impl.PlacementRequestImpl;
 import org.apache.solr.common.util.Pair;
@@ -55,7 +56,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** Unit test for {@link AffinityPlacementFactory} */
-public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
+public class AffinityPlacementFactoryTest extends AbstractPlacementFactoryTest {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private PlacementPlugin plugin;
@@ -251,6 +252,7 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
             List.of("NRT 0", "TLOG 0", "NRT 3"), // shard 1
             List.of("NRT 1", "NRT 3", "TLOG 2")); // shard 2
     collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+    clusterBuilder.addCollection(collectionBuilder);
     SolrCollection solrCollection = collectionBuilder.build();
 
     List<Node> liveNodes = clusterBuilder.buildLiveNodes();
@@ -359,6 +361,7 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
             List.of("NRT " + AZ1_NRT_HIGHCORES, "TLOG " + AZ3_TLOGPULL), // shard 1
             List.of("TLOG " + AZ2_TLOGPULL)); // shard 2
     collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+    clusterBuilder.addCollection(collectionBuilder);
     SolrCollection solrCollection = collectionBuilder.build();
 
     List<Node> liveNodes = clusterBuilder.buildLiveNodes();
@@ -426,6 +429,7 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
     Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
     List<List<String>> shardsReplicas = List.of(List.of());
     collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+    clusterBuilder.addCollection(collectionBuilder);
     SolrCollection solrCollection = collectionBuilder.build();
 
     List<Node> liveNodes = clusterBuilder.buildLiveNodes();
@@ -489,6 +493,7 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
             List.of("NRT 10", "TLOG 11"), // shard 1
             List.of()); // shard 2
     collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+    clusterBuilder.addCollection(collectionBuilder);
     SolrCollection solrCollection = collectionBuilder.build();
 
     List<Node> liveNodes = clusterBuilder.buildLiveNodes();
@@ -523,95 +528,6 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
     verifyPlacements(expectedPlacements, pp, collectionBuilder.getShardBuilders(), liveNodes);
   }
 
-  /**
-   * Verifies that a computed set of placements does match the expected placement on nodes.
-   *
-   * @param expectedPlacements a set of strings of the form {@code "1 NRT 3"} where 1 would be the
-   *     shard index, NRT the replica type and 3 the node on which the replica is placed. Shards are
-   *     1-based. Nodes 0-based.
-   *     <p>Read carefully: <b>shard index</b> and not shard name. Index in the <b>order</b> of
-   *     shards as defined for the collection in the call to {@link
-   *     org.apache.solr.cluster.placement.Builders.CollectionBuilder#customCollectionSetup(List,
-   *     List)}
-   * @param shardBuilders the shard builders are passed here to get the shard names by index
-   *     (1-based) rather than by parsing the shard names (which would break if we change the shard
-   *     naming scheme).
-   */
-  private static void verifyPlacements(
-      Set<String> expectedPlacements,
-      PlacementPlan placementPlan,
-      List<Builders.ShardBuilder> shardBuilders,
-      List<Node> liveNodes) {
-    Set<ReplicaPlacement> computedPlacements = placementPlan.getReplicaPlacements();
-
-    // Prepare structures for looking up shard name index and node index
-    Map<String, Integer> shardNumbering = new HashMap<>();
-    int index = 1; // first shard is 1 not 0
-    for (Builders.ShardBuilder sb : shardBuilders) {
-      shardNumbering.put(sb.getShardName(), index++);
-    }
-    Map<Node, Integer> nodeNumbering = new HashMap<>();
-    index = 0;
-    for (Node n : liveNodes) {
-      nodeNumbering.put(n, index++);
-    }
-
-    if (expectedPlacements.size() != computedPlacements.size()) {
-      fail(
-          "Wrong number of placements, expected "
-              + expectedPlacements.size()
-              + " computed "
-              + computedPlacements.size()
-              + ". "
-              + getExpectedVsComputedPlacement(
-                  expectedPlacements, computedPlacements, shardNumbering, nodeNumbering));
-    }
-
-    Set<String> expected = new HashSet<>(expectedPlacements);
-    for (ReplicaPlacement p : computedPlacements) {
-      String lookUpPlacementResult =
-          shardNumbering.get(p.getShardName())
-              + " "
-              + p.getReplicaType().name()
-              + " "
-              + nodeNumbering.get(p.getNode());
-      if (!expected.remove(lookUpPlacementResult)) {
-        fail(
-            "Computed placement ["
-                + lookUpPlacementResult
-                + "] not expected. "
-                + getExpectedVsComputedPlacement(
-                    expectedPlacements, computedPlacements, shardNumbering, nodeNumbering));
-      }
-    }
-  }
-
-  private static String getExpectedVsComputedPlacement(
-      Set<String> expectedPlacements,
-      Set<ReplicaPlacement> computedPlacements,
-      Map<String, Integer> shardNumbering,
-      Map<Node, Integer> nodeNumbering) {
-
-    StringBuilder sb = new StringBuilder("Expected placement: ");
-    for (String placement : expectedPlacements) {
-      sb.append("[").append(placement).append("] ");
-    }
-
-    sb.append("Computed placement: ");
-    for (ReplicaPlacement placement : computedPlacements) {
-      String lookUpPlacementResult =
-          shardNumbering.get(placement.getShardName())
-              + " "
-              + placement.getReplicaType().name()
-              + " "
-              + nodeNumbering.get(placement.getNode());
-
-      sb.append("[").append(lookUpPlacementResult).append("] ");
-    }
-
-    return sb.toString();
-  }
-
   @Test
   public void testAvailabilityZones() throws Exception {
     String collectionName = "azCollection";
@@ -811,6 +727,72 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
     }
   }
 
+  @Test
+  public void testFreeDiskConstraintsWithNewReplicas() throws Exception {
+    String collectionName = "freeDiskWithReplicasCollection";
+    int NUM_NODES = 3;
+    Builders.ClusterBuilder clusterBuilder =
+        Builders.newClusterBuilder().initializeLiveNodes(NUM_NODES);
+    Node smallNode = null;
+    for (int i = 0; i < NUM_NODES; i++) {
+      Builders.NodeBuilder nodeBuilder = clusterBuilder.getLiveNodeBuilders().get(i);
+      // Every node starts with no cores and 100 GB of free disk
+      nodeBuilder.setCoreCount(0);
+      nodeBuilder.setFreeDiskGB(100.0);
+    }
+
+    Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
+    collectionBuilder.initializeShardsReplicas(
+        3,
+        1,
+        0,
+        0,
+        clusterBuilder.getLiveNodeBuilders(), // .subList(1, 3),
+        List.of(33, 33, 60));
+    clusterBuilder.addCollection(collectionBuilder);
+
+    PlacementContext placementContext = clusterBuilder.buildPlacementContext();
+    Cluster cluster = placementContext.getCluster();
+
+    SolrCollection solrCollection = cluster.getCollection(collectionName);
+
+    // Test when an additional replicaType makes the projected indexSize go over the limit
+    // Add two replicas (different types) to the first shard, the second replica should fail
+    PlacementRequestImpl badReplicaPlacementRequest =
+        new PlacementRequestImpl(
+            solrCollection,
+            StreamSupport.stream(solrCollection.shards().spliterator(), false)
+                .map(Shard::getShardName)
+                .findFirst()
+                .map(Set::of)
+                .orElseGet(Collections::emptySet),
+            cluster.getLiveNodes(),
+            0,
+            1,
+            1);
+
+    assertThrows(
+        PlacementException.class,
+        () -> plugin.computePlacement(badReplicaPlacementRequest, placementContext));
+
+    // Test when an additional shard makes the projected indexSize go over the limit
+    // Add one replica to each shard, the third shard should fail
+    PlacementRequest badShardPlacementRequest =
+        new PlacementRequestImpl(
+            solrCollection,
+            StreamSupport.stream(solrCollection.shards().spliterator(), false)
+                .map(Shard::getShardName)
+                .collect(Collectors.toSet()),
+            cluster.getLiveNodes(),
+            0,
+            1,
+            1);
+
+    assertThrows(
+        PlacementException.class,
+        () -> plugin.computePlacement(badShardPlacementRequest, placementContext));
+  }
+
   @Test
   public void testWithCollectionPlacement() throws Exception {
     AffinityPlacementConfig config =
@@ -1197,11 +1179,12 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
     int perNode = TOTAL_REPLICAS > numNodes ? TOTAL_REPLICAS / numNodes : 1;
     replicasPerNode.forEach(
         (node, count) -> {
-          assertEquals(perNode, count.get());
+          assertEquals(
+              "Wrong number of replicas for node: " + node.getName(), perNode, count.get());
         });
     shardsPerNode.forEach(
         (node, names) -> {
-          assertEquals(perNode, names.size());
+          assertEquals("Wrong number of shards for node: " + node.getName(), perNode, names.size());
         });
 
     replicasPerShard.forEach(
@@ -1368,60 +1351,6 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
     }
   }
 
-  @Test
-  @SuppressWarnings("SelfComparison")
-  public void testCompareSpreadDomainWithNodes() {
-    Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(3);
-    final List<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
-    nodeBuilders.get(0).setNodeName("nodeA");
-    nodeBuilders.get(1).setNodeName("nodeB");
-    nodeBuilders.get(2).setNodeName("nodeC");
-
-    Cluster cluster = clusterBuilder.build();
-    Node nodeA =
-        cluster.getLiveNodes().stream()
-            .filter((n) -> n.getName().equals("nodeA"))
-            .findFirst()
-            .get();
-    Node nodeB =
-        cluster.getLiveNodes().stream()
-            .filter((n) -> n.getName().equals("nodeB"))
-            .findFirst()
-            .get();
-    Node nodeC =
-        cluster.getLiveNodes().stream()
-            .filter((n) -> n.getName().equals("nodeC"))
-            .findFirst()
-            .get();
-
-    Comparator<Node> nodeComparator = Comparator.comparing(Node::getName);
-    List<Node> listInGroup1 = new ArrayList<>(List.of(nodeC, nodeA));
-    AffinityPlacementFactory.AffinityPlacementPlugin.SpreadDomainWithNodes group1 =
-        new AffinityPlacementFactory.AffinityPlacementPlugin.SpreadDomainWithNodes(
-            "foo", listInGroup1, 0, nodeComparator);
-    AffinityPlacementFactory.AffinityPlacementPlugin.SpreadDomainWithNodes group2 =
-        new AffinityPlacementFactory.AffinityPlacementPlugin.SpreadDomainWithNodes(
-            "bar", List.of(nodeB), 1, nodeComparator);
-    assertEquals("Comparing to itself should return 0", 0, group1.compareTo(group1));
-    assertEquals(
-        "group 1 should be greater, since 'nodeC' is greater than 'nodeB",
-        1,
-        group1.compareTo(group2));
-    assertEquals(
-        "group 1 should be greater, since 'nodeC' is greater than 'nodeB",
-        -1,
-        group2.compareTo(group1));
-    listInGroup1.remove(0);
-    assertEquals(
-        "group 1 should be greater, since 'nodeB' is greater than 'nodeA",
-        -1,
-        group1.compareTo(group2));
-    listInGroup1.remove(0);
-    listInGroup1.add(nodeB);
-    assertEquals(
-        "group 1 should be greater because of the tie breaker", -1, group1.compareTo(group2));
-  }
-
   @Test
   public void testSpreadDomainsWithDownNode() throws Exception {
     defaultConfig.spreadAcrossDomains = true;
@@ -1462,4 +1391,280 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
       assertEquals(liveNodes.get(1), rp.getNode());
     }
   }
+
+  /** Tests replica balancing across all nodes in a cluster */
+  @Test
+  public void testBalancingBareMetrics() throws Exception {
+    // Cluster nodes and their attributes
+    Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(5);
+    List<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+
+    // The collection already exists with shards and replicas
+    Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder("a");
+    // Note that the collection as defined below is in a state that would NOT be returned by the
+    // placement plugin: shard 1 has two replicas on node 0. The plugin should still be able to
+    // balance the replicas as long as the movements don't break the rules.
+    List<List<String>> shardsReplicas =
+        List.of(
+            List.of("NRT 0", "TLOG 0", "NRT 2"), // shard 1
+            List.of("NRT 1", "NRT 4", "TLOG 3")); // shard 2
+    collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+    clusterBuilder.addCollection(collectionBuilder);
+
+    // Add another collection
+    collectionBuilder = Builders.newCollectionBuilder("b");
+    shardsReplicas =
+        List.of(
+            List.of("NRT 1", "TLOG 0", "NRT 3"), // shard 1
+            List.of("NRT 1", "NRT 3", "TLOG 0")); // shard 2
+    collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+    clusterBuilder.addCollection(collectionBuilder);
+
+    BalanceRequestImpl balanceRequest =
+        new BalanceRequestImpl(new HashSet<>(clusterBuilder.buildLiveNodes()));
+    BalancePlan balancePlan =
+        plugin.computeBalancing(balanceRequest, clusterBuilder.buildPlacementContext());
+
+    // Each expected movement is represented as a string "col shard replica-type fromNode ->
+    // toNode"
+    Set<String> expectedPlacements = Set.of("b 1 TLOG 0 -> 2", "b 1 NRT 3 -> 4");
+    verifyBalancing(
+        expectedPlacements,
+        balancePlan,
+        collectionBuilder.getShardBuilders(),
+        clusterBuilder.buildLiveNodes());
+  }
+
+  /** Tests replica balancing across all nodes when nodes start with different core counts */
+  @Test
+  public void testBalancingExistingMetrics() throws Exception {
+    // Cluster nodes and their attributes
+    Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(5);
+    List<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+    int coresOnNode = 10;
+    for (Builders.NodeBuilder nodeBuilder : nodeBuilders) {
+      nodeBuilder.setCoreCount(coresOnNode);
+      coresOnNode += 10;
+    }
+
+    // The collection already exists with shards and replicas
+    Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder("a");
+    // Note that the collection as defined below is in a state that would NOT be returned by the
+    // placement plugin: shard 1 has two replicas on node 0. The plugin should still be able to
+    // balance the replicas as long as the movements don't break the rules.
+    List<List<String>> shardsReplicas =
+        List.of(
+            List.of("NRT 0", "TLOG 0", "NRT 3"), // shard 1
+            List.of("NRT 1", "NRT 3", "TLOG 2")); // shard 2
+    collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+    clusterBuilder.addCollection(collectionBuilder);
+
+    BalanceRequestImpl balanceRequest =
+        new BalanceRequestImpl(new HashSet<>(clusterBuilder.buildLiveNodes()));
+    BalancePlan balancePlan =
+        plugin.computeBalancing(balanceRequest, clusterBuilder.buildPlacementContext());
+
+    // Each expected movement is represented as a string "col shard replica-type fromNode ->
+    // toNode"
+    Set<String> expectedPlacements = Set.of("a 1 NRT 3 -> 1", "a 2 NRT 3 -> 0");
+    verifyBalancing(
+        expectedPlacements,
+        balancePlan,
+        collectionBuilder.getShardBuilders(),
+        clusterBuilder.buildLiveNodes());
+  }
+
+  /** Tests that balancing works across a subset of nodes */
+  @Test
+  public void testBalancingWithNodeSubset() throws Exception {
+    // Cluster nodes and their attributes
+    Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(5);
+    List<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+    int coresOnNode = 10;
+    for (Builders.NodeBuilder nodeBuilder : nodeBuilders) {
+      nodeBuilder.setCoreCount(coresOnNode);
+      coresOnNode += 10;
+    }
+
+    // The collection already exists with shards and replicas
+    Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder("a");
+    // Note that the collection as defined below is in a state that would NOT be returned by the
+    // placement plugin: shard 1 has two replicas on node 0. The plugin should still be able to
+    // balance the replicas as long as the movements don't break the rules.
+    List<List<String>> shardsReplicas =
+        List.of(
+            List.of("NRT 0", "TLOG 0", "NRT 3"), // shard 1
+            List.of("NRT 1", "NRT 3", "TLOG 2")); // shard 2
+    collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+    clusterBuilder.addCollection(collectionBuilder);
+
+    // Balance over every node except node 0
+    List<Node> overNodes = clusterBuilder.buildLiveNodes();
+    overNodes.remove(0);
+
+    BalanceRequestImpl balanceRequest = new BalanceRequestImpl(new HashSet<>(overNodes));
+    BalancePlan balancePlan =
+        plugin.computeBalancing(balanceRequest, clusterBuilder.buildPlacementContext());
+
+    // Each expected movement is represented as a string "col shard replica-type fromNode ->
+    // toNode"
+    Set<String> expectedPlacements = Set.of("a 1 NRT 3 -> 1");
+    verifyBalancing(
+        expectedPlacements,
+        balancePlan,
+        collectionBuilder.getShardBuilders(),
+        clusterBuilder.buildLiveNodes());
+  }
+
+  /**
+   * Tests balancing with multiple criteria: Replica type restricted nodes, Availability zones +
+   * existing collection
+   */
+  @Test
+  public void testBalancingMultiCriteria() throws Exception {
+    String collectionName = "a";
+
+    // Note: node numbering deliberately does not follow the AZ structure
+    final int AZ1_NRT_LOWCORES = 0;
+    final int AZ1_NRT_HIGHCORES = 3;
+    final int AZ1_TLOGPULL_LOWFREEDISK = 5;
+
+    final int AZ2_NRT_MEDCORES = 2;
+    final int AZ2_NRT_HIGHCORES = 1;
+    final int AZ2_TLOGPULL = 7;
+
+    final int AZ3_NRT_LOWCORES = 4;
+    final int AZ3_NRT_HIGHCORES = 6;
+    final int AZ3_TLOGPULL = 8;
+
+    final String AZ1 = "AZ1";
+    final String AZ2 = "AZ2";
+    final String AZ3 = "AZ3";
+
+    final int LOW_CORES = 10;
+    final int MED_CORES = 50;
+    final int HIGH_CORES = 100;
+
+    final String TLOG_PULL_REPLICA_TYPE = "TLOG, PULL";
+    final String NRT_REPLICA_TYPE = "Nrt";
+
+    // Cluster nodes and their attributes.
+    // 3 AZs with three nodes each: two that only take NRT and one that takes TLOG or PULL
+    // Within each AZ, one of the NRT nodes has fewer cores than the other
+    // The TLOG/PULL node in AZ1 doesn't have much free disk space
+    Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(9);
+    List<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+    for (int i = 0; i < 9; i++) {
+      final String az;
+      final int numcores;
+      final double freedisk;
+      final String acceptedReplicaType;
+
+      if (i == AZ1_NRT_LOWCORES || i == AZ1_NRT_HIGHCORES || i == AZ1_TLOGPULL_LOWFREEDISK) {
+        az = AZ1;
+      } else if (i == AZ2_NRT_HIGHCORES || i == AZ2_NRT_MEDCORES || i == AZ2_TLOGPULL) {
+        az = AZ2;
+      } else {
+        az = AZ3;
+      }
+
+      if (i == AZ1_NRT_LOWCORES || i == AZ3_NRT_LOWCORES) {
+        numcores = LOW_CORES;
+      } else if (i == AZ2_NRT_MEDCORES) {
+        numcores = MED_CORES;
+      } else {
+        numcores = HIGH_CORES;
+      }
+
+      if (i == AZ1_TLOGPULL_LOWFREEDISK) {
+        freedisk = PRIORITIZED_FREE_DISK_GB - 10;
+      } else {
+        freedisk = PRIORITIZED_FREE_DISK_GB + 10;
+      }
+
+      if (i == AZ1_TLOGPULL_LOWFREEDISK || i == AZ2_TLOGPULL || i == AZ3_TLOGPULL) {
+        acceptedReplicaType = TLOG_PULL_REPLICA_TYPE;
+      } else {
+        acceptedReplicaType = NRT_REPLICA_TYPE;
+      }
+
+      nodeBuilders
+          .get(i)
+          .setSysprop(AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP, az)
+          .setSysprop(AffinityPlacementConfig.REPLICA_TYPE_SYSPROP, acceptedReplicaType)
+          .setCoreCount(numcores)
+          .setFreeDiskGB(freedisk);
+    }
+
+    // The collection already exists with shards and replicas.
+    Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
+    List<List<String>> shardsReplicas =
+        List.of(
+            List.of(
+                "NRT " + AZ1_NRT_HIGHCORES,
+                "TLOG " + AZ3_TLOGPULL,
+                "NRT " + AZ3_NRT_HIGHCORES,
+                "NRT " + AZ3_NRT_LOWCORES,
+                "TLOG " + AZ2_TLOGPULL), // shard 1
+            List.of(
+                "TLOG " + AZ2_TLOGPULL,
+                "NRT " + AZ2_NRT_HIGHCORES,
+                "NRT " + AZ3_NRT_LOWCORES,
+                "TLOG " + AZ3_TLOGPULL)); // shard 2
+    collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+    clusterBuilder.addCollection(collectionBuilder);
+
+    List<Node> liveNodes = clusterBuilder.buildLiveNodes();
+
+    BalanceRequestImpl balanceRequest = new BalanceRequestImpl(new HashSet<>(liveNodes));
+    BalancePlan bp =
+        plugin.computeBalancing(balanceRequest, clusterBuilder.buildPlacementContext());
+    Set<String> expectedMovements =
+        Set.of(
+            "a 2 NRT " + AZ2_NRT_HIGHCORES + " -> " + AZ1_NRT_LOWCORES,
+            "a 1 NRT " + AZ3_NRT_HIGHCORES + " -> " + AZ1_NRT_LOWCORES,
+            "a 1 NRT " + AZ1_NRT_HIGHCORES + " -> " + AZ2_NRT_MEDCORES);
+    verifyBalancing(expectedMovements, bp, collectionBuilder.getShardBuilders(), liveNodes);
+  }
+
+  @Test
+  public void testWithCollectionBalancing() throws Exception {
+    AffinityPlacementConfig config =
+        new AffinityPlacementConfig(
+            MINIMAL_FREE_DISK_GB,
+            PRIORITIZED_FREE_DISK_GB,
+            Map.of(primaryCollectionName, secondaryCollectionName),
+            Map.of());
+    configurePlugin(config);
+    // Cluster nodes and their attributes
+    Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(6);
+    List<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+
+    // The collection already exists with shards and replicas
+    Builders.CollectionBuilder collectionBuilder =
+        Builders.newCollectionBuilder(primaryCollectionName);
+    // The primary collection has a single shard with NRT replicas on nodes 0, 1 and 3. Only the
+    // replica on node 0 shares a node with the secondary collection defined below (nodes 0, 2
+    // and 4).
+    List<List<String>> shardsReplicas = List.of(List.of("NRT 0", "NRT 1", "NRT 3")); // shard 1
+    collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+    clusterBuilder.addCollection(collectionBuilder);
+
+    // Add another collection
+    collectionBuilder = Builders.newCollectionBuilder(secondaryCollectionName);
+    shardsReplicas = List.of(List.of("NRT 0", "NRT 2", "NRT 4")); // shard 1
+    collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+    clusterBuilder.addCollection(collectionBuilder);
+
+    PlacementContext placementContext = clusterBuilder.buildPlacementContext();
+    Cluster cluster = placementContext.getCluster();
+
+    BalanceRequestImpl balanceRequest = new BalanceRequestImpl(cluster.getLiveNodes());
+
+    BalancePlan bp = plugin.computeBalancing(balanceRequest, placementContext);
+    assertEquals(
+        "No movements expected, single movable replica requires co-placement",
+        0,
+        bp.getReplicaMovements().size());
+  }
 }
diff --git a/solr/core/src/test/org/apache/solr/cluster/placement/plugins/MinimizeCoresPlacementFactoryTest.java b/solr/core/src/test/org/apache/solr/cluster/placement/plugins/MinimizeCoresPlacementFactoryTest.java
new file mode 100644
index 00000000000..6afd24fafd1
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cluster/placement/plugins/MinimizeCoresPlacementFactoryTest.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement.plugins;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import org.apache.solr.cluster.Node;
+import org.apache.solr.cluster.Shard;
+import org.apache.solr.cluster.SolrCollection;
+import org.apache.solr.cluster.placement.BalancePlan;
+import org.apache.solr.cluster.placement.Builders;
+import org.apache.solr.cluster.placement.PlacementContext;
+import org.apache.solr.cluster.placement.PlacementPlan;
+import org.apache.solr.cluster.placement.PlacementPlugin;
+import org.apache.solr.cluster.placement.ReplicaPlacement;
+import org.apache.solr.cluster.placement.impl.BalanceRequestImpl;
+import org.apache.solr.cluster.placement.impl.PlacementRequestImpl;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Unit test for {@link MinimizeCoresPlacementFactory} */
+public class MinimizeCoresPlacementFactoryTest extends AbstractPlacementFactoryTest {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private PlacementPlugin plugin;
+
+  @Before
+  public void setupPlugin() {
+    configurePlugin();
+  }
+
+  private void configurePlugin() {
+    MinimizeCoresPlacementFactory factory = new MinimizeCoresPlacementFactory();
+    plugin = factory.createPluginInstance();
+  }
+
+  @Test
+  public void testBasicPlacementNewCollection() throws Exception {
+    testBasicPlacementInternal(false);
+  }
+
+  @Test
+  public void testBasicPlacementExistingCollection() throws Exception {
+    testBasicPlacementInternal(true);
+  }
+
+  /**
+   * When this test places a replica for a new collection, it should pick the node with fewer cores.
+   *
+   * <p>
+   *
+   * <p>When it places a replica for an existing collection, it should pick the node with fewer
+   * cores that doesn't already have a replica for the shard.
+   */
+  private void testBasicPlacementInternal(boolean hasExistingCollection) throws Exception {
+    String collectionName = "basicCollection";
+
+    Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(2);
+    List<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+
+    Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
+
+    if (hasExistingCollection) {
+      // Existing collection has replicas for its shards and is visible in the cluster state
+      collectionBuilder.initializeShardsReplicas(1, 1, 0, 0, nodeBuilders);
+      clusterBuilder.addCollection(collectionBuilder);
+    } else {
+      // New collection to create has the shards defined but no replicas and is not present in
+      // cluster state
+      collectionBuilder.initializeShardsReplicas(1, 0, 0, 0, List.of());
+    }
+
+    PlacementContext placementContext = clusterBuilder.buildPlacementContext();
+
+    SolrCollection solrCollection = collectionBuilder.build();
+    List<Node> liveNodes = clusterBuilder.buildLiveNodes();
+
+    // Place a new replica for the (only) existing shard of the collection
+    PlacementRequestImpl placementRequest =
+        new PlacementRequestImpl(
+            solrCollection,
+            Set.of(solrCollection.shards().iterator().next().getShardName()),
+            new HashSet<>(liveNodes),
+            1,
+            0,
+            0);
+
+    PlacementPlan pp = plugin.computePlacement(placementRequest, placementContext);
+
+    assertEquals(1, pp.getReplicaPlacements().size());
+    ReplicaPlacement rp = pp.getReplicaPlacements().iterator().next();
+    assertEquals(hasExistingCollection ? liveNodes.get(1) : liveNodes.get(0), rp.getNode());
+  }
+
+  /**
+   * Tests that existing collection replicas are taken into account when preventing more than one
+   * replica per shard from being placed on any node.
+   */
+  @Test
+  public void testPlacementWithExistingReplicas() throws Exception {
+    String collectionName = "existingCollection";
+
+    // Cluster nodes and their attributes
+    Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(5);
+    List<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+    int coresOnNode = 10;
+    for (Builders.NodeBuilder nodeBuilder : nodeBuilders) {
+      nodeBuilder.setCoreCount(coresOnNode);
+      coresOnNode += 10;
+    }
+
+    // The collection already exists with shards and replicas
+    Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
+    // Note that the collection as defined below is in a state that would NOT be returned by the
+    // placement plugin: shard 1 has two replicas on node 0. The plugin should still be able to
+    // place additional replicas as long as they don't break the rules.
+    List<List<String>> shardsReplicas =
+        List.of(
+            List.of("NRT 0", "TLOG 0", "NRT 3"), // shard 1
+            List.of("NRT 1", "NRT 3", "TLOG 2")); // shard 2
+    collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+    clusterBuilder.addCollection(collectionBuilder);
+    SolrCollection solrCollection = collectionBuilder.build();
+
+    List<Node> liveNodes = clusterBuilder.buildLiveNodes();
+
+    // Place an additional NRT and an additional TLOG replica for each shard
+    PlacementRequestImpl placementRequest =
+        new PlacementRequestImpl(
+            solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes), 1, 1, 0);
+
+    // The replicas must be placed on the most appropriate nodes, i.e. those that do not already
+    // have a replica for the shard and then on the node with the lowest number of cores. NRT are
+    // placed first and given the cluster state here the placement is deterministic (easier to test,
+    // only one good placement).
+    PlacementPlan pp =
+        plugin.computePlacement(placementRequest, clusterBuilder.buildPlacementContext());
+
+    // Each expected placement is represented as a string "shard replica-type node"
+    Set<String> expectedPlacements = Set.of("1 NRT 1", "1 TLOG 2", "2 NRT 0", "2 TLOG 4");
+    verifyPlacements(expectedPlacements, pp, collectionBuilder.getShardBuilders(), liveNodes);
+  }
+
+  /**
+   * Tests that if a collection has replicas on nodes not currently live, placement for new replicas
+   * works ok.
+   */
+  @Test
+  public void testCollectionOnDeadNodes() throws Exception {
+    String collectionName = "walkingDead";
+
+    // Cluster nodes and their attributes
+    Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(3);
+    List<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+    int coreCount = 0;
+    for (Builders.NodeBuilder nodeBuilder : nodeBuilders) {
+      nodeBuilder.setCoreCount(coreCount++);
+    }
+
+    // The collection already exists with shards and replicas
+    Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
+    // The collection below has shard 1 having replicas only on dead nodes and shard 2 no replicas
+    // at all... (which is likely a challenging condition to recover from, but the placement
+    // computations should still execute happily).
+    List<List<String>> shardsReplicas =
+        List.of(
+            List.of("NRT 10", "TLOG 11"), // shard 1
+            List.of()); // shard 2
+    collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+    clusterBuilder.addCollection(collectionBuilder);
+    SolrCollection solrCollection = collectionBuilder.build();
+
+    List<Node> liveNodes = clusterBuilder.buildLiveNodes();
+
+    // Place an additional PULL replica for shard 1
+    PlacementRequestImpl placementRequest =
+        new PlacementRequestImpl(
+            solrCollection,
+            Set.of(solrCollection.iterator().next().getShardName()),
+            new HashSet<>(liveNodes),
+            0,
+            0,
+            1);
+
+    PlacementPlan pp =
+        plugin.computePlacement(placementRequest, clusterBuilder.buildPlacementContext());
+
+    // Each expected placement is represented as a string "shard replica-type node"
+    // Node 0 has fewer cores than node 1 (0 vs 1) so the placement should go there.
+    Set<String> expectedPlacements = Set.of("1 PULL 0");
+    verifyPlacements(expectedPlacements, pp, collectionBuilder.getShardBuilders(), liveNodes);
+
+    // If we instead placed a replica for shard 2 (starting with the same initial cluster state, not
+    // including the first placement above), it should also go to node 0 since it has fewer cores...
+    Iterator<Shard> it = solrCollection.iterator();
+    it.next(); // skip first shard to do placement for the second one...
+    placementRequest =
+        new PlacementRequestImpl(
+            solrCollection, Set.of(it.next().getShardName()), new HashSet<>(liveNodes), 0, 0, 1);
+    pp = plugin.computePlacement(placementRequest, clusterBuilder.buildPlacementContext());
+    expectedPlacements = Set.of("2 PULL 0");
+    verifyPlacements(expectedPlacements, pp, collectionBuilder.getShardBuilders(), liveNodes);
+  }
+
+  /** Tests replica balancing across all nodes in a cluster */
+  @Test
+  public void testBalancingBareMetrics() throws Exception {
+    // Cluster nodes and their attributes
+    Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(5);
+    List<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+
+    // The collection already exists with shards and replicas
+    Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder("a");
+    // Note that the collection as defined below is in a state that would NOT be returned by the
+    // placement plugin: shard 1 has two replicas on node 0. The plugin should still be able to
+    // place additional replicas as long as they don't break the rules.
+    List<List<String>> shardsReplicas =
+        List.of(
+            List.of("NRT 0", "TLOG 0", "NRT 2"), // shard 1
+            List.of("NRT 1", "NRT 4", "TLOG 3")); // shard 2
+    collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+    clusterBuilder.addCollection(collectionBuilder);
+
+    // Add another collection
+    collectionBuilder = Builders.newCollectionBuilder("b");
+    shardsReplicas =
+        List.of(
+            List.of("NRT 1", "TLOG 0", "NRT 3"), // shard 1
+            List.of("NRT 1", "NRT 3", "TLOG 0")); // shard 2
+    collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+    clusterBuilder.addCollection(collectionBuilder);
+
+    BalanceRequestImpl balanceRequest =
+        new BalanceRequestImpl(new HashSet<>(clusterBuilder.buildLiveNodes()));
+    BalancePlan balancePlan =
+        plugin.computeBalancing(balanceRequest, clusterBuilder.buildPlacementContext());
+
+    // Each expected placement is represented as a string "col shard replica-type fromNode ->
+    // toNode"
+    Set<String> expectedPlacements = Set.of("b 1 TLOG 0 -> 2", "b 1 NRT 3 -> 4");
+    verifyBalancing(
+        expectedPlacements,
+        balancePlan,
+        collectionBuilder.getShardBuilders(),
+        clusterBuilder.buildLiveNodes());
+  }
+
+  /** Tests replica balancing across all nodes when nodes already report core counts */
+  @Test
+  public void testBalancingExistingMetrics() throws Exception {
+    // Cluster nodes and their attributes
+    Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(5);
+    List<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+    int coresOnNode = 10;
+    for (Builders.NodeBuilder nodeBuilder : nodeBuilders) {
+      nodeBuilder.setCoreCount(coresOnNode);
+      coresOnNode += 10;
+    }
+
+    // The collection already exists with shards and replicas
+    Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder("a");
+    // Note that the collection as defined below is in a state that would NOT be returned by the
+    // placement plugin: shard 1 has two replicas on node 0. The plugin should still be able to
+    // place additional replicas as long as they don't break the rules.
+    List<List<String>> shardsReplicas =
+        List.of(
+            List.of("NRT 0", "TLOG 0", "NRT 3"), // shard 1
+            List.of("NRT 1", "NRT 3", "TLOG 2")); // shard 2
+    collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+    clusterBuilder.addCollection(collectionBuilder);
+
+    BalanceRequestImpl balanceRequest =
+        new BalanceRequestImpl(new HashSet<>(clusterBuilder.buildLiveNodes()));
+    BalancePlan balancePlan =
+        plugin.computeBalancing(balanceRequest, clusterBuilder.buildPlacementContext());
+
+    // Each expected placement is represented as a string "col shard replica-type fromNode ->
+    // toNode"
+    Set<String> expectedPlacements = Set.of("a 1 NRT 3 -> 1", "a 2 NRT 3 -> 0");
+    verifyBalancing(
+        expectedPlacements,
+        balancePlan,
+        collectionBuilder.getShardBuilders(),
+        clusterBuilder.buildLiveNodes());
+  }
+
+  /** Tests that balancing works across a subset of nodes */
+  @Test
+  public void testBalancingWithNodeSubset() throws Exception {
+    // Cluster nodes and their attributes
+    Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(5);
+    List<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+    int coresOnNode = 10;
+    for (Builders.NodeBuilder nodeBuilder : nodeBuilders) {
+      nodeBuilder.setCoreCount(coresOnNode);
+      coresOnNode += 10;
+    }
+
+    // The collection already exists with shards and replicas
+    Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder("a");
+    // Note that the collection as defined below is in a state that would NOT be returned by the
+    // placement plugin: shard 1 has two replicas on node 0. The plugin should still be able to
+    // place additional replicas as long as they don't break the rules.
+    List<List<String>> shardsReplicas =
+        List.of(
+            List.of("NRT 0", "TLOG 0", "NRT 3"), // shard 1
+            List.of("NRT 1", "NRT 3", "TLOG 2")); // shard 2
+    collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+    clusterBuilder.addCollection(collectionBuilder);
+
+    // Only balance over nodes 1 through 4 (node 0 is excluded)
+    List<Node> overNodes = clusterBuilder.buildLiveNodes();
+    overNodes.remove(0);
+
+    BalanceRequestImpl balanceRequest = new BalanceRequestImpl(new HashSet<>(overNodes));
+    BalancePlan balancePlan =
+        plugin.computeBalancing(balanceRequest, clusterBuilder.buildPlacementContext());
+
+    // Each expected placement is represented as a string "col shard replica-type fromNode ->
+    // toNode"
+    Set<String> expectedPlacements = Set.of("a 1 NRT 3 -> 1");
+    verifyBalancing(
+        expectedPlacements,
+        balancePlan,
+        collectionBuilder.getShardBuilders(),
+        clusterBuilder.buildLiveNodes());
+  }
+}
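The balancing tests above compare the computed plan against strings of the form "col shard replica-type fromNode -> toNode". A minimal sketch of that encoding follows. The class `ReplicaMoveSketch` and its `describe` helper are hypothetical names used for illustration only; the actual `verifyBalancing` implementation is not shown in this part of the commit and may build these strings differently.

```java
import java.util.Set;

final class ReplicaMoveSketch {
  // Hypothetical helper: encodes one replica movement in the same textual form
  // the tests use for their expected values.
  static String describe(
      String collection, int shard, String replicaType, int fromNode, int toNode) {
    return collection + " " + shard + " " + replicaType + " " + fromNode + " -> " + toNode;
  }

  public static void main(String[] args) {
    // The two movements expected by testBalancingExistingMetrics.
    Set<String> expected = Set.of("a 1 NRT 3 -> 1", "a 2 NRT 3 -> 0");
    Set<String> actual = Set.of(describe("a", 1, "NRT", 3, 1), describe("a", 2, "NRT", 3, 0));
    System.out.println(expected.equals(actual));
  }
}
```

Comparing sets rather than lists keeps the check independent of the order in which movements appear in the plan.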
diff --git a/solr/solr-ref-guide/modules/deployment-guide/pages/cluster-node-management.adoc b/solr/solr-ref-guide/modules/deployment-guide/pages/cluster-node-management.adoc
index 6fb17351171..577eb71eb62 100644
--- a/solr/solr-ref-guide/modules/deployment-guide/pages/cluster-node-management.adoc
+++ b/solr/solr-ref-guide/modules/deployment-guide/pages/cluster-node-management.adoc
@@ -363,10 +363,75 @@ curl -X POST -H 'Content-type:application/json' --data-binary '
 
 At this point, if you run a query on a node having e.g., `rack=rack1`, Solr will try to hit only replicas from `rack1`.
 
+[[balancereplicas]]
+== Balance Replicas
+
+Shuffle the replicas across the given set of Solr nodes until an equilibrium is reached.
+
+[example.tab-pane#v2balancereplicas]
+====
+[.tab-label]*V2 API*
+
+[source,bash]
+----
+curl -X POST http://localhost:8983/api/cluster/replicas/balance -H 'Content-Type: application/json' -d '
+  {
+    "nodes": ["localhost:8983_solr", "localhost:8984_solr"],
+    "async": "balance-replicas-1"
+  }
+'
+----
+====
+
+=== BalanceReplicas Parameters
+
+
+`nodes`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: none
+|===
++
+The nodes over which replicas will be balanced.
+Replicas that live outside this set of nodes will not be included in the balancing.
++
+If this parameter is not provided, all live data nodes will be used.
+
+`waitForFinalState`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: `false`
+|===
++
+If `true`, the request will complete only when all affected replicas become active.
+If `false`, the API will return when the bare minimum replicas are active, such as the affected leader replicas.
+
+`async`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: none
+|===
++
+Request ID to track this action which will be xref:configuration-guide:collections-api.adoc#asynchronous-calls[processed asynchronously].
+
+[IMPORTANT]
+====
+This operation does not hold the necessary locks on the replicas that are being moved.
+Do not perform other collection operations while it is running.
+====
+
+=== BalanceReplicas Response
+
+The response will include the status of the request.
+If the status is anything other than "0", an error message will explain why the request failed.
+
 [[balanceshardunique]]
 == BALANCESHARDUNIQUE: Balance a Property Across Nodes
 
-Insures that a particular property is distributed evenly amongst the physical nodes that make up a collection.
+Ensures that a particular property is distributed evenly amongst the physical nodes that make up a collection.
 If the property already exists on a replica, every effort is made to leave it there.
 If the property is *not* on any replica on a shard, one is chosen and the property is added.
 
@@ -544,6 +609,16 @@ If this parameter is not provided, Solr will identify nodes automatically based
 If this flag is set to `true`, all replicas are created in separate threads.
 Keep in mind that this can lead to very high network and disk I/O if the replicas have very large indices.
 
+`waitForFinalState`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: `false`
+|===
++
+If `true`, the request will complete only when all affected replicas become active.
+If `false`, the API will return when the bare minimum replicas are active, such as the affected leader replicas.
+
 `async`::
 +
 [%autowidth,frame=none]
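The Balance Replicas request documented in the reference-guide change above can also be assembled from Java. The sketch below uses only the JDK's `java.net.http` classes and stops short of sending the request. The endpoint path and the `nodes`, `waitForFinalState` and `async` fields come from the documentation in this commit; the class name, the helper names and the hand-rolled JSON (which assumes node names and the request ID need no escaping) are illustrative assumptions, not part of Solr or SolrJ.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.List;
import java.util.stream.Collectors;

final class BalanceReplicasRequestSketch {
  // Builds the JSON body by hand; assumes the values contain no characters
  // that would need JSON escaping.
  static String buildBody(List<String> nodes, boolean waitForFinalState, String asyncId) {
    String nodeArray =
        nodes.stream().map(n -> "\"" + n + "\"").collect(Collectors.joining(", ", "[", "]"));
    StringBuilder sb = new StringBuilder("{");
    sb.append("\"nodes\": ").append(nodeArray);
    sb.append(", \"waitForFinalState\": ").append(waitForFinalState);
    if (asyncId != null) {
      sb.append(", \"async\": \"").append(asyncId).append("\"");
    }
    return sb.append("}").toString();
  }

  // Creates (but does not send) a POST to the V2 balance endpoint.
  static HttpRequest buildRequest(String baseUrl, String body) {
    return HttpRequest.newBuilder(URI.create(baseUrl + "/api/cluster/replicas/balance"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();
  }

  public static void main(String[] args) {
    String body =
        buildBody(List.of("localhost:8983_solr", "localhost:8984_solr"), false, "balance-replicas-1");
    HttpRequest request = buildRequest("http://localhost:8983", body);
    System.out.println(request.method() + " " + request.uri());
    System.out.println(body);
  }
}
```

Sending the request would additionally need a `java.net.http.HttpClient` and a running Solr node; with `async` set, the returned request ID would then be polled through the asynchronous-calls mechanism referenced in the documentation.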
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index b54e29f4d6e..aae2d250c1f 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -425,6 +425,7 @@ public class Replica extends ZkNodeProps implements MapWriter {
   public interface ReplicaStateProps {
     String COLLECTION = "collection";
     String SHARD_ID = "shard";
+    String REPLICA_ID = "replica";
     String LEADER = "leader";
     String STATE = "state";
     String CORE_NAME = "core";
@@ -438,4 +439,12 @@ public class Replica extends ZkNodeProps implements MapWriter {
         Set.of(
             LEADER, STATE, CORE_NAME, CORE_NODE_NAME, TYPE, NODE_NAME, BASE_URL, FORCE_SET_STATE);
   }
+
+  public ZkNodeProps toFullProps() {
+    return new ZkNodeProps()
+        .plus(propMap)
+        .plus(ReplicaStateProps.COLLECTION, getCollection())
+        .plus(ReplicaStateProps.SHARD_ID, getShard())
+        .plus(ReplicaStateProps.REPLICA_ID, getName());
+  }
 }
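The new `toFullProps()` method above copies the replica's own properties and then adds the collection, shard and replica name under the `ReplicaStateProps` keys. A rough illustration of that merge with plain maps follows. `FullPropsSketch` is a hypothetical stand-in that uses `java.util.Map` instead of `ZkNodeProps`, so it only mirrors the shape of the result, not the real class's behavior.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class FullPropsSketch {
  // Key names taken from ReplicaStateProps in the diff above.
  static final String COLLECTION = "collection";
  static final String SHARD_ID = "shard";
  static final String REPLICA_ID = "replica";

  // Returns a new map; the replica's own property map is left untouched.
  static Map<String, Object> toFullProps(
      Map<String, Object> propMap, String collection, String shard, String replicaName) {
    Map<String, Object> full = new LinkedHashMap<>(propMap);
    full.put(COLLECTION, collection);
    full.put(SHARD_ID, shard);
    full.put(REPLICA_ID, replicaName);
    return full;
  }

  public static void main(String[] args) {
    Map<String, Object> props = new LinkedHashMap<>();
    props.put("core", "a_shard1_replica_n1");
    props.put("state", "active");
    System.out.println(toFullProps(props, "a", "shard1", "core_node2"));
  }
}
```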
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
index a1910d4edbb..516fdbfb919 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
@@ -38,6 +38,9 @@ public interface CollectionParams {
   String SOURCE_NODE = "sourceNode";
   String TARGET_NODE = "targetNode";
 
+  String NODES = "nodes";
+  String MAX_BALANCE_SKEW = "maxBalanceSkew";
+
   enum LockLevel {
     NONE(10, null),
     REPLICA(3, null),
@@ -126,6 +129,8 @@ public interface CollectionParams {
     MOCK_SHARD_TASK(false, LockLevel.SHARD),
     // TODO when we have a node level lock use it here
     REPLACENODE(true, LockLevel.NONE),
+    // TODO when we have a node level lock use it here
+    BALANCE_REPLICAS(true, LockLevel.NONE),
     DELETENODE(true, LockLevel.NONE),
     MOCK_REPLICA_TASK(false, LockLevel.REPLICA),
     NONE(false, LockLevel.NONE),
diff --git a/solr/test-framework/src/java/org/apache/solr/cluster/placement/Builders.java b/solr/test-framework/src/java/org/apache/solr/cluster/placement/Builders.java
index 776f5ff2dd7..cffbea31b4c 100644
--- a/solr/test-framework/src/java/org/apache/solr/cluster/placement/Builders.java
+++ b/solr/test-framework/src/java/org/apache/solr/cluster/placement/Builders.java
@@ -31,6 +31,7 @@ import org.apache.solr.cluster.Shard;
 import org.apache.solr.cluster.SolrCollection;
 import org.apache.solr.cluster.placement.impl.AttributeFetcherImpl;
 import org.apache.solr.cluster.placement.impl.AttributeValuesImpl;
+import org.apache.solr.cluster.placement.impl.BalancePlanFactoryImpl;
 import org.apache.solr.cluster.placement.impl.CollectionMetricsBuilder;
 import org.apache.solr.cluster.placement.impl.NodeMetricImpl;
 import org.apache.solr.cluster.placement.impl.PlacementPlanFactoryImpl;
@@ -108,6 +109,8 @@ public class Builders {
     private static final PlacementPlanFactory PLACEMENT_PLAN_FACTORY =
         new PlacementPlanFactoryImpl();
 
+    private static final BalancePlanFactory BALANCE_PLAN_FACTORY = new BalancePlanFactoryImpl();
+
     public PlacementContext buildPlacementContext() {
       Cluster cluster = build();
       AttributeFetcher attributeFetcher = buildAttributeFetcher();
@@ -126,6 +129,11 @@ public class Builders {
         public PlacementPlanFactory getPlacementPlanFactory() {
           return PLACEMENT_PLAN_FACTORY;
         }
+
+        @Override
+        public BalancePlanFactory getBalancePlanFactory() {
+          return BALANCE_PLAN_FACTORY;
+        }
       };
     }
 
@@ -178,10 +186,12 @@ public class Builders {
       if (!collectionBuilders.isEmpty()) {
         Map<Node, Object> nodeToCoreCount =
             metrics.computeIfAbsent(NodeMetricImpl.NUM_CORES, n -> new HashMap<>());
+        Map<Node, Object> nodeToFreeDisk =
+            metrics.computeIfAbsent(NodeMetricImpl.FREE_DISK_GB, n -> new HashMap<>());
         collectionBuilders.forEach(
             builder -> {
-              collectionMetrics.put(
-                  builder.collectionName, builder.collectionMetricsBuilder.build());
+              CollectionMetrics thisCollMetrics = builder.collectionMetricsBuilder.build();
+              collectionMetrics.put(builder.collectionName, thisCollMetrics);
               SolrCollection collection = builder.build();
               collection
                   .iterator()
@@ -195,6 +205,19 @@ public class Builders {
                                         replica.getNode(),
                                         (node, count) ->
                                             (count == null) ? 1 : ((Number) count).intValue() + 1);
+                                    double leaderDiskSpace =
+                                        thisCollMetrics
+                                            .getShardMetrics(shard.getShardName())
+                                            .flatMap(
+                                                m -> m.getReplicaMetrics(replica.getReplicaName()))
+                                            .flatMap(
+                                                m ->
+                                                    m.getReplicaMetric(
+                                                        ReplicaMetricImpl.INDEX_SIZE_GB))
+                                            .orElse(0D);
+                                    nodeToFreeDisk.computeIfPresent(
+                                        replica.getNode(),
+                                        (node, freeDisk) -> ((Double) freeDisk) - leaderDiskSpace);
                                   }));
             });
       }
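The `Builders` change above adjusts two per-node metrics for every replica it sees: the core count is incremented, starting from 1 if the node has no entry yet, and the free disk is reduced by the replica's index size, but only for nodes that already have a free-disk value. A simplified sketch of that bookkeeping is shown below. It keys the maps by node name rather than by `Node`, and the class and method names are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class NodeMetricsSketch {
  // Mirrors the two map updates in the diff: compute() for the core count,
  // computeIfPresent() for the free disk.
  static void addReplica(
      Map<String, Object> nodeToCoreCount,
      Map<String, Object> nodeToFreeDisk,
      String node,
      double indexSizeGb) {
    nodeToCoreCount.compute(
        node, (n, count) -> (count == null) ? 1 : ((Number) count).intValue() + 1);
    nodeToFreeDisk.computeIfPresent(node, (n, freeDisk) -> ((Double) freeDisk) - indexSizeGb);
  }

  public static void main(String[] args) {
    Map<String, Object> cores = new HashMap<>();
    Map<String, Object> freeDisk = new HashMap<>();
    cores.put("node0", 10);
    freeDisk.put("node0", 100.0);

    addReplica(cores, freeDisk, "node0", 2.5); // node with existing metrics
    addReplica(cores, freeDisk, "node1", 2.5); // node with no metrics yet
    System.out.println(cores + " " + freeDisk);
  }
}
```

Using `computeIfPresent` for free disk means a node that never had a free-disk value configured stays without one, rather than ending up with a negative number.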