You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ho...@apache.org on 2023/07/06 14:56:04 UTC

[solr] branch branch_9x updated: SOLR-16855: Add a MigrateReplicas API (#1730)

This is an automated email from the ASF dual-hosted git repository.

houston pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_9x by this push:
     new fd0f6a59fc5 SOLR-16855: Add a MigrateReplicas API (#1730)
fd0f6a59fc5 is described below

commit fd0f6a59fc54b1a9beb29b4e429053b78edf1735
Author: Houston Putman <ho...@apache.org>
AuthorDate: Thu Jul 6 10:52:00 2023 -0400

    SOLR-16855: Add a MigrateReplicas API (#1730)
    
    Also:
    - Fix orderedPlacement logic for tied nodes
    - Remove stashed weight logic in WeightedNode that is no longer used
    
    (cherry picked from commit b3883ad19c5413a968d5fbe6e45e605a84c3046c)
---
 solr/CHANGES.txt                                   |   3 +
 .../solr/cloud/api/collections/CollApiCmds.java    |   2 +
 .../cloud/api/collections/MigrateReplicasCmd.java  | 158 +++++++
 .../api/collections/ReplicaMigrationUtils.java     |  14 +
 .../cluster/placement/PlacementPlanFactory.java    |   1 +
 .../plugins/OrderedNodePlacementPlugin.java        | 483 +++++++++++++++------
 .../placement/plugins/RandomPlacementFactory.java  |  10 +-
 .../solr/handler/admin/CollectionsHandler.java     |   2 +
 .../solr/handler/admin/api/MigrateReplicasAPI.java | 146 +++++++
 .../org/apache/solr/cloud/MigrateReplicasTest.java | 372 ++++++++++++++++
 .../TestRequestStatusCollectionAPI.java            |   1 +
 .../handler/admin/api/MigrateReplicasAPITest.java  | 121 ++++++
 .../pages/cluster-node-management.adoc             | 113 ++++-
 .../solr/common/params/CollectionParams.java       |   4 +
 14 files changed, 1283 insertions(+), 147 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index bdc23977a13..91e65a224c5 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -41,6 +41,9 @@ New Features
 
 * SOLR-16812: Support CBOR format for update/query (noble)
 
+* SOLR-16855: Solr now provides a MigrateReplicas API at `POST /api/cluster/replicas/migrate` (v2), to move replicas
+  off of a given set of nodes. This extends the functionality of the existing ReplaceNode API. (Houston Putman)
+
 Improvements
 ---------------------
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollApiCmds.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollApiCmds.java
index e35023023b1..1595904bc4f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollApiCmds.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollApiCmds.java
@@ -51,6 +51,7 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.DE
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.INSTALLSHARDDATA;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.MAINTAINROUTEDALIAS;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE_REPLICAS;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_COLL_TASK;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_REPLICA_TASK;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_SHARD_TASK;
@@ -143,6 +144,7 @@ public class CollApiCmds {
       commandMap =
           Map.ofEntries(
               Map.entry(REPLACENODE, new ReplaceNodeCmd(ccc)),
+              Map.entry(MIGRATE_REPLICAS, new MigrateReplicasCmd(ccc)),
               Map.entry(BALANCE_REPLICAS, new BalanceReplicasCmd(ccc)),
               Map.entry(DELETENODE, new DeleteNodeCmd(ccc)),
               Map.entry(BACKUP, new BackupCmd(ccc)),
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateReplicasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateReplicasCmd.java
new file mode 100644
index 00000000000..29616737574
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateReplicasCmd.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ReplicaPosition;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CommonAdminParams;
+import org.apache.solr.common.util.CollectionUtil;
+import org.apache.solr.common.util.NamedList;
+
+public class MigrateReplicasCmd implements CollApiCmds.CollectionApiCommand {
+  // Command that moves all replicas off the source nodes, onto target (or other live) nodes.
+  private final CollectionCommandContext ccc;
+
+  public MigrateReplicasCmd(CollectionCommandContext ccc) {
+    this.ccc = ccc;
+  }
+
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList<Object> results)
+      throws Exception {
+    ZkStateReader zkStateReader = ccc.getZkStateReader();
+    Set<String> sourceNodes = getNodesFromParam(message, CollectionParams.SOURCE_NODES);
+    Set<String> targetNodes = getNodesFromParam(message, CollectionParams.TARGET_NODES);
+    boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
+    if (sourceNodes.isEmpty()) {
+      throw new SolrException(
+          SolrException.ErrorCode.BAD_REQUEST, "sourceNodes is a required param");
+    }
+    String async = message.getStr(ASYNC);
+    int timeout = message.getInt("timeout", 10 * 60); // 10 minutes
+    boolean parallel = message.getBool("parallel", false);
+    ClusterState clusterState = zkStateReader.getClusterState();
+    // Fail fast: every explicitly requested source and target node has to be live.
+    for (String sourceNode : sourceNodes) {
+      if (!clusterState.liveNodesContain(sourceNode)) {
+        throw new SolrException(
+            SolrException.ErrorCode.BAD_REQUEST, "Source Node: " + sourceNode + " is not live");
+      }
+    }
+    for (String targetNode : targetNodes) {
+      if (!clusterState.liveNodesContain(targetNode)) {
+        throw new SolrException(
+            SolrException.ErrorCode.BAD_REQUEST, "Target Node: " + targetNode + " is not live");
+      }
+    }
+
+    if (targetNodes.isEmpty()) {
+      // If no target nodes are provided, use all other live nodes that are not the sourceNodes
+      targetNodes =
+          clusterState.getLiveNodes().stream()
+              .filter(n -> !sourceNodes.contains(n))
+              .collect(Collectors.toSet());
+      if (targetNodes.isEmpty()) {
+        throw new SolrException(
+            SolrException.ErrorCode.BAD_REQUEST,
+            "No nodes other than the source nodes are live, therefore replicas cannot be migrated");
+      }
+    }
+    List<Replica> sourceReplicas =
+        ReplicaMigrationUtils.getReplicasOfNodes(sourceNodes, clusterState);
+    Map<Replica, String> replicaMovements = CollectionUtil.newHashMap(sourceReplicas.size());
+
+    if (targetNodes.size() > 1) { // several candidates: let the assign strategy pick nodes
+      List<Assign.AssignRequest> assignRequests = new ArrayList<>(sourceReplicas.size());
+      List<String> targetNodeList = new ArrayList<>(targetNodes);
+      for (Replica sourceReplica : sourceReplicas) {
+        Replica.Type replicaType = sourceReplica.getType();
+        Assign.AssignRequest assignRequest =
+            new Assign.AssignRequestBuilder()
+                .forCollection(sourceReplica.getCollection())
+                .forShard(Collections.singletonList(sourceReplica.getShard()))
+                .assignNrtReplicas(replicaType == Replica.Type.NRT ? 1 : 0)
+                .assignTlogReplicas(replicaType == Replica.Type.TLOG ? 1 : 0)
+                .assignPullReplicas(replicaType == Replica.Type.PULL ? 1 : 0)
+                .onNodes(targetNodeList)
+                .build();
+        assignRequests.add(assignRequest);
+      }
+      Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ccc.getCoreContainer());
+      List<ReplicaPosition> replicaPositions =
+          assignStrategy.assign(ccc.getSolrCloudManager(), assignRequests);
+      int position = 0; // assumes one position per request, in request order
+      for (Replica sourceReplica : sourceReplicas) {
+        replicaMovements.put(sourceReplica, replicaPositions.get(position++).node);
+      }
+    } else { // exactly one target node: every replica moves there
+      String targetNode = targetNodes.stream().findFirst().get();
+      for (Replica sourceReplica : sourceReplicas) {
+        replicaMovements.put(sourceReplica, targetNode);
+      }
+    }
+    // Run the planned movements; success is only reported when the helper returns true.
+    boolean migrationSuccessful =
+        ReplicaMigrationUtils.migrateReplicas(
+            ccc, replicaMovements, parallel, waitForFinalState, timeout, async, results);
+    if (migrationSuccessful) {
+      results.add(
+          "success",
+          "MIGRATE_REPLICAS action completed successfully from  : ["
+              + String.join(",", sourceNodes)
+              + "] to : ["
+              + String.join(",", targetNodes)
+              + "]");
+    }
+  }
+
+  @SuppressWarnings({"unchecked"}) // element types cannot be verified at runtime
+  protected Set<String> getNodesFromParam(ZkNodeProps message, String paramName) {
+    Object rawParam = message.get(paramName);
+    if (rawParam == null) { // an absent param is treated as "no nodes given"
+      return Collections.emptySet();
+    } else if (rawParam instanceof Set) {
+      return (Set<String>) rawParam;
+    } else if (rawParam instanceof Collection) {
+      return new HashSet<>((Collection<String>) rawParam);
+    } else if (rawParam instanceof String) { // comma-separated list; duplicates collapse
+      return new HashSet<>(List.of(((String) rawParam).split(","))); // Set.of throws on dups
+    } else {
+      throw new SolrException(
+          SolrException.ErrorCode.BAD_REQUEST,
+          "'"
+              + paramName
+              + "' was not passed as a correct type (Set/List/String): "
+              + rawParam.getClass().getName());
+    }
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java
index 903d509da18..6013dea0c2d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java
@@ -304,6 +304,20 @@ public class ReplicaMigrationUtils {
     return cleanupLatch.await(5, TimeUnit.MINUTES);
   }
 
+  static List<Replica> getReplicasOfNodes(Collection<String> nodeNames, ClusterState state) {
+    List<Replica> hostedReplicas = new ArrayList<>(); // replicas living on any of nodeNames
+    for (DocCollection collection : state.getCollectionsMap().values()) {
+      for (Slice shard : collection.getSlices()) {
+        for (Replica candidate : shard.getReplicas()) {
+          if (nodeNames.contains(candidate.getNodeName())) {
+            hostedReplicas.add(candidate);
+          }
+        }
+      }
+    }
+    return hostedReplicas;
+  }
+
   static List<Replica> getReplicasOfNode(String nodeName, ClusterState state) {
     List<Replica> sourceReplicas = new ArrayList<>();
     for (Map.Entry<String, DocCollection> e : state.getCollectionsMap().entrySet()) {
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlanFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlanFactory.java
index 1a539c09b0a..8ed2abe9a9d 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlanFactory.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlanFactory.java
@@ -39,6 +39,7 @@ public interface PlacementPlanFactory {
    * org.apache.solr.cloud.api.collections.CreateShardCmd}, {@link
    * org.apache.solr.cloud.api.collections.ReplaceNodeCmd}, {@link
    * org.apache.solr.cloud.api.collections.MoveReplicaCmd}, {@link
+   * org.apache.solr.cloud.api.collections.MigrateReplicasCmd}, {@link
    * org.apache.solr.cloud.api.collections.SplitShardCmd}, {@link
    * org.apache.solr.cloud.api.collections.RestoreCmd}, {@link
    * org.apache.solr.cloud.api.collections.MigrateCmd} as well as of {@link
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java
index 0a3beff2706..de83db8e967 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java
@@ -18,10 +18,12 @@
 package org.apache.solr.cluster.placement.plugins;
 
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -29,11 +31,11 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
-import java.util.PriorityQueue;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.IntSupplier;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.solr.cluster.Node;
 import org.apache.solr.cluster.Replica;
@@ -66,34 +68,35 @@ public abstract class OrderedNodePlacementPlugin implements PlacementPlugin {
     List<PlacementPlan> placementPlans = new ArrayList<>(requests.size());
     Set<Node> allNodes = new HashSet<>();
     Set<SolrCollection> allCollections = new HashSet<>();
+
+    Deque<PendingPlacementRequest> pendingRequests = new ArrayDeque<>(requests.size());
     for (PlacementRequest request : requests) {
+      PendingPlacementRequest pending = new PendingPlacementRequest(request);
+      pendingRequests.add(pending);
+      placementPlans.add(
+          placementContext
+              .getPlacementPlanFactory()
+              .createPlacementPlan(request, pending.getComputedPlacementSet()));
       allNodes.addAll(request.getTargetNodes());
       allCollections.add(request.getCollection());
     }
+
     Collection<WeightedNode> weightedNodes =
         getWeightedNodes(placementContext, allNodes, allCollections, true).values();
-    for (PlacementRequest request : requests) {
-      int totalReplicasPerShard = 0;
-      for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
-        totalReplicasPerShard += request.getCountReplicasToCreate(rt);
+    while (!pendingRequests.isEmpty()) {
+      PendingPlacementRequest request = pendingRequests.poll();
+      if (!request.isPending()) {
+        continue;
       }
 
       List<WeightedNode> nodesForRequest =
-          weightedNodes.stream()
-              .filter(wn -> request.getTargetNodes().contains(wn.getNode()))
-              .collect(Collectors.toList());
-
-      Set<ReplicaPlacement> replicaPlacements =
-          CollectionUtil.newHashSet(totalReplicasPerShard * request.getShardNames().size());
+          weightedNodes.stream().filter(request::isTargetingNode).collect(Collectors.toList());
 
       SolrCollection solrCollection = request.getCollection();
-      // Now place randomly all replicas of all shards on available nodes
-      for (String shardName : request.getShardNames()) {
-        for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) {
-          int replicaCount = request.getCountReplicasToCreate(replicaType);
-          if (replicaCount == 0) {
-            continue;
-          }
+      // Now place all replicas of all shards on available nodes
+      for (String shardName : request.getPendingShards()) {
+        for (Replica.ReplicaType replicaType : request.getPendingReplicaTypes(shardName)) {
+          int replicaCount = request.getPendingReplicas(shardName, replicaType);
           if (log.isDebugEnabled()) {
             log.debug(
                 "Placing {} replicas for Collection: {}, Shard: {}, ReplicaType: {}",
@@ -103,46 +106,52 @@ public abstract class OrderedNodePlacementPlugin implements PlacementPlugin {
                 replicaType);
           }
           Replica pr = createProjectedReplica(solrCollection, shardName, replicaType, null);
-          PriorityQueue<WeightedNode> nodesForReplicaType = new PriorityQueue<>();
+
+          // Create a NodeHeap so that we have access to the number of ties for the lowestWeighted
+          // node.
+          // Sort this heap by the relevant weight of the node given that the replica has been
+          // added.
+          NodeHeap nodesForReplicaType = new NodeHeap(n -> n.calcRelevantWeightWithReplica(pr));
           nodesForRequest.stream()
               .filter(n -> n.canAddReplica(pr))
-              .forEach(
-                  n -> {
-                    n.sortByRelevantWeightWithReplica(pr);
-                    n.addToSortedCollection(nodesForReplicaType);
-                  });
+              .forEach(nodesForReplicaType::add);
 
           int replicasPlaced = 0;
+          boolean retryRequestLater = false;
           while (!nodesForReplicaType.isEmpty() && replicasPlaced < replicaCount) {
             WeightedNode node = nodesForReplicaType.poll();
+
             if (!node.canAddReplica(pr)) {
-              if (log.isDebugEnabled()) {
-                log.debug(
-                    "Node can no longer accept replica, removing from selection list: {}",
-                    node.getNode());
-              }
+              log.debug("Node can no-longer add the given replica, move on to next node: {}", node);
               continue;
             }
-            if (node.hasWeightChangedSinceSort()) {
-              if (log.isDebugEnabled()) {
-                log.debug(
-                    "Node's sort is out-of-date, adding back to selection list: {}",
-                    node.getNode());
-              }
-              node.addToSortedCollection(nodesForReplicaType);
-              // The node will be re-sorted,
-              // so go back to the top of the loop to get the new lowest-sorted node
-              continue;
-            }
-            if (log.isDebugEnabled()) {
-              log.debug("Node chosen to host replica: {}", node.getNode());
+
+            // If there is a tie, and there are more node options than we have replicas to place,
+            // then we want to come back later and try again. If there are ties, but less tie
+            // options than we have replicas to place, that's ok, because the replicas will likely
+            // be put on all the tie options.
+            //
+            // Only skip the request if it can be requeued, and there are other pending requests to
+            // compute.
+            int numWeightTies = nodesForReplicaType.peekTies();
+            if (!pendingRequests.isEmpty()
+                && request.canBeRequeued()
+                && numWeightTies > (replicaCount - replicasPlaced)) {
+              log.debug(
+                  "There is a tie for best weight. There are more options ({}) than replicas to place ({}), so try this placement request later: {}",
+                  numWeightTies,
+                  replicaCount - replicasPlaced,
+                  node);
+              retryRequestLater = true;
+              break;
             }
+            log.debug("Node chosen to host replica: {}", node);
 
             boolean needsToResortAll =
                 node.addReplica(
                     createProjectedReplica(solrCollection, shardName, replicaType, node.getNode()));
             replicasPlaced += 1;
-            replicaPlacements.add(
+            request.addPlacement(
                 placementContext
                     .getPlacementPlanFactory()
                     .createReplicaPlacement(
@@ -150,12 +159,8 @@ public abstract class OrderedNodePlacementPlugin implements PlacementPlugin {
             // Only update the priorityQueue if there are still replicas to be placed
             if (replicasPlaced < replicaCount) {
               if (needsToResortAll) {
-                if (log.isDebugEnabled()) {
-                  log.debug("Replica addition requires re-sorting of entire selection list");
-                }
-                List<WeightedNode> nodeList = new ArrayList<>(nodesForReplicaType);
-                nodesForReplicaType.clear();
-                nodeList.forEach(n -> n.addToSortedCollection(nodesForReplicaType));
+                log.debug("Replica addition requires re-sorting of entire selection list");
+                nodesForReplicaType.resortAll();
               }
               // Add the chosen node back to the list if it can accept another replica of the
               // shard/replicaType.
@@ -167,7 +172,7 @@ public abstract class OrderedNodePlacementPlugin implements PlacementPlugin {
             }
           }
 
-          if (replicasPlaced < replicaCount) {
+          if (!retryRequestLater && replicasPlaced < replicaCount) {
             throw new PlacementException(
                 String.format(
                     Locale.ROOT,
@@ -180,11 +185,10 @@ public abstract class OrderedNodePlacementPlugin implements PlacementPlugin {
           }
         }
       }
-
-      placementPlans.add(
-          placementContext
-              .getPlacementPlanFactory()
-              .createPlacementPlan(request, replicaPlacements));
+      if (request.isPending()) {
+        request.requeue();
+        pendingRequests.add(request);
+      }
     }
     return placementPlans;
   }
@@ -194,23 +198,17 @@ public abstract class OrderedNodePlacementPlugin implements PlacementPlugin {
       BalanceRequest balanceRequest, PlacementContext placementContext) throws PlacementException {
     Map<Replica, Node> replicaMovements = new HashMap<>();
     TreeSet<WeightedNode> orderedNodes = new TreeSet<>();
-    Collection<WeightedNode> weightedNodes =
+    orderedNodes.addAll(
         getWeightedNodes(
                 placementContext,
                 balanceRequest.getNodes(),
                 placementContext.getCluster().collections(),
                 true)
-            .values();
-    // This is critical to store the last sort weight for this node
-    weightedNodes.forEach(
-        node -> {
-          node.sortWithoutChanges();
-          node.addToSortedCollection(orderedNodes);
-        });
+            .values());
 
     // While the node with the lowest weight still has room to take a replica from the node with the
     // highest weight, loop
-    Map<Replica, Node> newReplicaMovements = new HashMap<>();
+    Map<Replica, Node> newReplicaMovements = CollectionUtil.newHashMap(1);
     ArrayList<WeightedNode> traversedHighNodes = new ArrayList<>(orderedNodes.size() - 1);
     while (orderedNodes.size() > 1
         && orderedNodes.first().calcWeight() < orderedNodes.last().calcWeight()) {
@@ -218,22 +216,7 @@ public abstract class OrderedNodePlacementPlugin implements PlacementPlugin {
       if (lowestWeight == null) {
         break;
       }
-      if (lowestWeight.hasWeightChangedSinceSort()) {
-        if (log.isDebugEnabled()) {
-          log.debug(
-              "Re-sorting lowest weighted node: {}, sorting weight is out-of-date.",
-              lowestWeight.getNode().getName());
-        }
-        // Re-sort this node and go back to find the lowest weight
-        lowestWeight.addToSortedCollection(orderedNodes);
-        continue;
-      }
-      if (log.isDebugEnabled()) {
-        log.debug(
-            "Lowest weighted node: {}, weight: {}",
-            lowestWeight.getNode().getName(),
-            lowestWeight.calcWeight());
-      }
+      log.debug("Highest weighted node: {}", lowestWeight);
 
       newReplicaMovements.clear();
       // If a compatible node was found to move replicas, break and find the lowest weighted node
@@ -245,22 +228,7 @@ public abstract class OrderedNodePlacementPlugin implements PlacementPlugin {
         if (highestWeight == null) {
           break;
         }
-        if (highestWeight.hasWeightChangedSinceSort()) {
-          if (log.isDebugEnabled()) {
-            log.debug(
-                "Re-sorting highest weighted node: {}, sorting weight is out-of-date.",
-                highestWeight.getNode().getName());
-          }
-          // Re-sort this node and go back to find the highest weight
-          highestWeight.addToSortedCollection(orderedNodes);
-          continue;
-        }
-        if (log.isDebugEnabled()) {
-          log.debug(
-              "Highest weighted node: {}, weight: {}",
-              highestWeight.getNode().getName(),
-              highestWeight.calcWeight());
-        }
+        log.debug("Highest weighted node: {}", highestWeight);
 
         traversedHighNodes.add(highestWeight);
         // select a replica from the node with the most cores to move to the node with the least
@@ -298,13 +266,11 @@ public abstract class OrderedNodePlacementPlugin implements PlacementPlugin {
             highestWeight.addReplica(r);
             continue;
           }
-          if (log.isDebugEnabled()) {
-            log.debug(
-                "Replica Movement chosen. From: {}, To: {}, Replica: {}",
-                highestWeight.getNode().getName(),
-                lowestWeight.getNode().getName(),
-                r);
-          }
+          log.debug(
+              "Replica Movement chosen. From: {}, To: {}, Replica: {}",
+              highestWeight,
+              lowestWeight,
+              r);
           newReplicaMovements.put(r, lowestWeight.getNode());
 
           // Do not go beyond here, do another loop and see if other nodes can move replicas.
@@ -321,12 +287,12 @@ public abstract class OrderedNodePlacementPlugin implements PlacementPlugin {
 
       // Add back in the traversed highNodes that we did not select replicas from,
       // they might have replicas to move to the next lowestWeighted node
-      traversedHighNodes.forEach(n -> n.addToSortedCollection(orderedNodes));
+      orderedNodes.addAll(traversedHighNodes);
       traversedHighNodes.clear();
       if (newReplicaMovements.size() > 0) {
         replicaMovements.putAll(newReplicaMovements);
         // There are no replicas to move to the lowestWeight, remove it from our loop
-        lowestWeight.addToSortedCollection(orderedNodes);
+        orderedNodes.add(lowestWeight);
       }
     }
 
@@ -437,22 +403,10 @@ public abstract class OrderedNodePlacementPlugin implements PlacementPlugin {
   public abstract static class WeightedNode implements Comparable<WeightedNode> {
     private final Node node;
     private final Map<String, Map<String, Set<Replica>>> replicas;
-    private IntSupplier sortWeightCalculator;
-    private int lastSortedWeight;
 
     public WeightedNode(Node node) {
       this.node = node;
       this.replicas = new HashMap<>();
-      this.lastSortedWeight = 0;
-      this.sortWeightCalculator = this::calcWeight;
-    }
-
-    public void sortByRelevantWeightWithReplica(Replica replica) {
-      sortWeightCalculator = () -> calcRelevantWeightWithReplica(replica);
-    }
-
-    public void sortWithoutChanges() {
-      sortWeightCalculator = this::calcWeight;
     }
 
     public Node getNode() {
@@ -490,11 +444,6 @@ public abstract class OrderedNodePlacementPlugin implements PlacementPlugin {
           .orElseGet(Collections::emptySet);
     }
 
-    public void addToSortedCollection(Collection<WeightedNode> collection) {
-      stashSortedWeight();
-      collection.add(this);
-    }
-
     public abstract int calcWeight();
 
     public abstract int calcRelevantWeightWithReplica(Replica replica);
@@ -571,14 +520,6 @@ public abstract class OrderedNodePlacementPlugin implements PlacementPlugin {
 
     protected abstract void removeProjectedReplicaWeights(Replica replica);
 
-    private void stashSortedWeight() {
-      lastSortedWeight = sortWeightCalculator.getAsInt();
-    }
-
-    protected boolean hasWeightChangedSinceSort() {
-      return lastSortedWeight != sortWeightCalculator.getAsInt();
-    }
-
     @SuppressWarnings({"rawtypes"})
     protected Comparable getTiebreaker() {
       return node.getName();
@@ -587,7 +528,7 @@ public abstract class OrderedNodePlacementPlugin implements PlacementPlugin {
     @Override
     @SuppressWarnings({"unchecked"})
     public int compareTo(WeightedNode o) {
-      int comp = Integer.compare(this.lastSortedWeight, o.lastSortedWeight);
+      int comp = Integer.compare(this.calcWeight(), o.calcWeight());
       if (comp == 0 && !equals(o)) {
         // TreeSets do not like a 0 comp for non-equal members.
         comp = getTiebreaker().compareTo(o.getTiebreaker());
@@ -616,7 +557,7 @@ public abstract class OrderedNodePlacementPlugin implements PlacementPlugin {
 
     @Override
     public String toString() {
-      return "WeightedNode{" + "node=" + node + ", lastSortedWeight=" + lastSortedWeight + '}';
+      return "WeightedNode{" + "node=" + node.getName() + ", weight=" + calcWeight() + '}';
     }
   }
 
@@ -722,4 +663,280 @@ public abstract class OrderedNodePlacementPlugin implements PlacementPlugin {
       }
     };
   }
+
+  /**
+   * A heap that stores Nodes, sorting them by a given function.
+   *
+   * <p>A normal Java heap class cannot be used, because the {@link #peekTies()} method is required.
+   */
+  private static class NodeHeap {
+    final Function<WeightedNode, Integer> weightFunc;
+    // Nodes that are not in currentLowestList, bucketed by the weight they had when inserted
+    final TreeMap<Integer, Deque<WeightedNode>> nodesByWeight;
+    // The bucket that poll() is currently draining, and the weight its nodes were inserted with
+    Deque<WeightedNode> currentLowestList;
+    int currentLowestWeight;
+    // Number of nodes held by the heap; only add() and poll() change it
+    int size = 0;
+
+    protected NodeHeap(Function<WeightedNode, Integer> weightFunc) {
+      this.weightFunc = weightFunc;
+      nodesByWeight = new TreeMap<>();
+      currentLowestList = null;
+      currentLowestWeight = -1;
+    }
+
+    /**
+     * Remove and return the node with the lowest weight. There is no guarantee to the sorting of
+     * nodes that have equal weights.
+     *
+     * @return the node with the lowest weight, or null if the heap is empty
+     */
+    protected WeightedNode poll() {
+      updateLowestWeightedList();
+      if (currentLowestList == null || currentLowestList.isEmpty()) {
+        return null;
+      } else {
+        size--;
+        return currentLowestList.pollFirst();
+      }
+    }
+
+    /**
+     * Return the number of Nodes that are tied for the current lowest weight (using the given
+     * sorting function).
+     *
+     * <p>PeekTies should only be called after poll().
+     *
+     * @return the number of nodes that are tied for the lowest weight
+     */
+    protected int peekTies() {
+      return currentLowestList == null ? 1 : currentLowestList.size() + 1;
+    }
+
+    /** Make sure that the list that contains the nodes with the lowest weights is correct. */
+    private void updateLowestWeightedList() {
+      recheckLowestWeights();
+      while (currentLowestList == null || currentLowestList.isEmpty()) {
+        Map.Entry<Integer, Deque<WeightedNode>> lowestEntry = nodesByWeight.pollFirstEntry();
+        if (lowestEntry == null) {
+          currentLowestList = null;
+          currentLowestWeight = -1;
+          break;
+        } else {
+          currentLowestList = lowestEntry.getValue();
+          currentLowestWeight = lowestEntry.getKey();
+          recheckLowestWeights();
+        }
+      }
+    }
+
+    /**
+     * Check that the nodes in the lowest-weight list still have that weight. A node whose weight
+     * has changed is moved to the bucket for its new weight; the size of the heap is unaffected.
+     */
+    private void recheckLowestWeights() {
+      if (currentLowestList != null) {
+        currentLowestList.removeIf(
+            node -> {
+              if (weightFunc.apply(node) != currentLowestWeight) {
+                log.debug("Node's sort is out-of-date, re-sorting: {}", node);
+                insert(node);
+                return true;
+              }
+              return false;
+            });
+      }
+    }
+
+    /**
+     * Add a node to the heap.
+     *
+     * @param node the node to add
+     */
+    public void add(WeightedNode node) {
+      size++;
+      insert(node);
+    }
+
+    /**
+     * Put a node in the bucket matching its current weight, leaving size untouched. The null check
+     * stops a weight of -1 from matching the placeholder weight used while there is no list.
+     */
+    private void insert(WeightedNode node) {
+      int nodeWeight = weightFunc.apply(node);
+      if (currentLowestList != null && currentLowestWeight == nodeWeight) {
+        currentLowestList.add(node);
+      } else {
+        nodesByWeight.computeIfAbsent(nodeWeight, w -> new ArrayDeque<>()).addLast(node);
+      }
+    }
+
+    /** Returns the number of nodes in the heap. */
+    public int size() {
+      return size;
+    }
+
+    /** Returns true if the heap has no nodes. */
+    public boolean isEmpty() {
+      return size == 0;
+    }
+
+    /**
+     * Re-sort all nodes in the heap, because their weights can no-longer be trusted. This is only
+     * necessary if nodes in the heap may have had their weights decrease, not if they only
+     * increased. The heap holds the same nodes afterwards, so its size does not change.
+     */
+    public void resortAll() {
+      ArrayList<WeightedNode> temp = new ArrayList<>(size);
+      if (currentLowestList != null) {
+        temp.addAll(currentLowestList);
+        currentLowestList = null;
+      }
+      nodesByWeight.values().forEach(temp::addAll);
+      currentLowestWeight = -1;
+      nodesByWeight.clear();
+      temp.forEach(this::insert);
+    }
+  }
+
+  /** Context for a placement request that still has replicas that need to be placed. */
+  static class PendingPlacementRequest {
+    boolean hasBeenRequeued;
+
+    final SolrCollection collection;
+
+    final Set<Node> targetNodes;
+
+    // A running set of placements already computed
+    final Set<ReplicaPlacement> computedPlacements;
+
+    // A live view on how many replicas still need to be placed for each shard & replica type
+    final Map<String, Map<Replica.ReplicaType, Integer>> replicasToPlaceForShards;
+
+    public PendingPlacementRequest(PlacementRequest request) {
+      hasBeenRequeued = false;
+      collection = request.getCollection();
+      targetNodes = request.getTargetNodes();
+      Set<String> shards = request.getShardNames();
+      replicasToPlaceForShards = CollectionUtil.newHashMap(shards.size());
+      int totalShardReplicas = 0;
+      for (Replica.ReplicaType type : Replica.ReplicaType.values()) {
+        int count = request.getCountReplicasToCreate(type);
+        if (count > 0) {
+          totalShardReplicas += count;
+          shards.forEach(
+              s ->
+                  replicasToPlaceForShards
+                      .computeIfAbsent(s, sh -> CollectionUtil.newHashMap(3))
+                      .put(type, count));
+        }
+      }
+      computedPlacements = CollectionUtil.newHashSet(totalShardReplicas * shards.size());
+    }
+
+    /**
+     * Determine if this request is not yet complete, and there are requested replicas that have not
+     * had placements computed.
+     *
+     * @return if there are still replica placements that need to be computed
+     */
+    public boolean isPending() {
+      return !replicasToPlaceForShards.isEmpty();
+    }
+
+    public SolrCollection getCollection() {
+      return collection;
+    }
+
+    public boolean isTargetingNode(WeightedNode node) {
+      return targetNodes.contains(node.getNode());
+    }
+
+    /**
+     * The set of ReplicaPlacements computed for this request.
+     *
+     * <p>The set that is returned is the same set used internally, so it will be augmented until
+     * {@link #isPending()} returns false.
+     *
+     * @return The live set of replicaPlacements for this request.
+     */
+    public Set<ReplicaPlacement> getComputedPlacementSet() {
+      return computedPlacements;
+    }
+
+    /**
+     * Fetch the list of shards that still have replicas that need placements computed. If all the
+     * requested replicas for a shard are represented in {@link #getComputedPlacementSet()}, then
+     * that shard will not be returned by this method.
+     *
+     * @return a new list of the unfinished shards
+     */
+    public Collection<String> getPendingShards() {
+      return new ArrayList<>(replicasToPlaceForShards.keySet());
+    }
+
+    /**
+     * For the given shard, return the replica types that still have placements that need to be
+     * computed.
+     *
+     * @param shard name of the shard to check for uncomputed placements
+     * @return the set of unfinished replica types
+     */
+    public Collection<Replica.ReplicaType> getPendingReplicaTypes(String shard) {
+      return Optional.ofNullable(replicasToPlaceForShards.get(shard))
+          .map(Map::keySet)
+          // Use a sorted TreeSet to make sure that tests are repeatable
+          .<Collection<Replica.ReplicaType>>map(TreeSet::new)
+          .orElseGet(Collections::emptyList);
+    }
+
+    /**
+     * Fetch the number of replicas that still need to be placed for the given shard and replica
+     * type.
+     *
+     * @param shard name of the shard to be placed
+     * @param type type of replica to be placed
+     * @return the number of replicas that have not yet had placements computed
+     */
+    public int getPendingReplicas(String shard, Replica.ReplicaType type) {
+      return Optional.ofNullable(replicasToPlaceForShards.get(shard))
+          .map(m -> m.get(type))
+          .orElse(0);
+    }
+
+    /**
+     * Currently, only one requeue is allowed per pending request.
+     *
+     * @return true if the request has not been requeued already
+     */
+    public boolean canBeRequeued() {
+      return !hasBeenRequeued;
+    }
+
+    /** Let the pending request know that it has been requeued. */
+    public void requeue() {
+      hasBeenRequeued = true;
+    }
+
+    /**
+     * Track the given replica placement, decrementing the pending count for its shard and type.
+     *
+     * @param replica placement that has been made for the pending request
+     */
+    public void addPlacement(ReplicaPlacement replica) {
+      computedPlacements.add(replica);
+      replicasToPlaceForShards.computeIfPresent(
+          replica.getShardName(),
+          (shard, replicaTypes) -> {
+            replicaTypes.computeIfPresent(
+                replica.getReplicaType(), (type, count) -> (count == 1) ? null : count - 1);
+            if (replicaTypes.size() > 0) {
+              return replicaTypes;
+            } else {
+              return null;
+            }
+          });
+    }
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/RandomPlacementFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/RandomPlacementFactory.java
index 1e0f6a2f5ba..0b2279b34fa 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/RandomPlacementFactory.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/RandomPlacementFactory.java
@@ -17,7 +17,6 @@
 
 package org.apache.solr.cluster.placement.plugins;
 
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
@@ -108,20 +107,15 @@ public class RandomPlacementFactory
 
     @Override
     protected boolean addProjectedReplicaWeights(Replica replica) {
+      randomTiebreaker = random.nextInt();
       // NO-OP
       return false;
     }
 
     @Override
     protected void removeProjectedReplicaWeights(Replica replica) {
-      // NO-OP
-    }
-
-    @Override
-    public void addToSortedCollection(
-        Collection<OrderedNodePlacementPlugin.WeightedNode> collection) {
       randomTiebreaker = random.nextInt();
-      super.addToSortedCollection(collection);
+      // NO-OP
     }
   }
 }
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 9413389ca92..23a0d094ed6 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -188,6 +188,7 @@ import org.apache.solr.handler.admin.api.ListCollectionBackupsAPI;
 import org.apache.solr.handler.admin.api.ListCollectionSnapshotsAPI;
 import org.apache.solr.handler.admin.api.ListCollectionsAPI;
 import org.apache.solr.handler.admin.api.MigrateDocsAPI;
+import org.apache.solr.handler.admin.api.MigrateReplicasAPI;
 import org.apache.solr.handler.admin.api.ModifyCollectionAPI;
 import org.apache.solr.handler.admin.api.MoveReplicaAPI;
 import org.apache.solr.handler.admin.api.RebalanceLeadersAPI;
@@ -1382,6 +1383,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
         ReloadCollectionAPI.class,
         RenameCollectionAPI.class,
         ReplaceNodeAPI.class,
+        MigrateReplicasAPI.class,
         BalanceReplicasAPI.class,
         RestoreCollectionAPI.class,
         SyncShardAPI.class,
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/MigrateReplicasAPI.java b/solr/core/src/java/org/apache/solr/handler/admin/api/MigrateReplicasAPI.java
new file mode 100644
index 00000000000..da1188ef89d
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/admin/api/MigrateReplicasAPI.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.admin.api;
+
+import static org.apache.solr.client.solrj.impl.BinaryResponseParser.BINARY_CONTENT_TYPE_V2;
+import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
+import static org.apache.solr.common.params.CollectionParams.SOURCE_NODES;
+import static org.apache.solr.common.params.CollectionParams.TARGET_NODES;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
+import static org.apache.solr.handler.admin.CollectionsHandler.DEFAULT_COLLECTION_OP_TIMEOUT;
+import static org.apache.solr.security.PermissionNameProvider.Name.COLL_EDIT_PERM;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.media.Schema;
+import io.swagger.v3.oas.annotations.parameters.RequestBody;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import javax.inject.Inject;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.params.CollectionParams.CollectionAction;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.handler.admin.CollectionsHandler;
+import org.apache.solr.jersey.JacksonReflectMapWriter;
+import org.apache.solr.jersey.PermissionName;
+import org.apache.solr.jersey.SolrJerseyResponse;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+
+/** V2 API for migrating replicas from a set of nodes to another set of nodes. */
+@Path("cluster/replicas/migrate")
+public class MigrateReplicasAPI extends AdminAPIBase {
+
+  @Inject
+  public MigrateReplicasAPI(
+      CoreContainer coreContainer,
+      SolrQueryRequest solrQueryRequest,
+      SolrQueryResponse solrQueryResponse) {
+    super(coreContainer, solrQueryRequest, solrQueryResponse);
+  }
+
+  @POST
+  @Produces({"application/json", "application/xml", BINARY_CONTENT_TYPE_V2})
+  @PermissionName(COLL_EDIT_PERM)
+  @Operation(summary = "Migrate Replicas from a given set of nodes.")
+  public SolrJerseyResponse migrateReplicas(
+      @RequestBody(description = "Contains user provided parameters", required = true)
+          MigrateReplicasRequestBody requestBody)
+      throws Exception {
+    final SolrJerseyResponse response = instantiateJerseyResponse(SolrJerseyResponse.class);
+    final CoreContainer coreContainer = fetchAndValidateZooKeeperAwareCoreContainer();
+    // TODO Record node for log and tracing
+    final ZkNodeProps remoteMessage = createRemoteMessage(requestBody);
+    final SolrResponse remoteResponse =
+        CollectionsHandler.submitCollectionApiCommand(
+            coreContainer,
+            coreContainer.getDistributedCollectionCommandRunner(),
+            remoteMessage,
+            CollectionAction.MIGRATE_REPLICAS,
+            DEFAULT_COLLECTION_OP_TIMEOUT);
+    if (remoteResponse.getException() != null) {
+      throw remoteResponse.getException();
+    }
+
+    disableResponseCaching();
+    return response;
+  }
+
+  public ZkNodeProps createRemoteMessage(MigrateReplicasRequestBody requestBody) {
+    final Map<String, Object> remoteMessage = new HashMap<>();
+    if (requestBody != null) {
+      if (requestBody.sourceNodes == null || requestBody.sourceNodes.isEmpty()) {
+        throw new SolrException(
+            SolrException.ErrorCode.BAD_REQUEST,
+            "No 'sourceNodes' provided in the request body. The MigrateReplicas API requires a 'sourceNodes' list in the request body.");
+      }
+      insertIfNotNull(remoteMessage, SOURCE_NODES, requestBody.sourceNodes);
+      insertIfNotNull(remoteMessage, TARGET_NODES, requestBody.targetNodes);
+      insertIfNotNull(remoteMessage, WAIT_FOR_FINAL_STATE, requestBody.waitForFinalState);
+      insertIfNotNull(remoteMessage, ASYNC, requestBody.async);
+    } else {
+      throw new SolrException(
+          SolrException.ErrorCode.BAD_REQUEST,
+          "No request body sent with request. The MigrateReplicas API requires a body.");
+    }
+    remoteMessage.put(QUEUE_OPERATION, CollectionAction.MIGRATE_REPLICAS.toLower());
+
+    return new ZkNodeProps(remoteMessage);
+  }
+
+  public static class MigrateReplicasRequestBody implements JacksonReflectMapWriter {
+
+    public MigrateReplicasRequestBody() {}
+
+    public MigrateReplicasRequestBody(
+        Set<String> sourceNodes, Set<String> targetNodes, Boolean waitForFinalState, String async) {
+      this.sourceNodes = sourceNodes;
+      this.targetNodes = targetNodes;
+      this.waitForFinalState = waitForFinalState;
+      this.async = async;
+    }
+
+    @Schema(description = "The set of nodes which all replicas will be migrated off of.")
+    @JsonProperty(value = "sourceNodes", required = true)
+    public Set<String> sourceNodes;
+
+    @Schema(
+        description =
+            "A set of nodes to migrate the replicas to. If this is not provided, then the API will use the live data nodes not in 'sourceNodes'.")
+    @JsonProperty(value = "targetNodes")
+    public Set<String> targetNodes;
+
+    @Schema(
+        description =
+            "If true, the request will complete only when all affected replicas become active. "
+                + "If false, the API will return the status of the single action, which may be "
+                + "before the new replicas are online and active.")
+    @JsonProperty("waitForFinalState")
+    public Boolean waitForFinalState = false;
+
+    @Schema(description = "Request ID to track this action which will be processed asynchronously.")
+    @JsonProperty("async")
+    public String async;
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/cloud/MigrateReplicasTest.java b/solr/core/src/test/org/apache/solr/cloud/MigrateReplicasTest.java
new file mode 100644
index 00000000000..0935770b9f5
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/MigrateReplicasTest.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.codahale.metrics.Metric;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.util.EntityUtils;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.CloudLegacySolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
+import org.apache.solr.client.solrj.response.CoreAdminResponse;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.embedded.JettySolrRunner;
+import org.apache.solr.handler.admin.api.MigrateReplicasAPI;
+import org.apache.solr.metrics.MetricsMap;
+import org.apache.solr.metrics.SolrMetricManager;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.noggit.JSONParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MigrateReplicasTest extends SolrCloudTestCase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  @BeforeClass
+  public static void setupCluster() {
+    System.setProperty("metricsEnabled", "true");
+  }
+
+  @Before
+  public void clearPreviousCluster() throws Exception {
+    // Clear the previous cluster before each test, since they use different numbers of nodes.
+    shutdownCluster();
+  }
+
+  @Test
+  public void test() throws Exception {
+    configureCluster(6)
+        .addConfig(
+            "conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
+        .configure();
+    String coll = "replacenodetest_coll";
+    if (log.isInfoEnabled()) {
+      log.info("total_jettys: {}", cluster.getJettySolrRunners().size());
+    }
+
+    CloudSolrClient cloudClient = cluster.getSolrClient();
+    Set<String> liveNodes = cloudClient.getClusterState().getLiveNodes();
+    ArrayList<String> l = new ArrayList<>(liveNodes);
+    Collections.shuffle(l, random());
+    String emptyNode = l.remove(0);
+    String nodeToBeDecommissioned = l.get(0);
+    CollectionAdminRequest.Create create;
+    // NOTE: always using the createCollection that takes in 'int' for all types of replicas, so we
+    // never have to worry about null checking when comparing the Create command with the final
+    // Slices
+
+    // TODO: tlog replicas do not work correctly in tests due to faulty
+    // TestInjection#waitForInSyncWithLeader
+    create =
+        pickRandom(
+            CollectionAdminRequest.createCollection(coll, "conf1", 5, 2, 0, 0),
+            // CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,1,0),
+            // CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,1,1),
+            // CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,0,1),
+            // CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,2,0),
+            // check also replicationFactor 1
+            CollectionAdminRequest.createCollection(coll, "conf1", 5, 1, 0, 0)
+            // CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,1,0)
+            );
+    create.setCreateNodeSet(StrUtils.join(l, ','));
+    cloudClient.request(create);
+
+    cluster.waitForActiveCollection(
+        coll,
+        5,
+        5
+            * (create.getNumNrtReplicas()
+                + create.getNumPullReplicas()
+                + create.getNumTlogReplicas()));
+
+    DocCollection collection = cloudClient.getClusterState().getCollection(coll);
+    log.debug("### Before decommission: {}", collection);
+    log.info("excluded_node : {}  ", emptyNode);
+    Map<?, ?> response =
+        callMigrateReplicas(
+            cloudClient,
+            new MigrateReplicasAPI.MigrateReplicasRequestBody(
+                Set.of(nodeToBeDecommissioned), Set.of(emptyNode), true, null));
+    assertEquals(
+        "MigrateReplicas request was unsuccessful",
+        0L,
+        ((Map<?, ?>) response.get("responseHeader")).get("status"));
+    ZkStateReader zkStateReader = ZkStateReader.from(cloudClient);
+    try (SolrClient coreClient =
+        getHttpSolrClient(zkStateReader.getBaseUrlForNodeName(nodeToBeDecommissioned))) {
+      CoreAdminResponse status = CoreAdminRequest.getStatus(null, coreClient);
+      assertEquals(
+          "There should not be any cores left on decommissioned node",
+          0,
+          status.getCoreStatus().size());
+    }
+
+    Thread.sleep(5000);
+    collection = cloudClient.getClusterState().getCollectionOrNull(coll, false);
+    log.debug("### After decommission: {}", collection);
+    // check what are replica states on the decommissioned node
+    assertNull(
+        "There should not be any replicas left on decommissioned node",
+        collection.getReplicas(nodeToBeDecommissioned));
+
+    // now migrate the replicas back, again waiting for the final state
+    response =
+        callMigrateReplicas(
+            cloudClient,
+            new MigrateReplicasAPI.MigrateReplicasRequestBody(
+                Set.of(emptyNode), Set.of(nodeToBeDecommissioned), true, null));
+    assertEquals(
+        "MigrateReplicas request was unsuccessful",
+        0L,
+        ((Map<?, ?>) response.get("responseHeader")).get("status"));
+
+    try (SolrClient coreClient =
+        getHttpSolrClient(zkStateReader.getBaseUrlForNodeName(emptyNode))) {
+      CoreAdminResponse status = CoreAdminRequest.getStatus(null, coreClient);
+      assertEquals(
+          "Expecting no cores but found some: " + status.getCoreStatus(),
+          0,
+          status.getCoreStatus().size());
+    }
+
+    collection = cluster.getSolrClient().getClusterState().getCollection(coll);
+    assertEquals(create.getNumShards().intValue(), collection.getSlices().size());
+    for (Slice s : collection.getSlices()) {
+      assertEquals(
+          create.getNumNrtReplicas().intValue(),
+          s.getReplicas(EnumSet.of(Replica.Type.NRT)).size());
+      assertEquals(
+          create.getNumTlogReplicas().intValue(),
+          s.getReplicas(EnumSet.of(Replica.Type.TLOG)).size());
+      assertEquals(
+          create.getNumPullReplicas().intValue(),
+          s.getReplicas(EnumSet.of(Replica.Type.PULL)).size());
+    }
+    // make sure all newly created replicas on node are active
+    List<Replica> newReplicas = collection.getReplicas(nodeToBeDecommissioned);
+    assertNotNull("There should be replicas on the migrated-to node", newReplicas);
+    assertFalse("There should be replicas on the migrated-to node", newReplicas.isEmpty());
+    for (Replica r : newReplicas) {
+      assertEquals(r.toString(), Replica.State.ACTIVE, r.getState());
+    }
+    // make sure all replicas on emptyNode are not active
+    List<Replica> replicas = collection.getReplicas(emptyNode);
+    if (replicas != null) {
+      for (Replica r : replicas) {
+        assertNotEquals(r.toString(), Replica.State.ACTIVE, r.getState());
+      }
+    }
+
+    // check replication metrics on this jetty - see SOLR-14924
+    for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
+      if (jetty.getCoreContainer() == null) {
+        continue;
+      }
+      SolrMetricManager metricManager = jetty.getCoreContainer().getMetricManager();
+      String registryName = null;
+      for (String name : metricManager.registryNames()) {
+        if (name.startsWith("solr.core.")) {
+          registryName = name;
+        }
+      }
+      Map<String, Metric> metrics = metricManager.registry(registryName).getMetrics();
+      if (!metrics.containsKey("REPLICATION./replication.fetcher")) {
+        continue;
+      }
+      MetricsMap fetcherGauge =
+          (MetricsMap)
+              ((SolrMetricManager.GaugeWrapper<?>) metrics.get("REPLICATION./replication.fetcher"))
+                  .getGauge();
+      assertNotNull("no IndexFetcher gauge in metrics", fetcherGauge);
+      Map<String, Object> value = fetcherGauge.getValue();
+      if (value.isEmpty()) {
+        continue;
+      }
+      assertNotNull("isReplicating missing: " + value, value.get("isReplicating"));
+      assertTrue(
+          "isReplicating should be a boolean: " + value,
+          value.get("isReplicating") instanceof Boolean);
+      if (value.get("indexReplicatedAt") == null) {
+        continue;
+      }
+      assertNotNull("timesIndexReplicated missing: " + value, value.get("timesIndexReplicated"));
+      assertTrue(
+          "timesIndexReplicated should be a number: " + value,
+          value.get("timesIndexReplicated") instanceof Number);
+    }
+  }
+
+  @Test
+  public void testGoodSpreadDuringAssignWithNoTarget() throws Exception {
+    configureCluster(5)
+        .addConfig(
+            "conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
+        .configure();
+    String coll = "migratereplicastest_notarget_coll";
+    if (log.isInfoEnabled()) {
+      log.info("total_jettys: {}", cluster.getJettySolrRunners().size());
+    }
+
+    CloudSolrClient cloudClient = cluster.getSolrClient();
+    Set<String> liveNodes = cloudClient.getClusterState().getLiveNodes();
+    List<String> l = new ArrayList<>(liveNodes);
+    Collections.shuffle(l, random());
+    List<String> nodesToBeDecommissioned = l.subList(0, 2);
+    List<String> eventualTargetNodes = l.subList(2, l.size());
+
+    // TODO: tlog replicas do not work correctly in tests due to faulty
+    // TestInjection#waitForInSyncWithLeader
+    CollectionAdminRequest.Create create =
+        CollectionAdminRequest.createCollection(coll, "conf1", 3, 2, 0, 0);
+    cloudClient.request(create);
+
+    cluster.waitForActiveCollection(
+        coll,
+        create.getNumShards(),
+        create.getNumShards()
+            * (create.getNumNrtReplicas()
+                + create.getNumPullReplicas()
+                + create.getNumTlogReplicas()));
+
+    DocCollection initialCollection = cloudClient.getClusterState().getCollection(coll);
+    log.info("### Before decommission: {}", initialCollection);
+    List<Integer> initialReplicaCounts =
+        l.stream()
+            .map(node -> initialCollection.getReplicas(node).size())
+            .collect(Collectors.toList());
+    Map<?, ?> response =
+        callMigrateReplicas(
+            cloudClient,
+            new MigrateReplicasAPI.MigrateReplicasRequestBody(
+                new HashSet<>(nodesToBeDecommissioned), Collections.emptySet(), true, null));
+    assertEquals(
+        "MigrateReplicas request was unsuccessful",
+        0L,
+        ((Map<?, ?>) response.get("responseHeader")).get("status"));
+
+    DocCollection collection = cloudClient.getClusterState().getCollectionOrNull(coll, false);
+    assertNotNull("Collection cannot be null: " + coll, collection);
+    log.info("### After decommission: {}", collection);
+    // check what are replica states on the decommissioned nodes
+    for (String nodeToBeDecommissioned : nodesToBeDecommissioned) {
+      List<Replica> replicas = collection.getReplicas(nodeToBeDecommissioned);
+      if (replicas == null) {
+        replicas = Collections.emptyList();
+      }
+      assertEquals(
+          "There should be no more replicas on the sourceNode after a migrateReplicas request.",
+          Collections.emptyList(),
+          replicas);
+    }
+
+    for (String node : eventualTargetNodes) {
+      assertEquals(
+          "The non-source node '" + node + "' has the wrong number of replicas after the migration",
+          2,
+          collection.getReplicas(node).size());
+    }
+  }
+
+  @Test
+  public void testFailOnSingleNode() throws Exception {
+    configureCluster(1)
+        .addConfig(
+            "conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
+        .configure();
+    String coll = "migratereplicastest_singlenode_coll";
+    if (log.isInfoEnabled()) {
+      log.info("total_jettys: {}", cluster.getJettySolrRunners().size());
+    }
+
+    CloudSolrClient cloudClient = cluster.getSolrClient();
+    cloudClient.request(CollectionAdminRequest.createCollection(coll, "conf1", 5, 1, 0, 0));
+
+    cluster.waitForActiveCollection(coll, 5, 5);
+
+    String liveNode = cloudClient.getClusterState().getLiveNodes().iterator().next();
+    Map<?, ?> response =
+        callMigrateReplicas(
+            cloudClient,
+            new MigrateReplicasAPI.MigrateReplicasRequestBody(
+                Set.of(liveNode), Collections.emptySet(), true, null));
+    assertNotNull(
+        "No error in response, when the request should have failed", response.get("error"));
+    assertEquals(
+        "Wrong error message",
+        "No nodes other than the source nodes are live, therefore replicas cannot be migrated",
+        ((Map<?, ?>) response.get("error")).get("msg"));
+  }
+
+  public Map<?, ?> callMigrateReplicas(
+      CloudSolrClient cloudClient, MigrateReplicasAPI.MigrateReplicasRequestBody body)
+      throws IOException {
+    HttpEntityEnclosingRequestBase httpRequest = null;
+    HttpEntity entity;
+    String response = null;
+    Map<?, ?> r = null;
+
+    String uri =
+        cluster.getJettySolrRunners().get(0).getBaseUrl().toString().replace("/solr", "")
+            + "/api/cluster/replicas/migrate";
+    try {
+      httpRequest = new HttpPost(uri);
+
+      httpRequest.setEntity(new ByteArrayEntity(Utils.toJSON(body), ContentType.APPLICATION_JSON));
+      httpRequest.setHeader("Accept", "application/json");
+      entity =
+          ((CloudLegacySolrClient) cloudClient).getHttpClient().execute(httpRequest).getEntity();
+      try {
+        response = EntityUtils.toString(entity, UTF_8);
+        r = (Map<?, ?>) Utils.fromJSONString(response);
+        assertNotNull("No response given from MigrateReplicas API", r);
+        assertNotNull("No responseHeader given from MigrateReplicas API", r.get("responseHeader"));
+      } catch (JSONParser.ParseException e) {
+        log.error("err response: {}", response);
+        throw new AssertionError(e);
+      }
+    } finally {
+      httpRequest.releaseConnection();
+    }
+    return r;
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestRequestStatusCollectionAPI.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestRequestStatusCollectionAPI.java
index bf4be82d6b7..6d3f208761a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestRequestStatusCollectionAPI.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestRequestStatusCollectionAPI.java
@@ -219,6 +219,7 @@ public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest {
       try {
         Thread.sleep(1000);
       } catch (InterruptedException e) {
+        break;
       }
     }
     // Return last state?
diff --git a/solr/core/src/test/org/apache/solr/handler/admin/api/MigrateReplicasAPITest.java b/solr/core/src/test/org/apache/solr/handler/admin/api/MigrateReplicasAPITest.java
new file mode 100644
index 00000000000..e87a4c67532
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/handler/admin/api/MigrateReplicasAPITest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.admin.api;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.cloud.OverseerSolrResponse;
+import org.apache.solr.cloud.api.collections.DistributedCollectionConfigSetCommandRunner;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+/** Unit tests for {@link MigrateReplicasAPI} */
+public class MigrateReplicasAPITest extends SolrTestCaseJ4 {
+
+  private CoreContainer mockCoreContainer;
+  private SolrQueryRequest mockQueryRequest;
+  private SolrQueryResponse queryResponse;
+  private MigrateReplicasAPI migrateReplicasAPI;
+  private DistributedCollectionConfigSetCommandRunner mockCommandRunner;
+  private ArgumentCaptor<ZkNodeProps> messageCapturer;
+
+  @BeforeClass
+  public static void ensureWorkingMockito() {
+    assumeWorkingMockito();
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+
+    mockCoreContainer = mock(CoreContainer.class);
+    mockCommandRunner = mock(DistributedCollectionConfigSetCommandRunner.class);
+    when(mockCoreContainer.getDistributedCollectionCommandRunner())
+        .thenReturn(Optional.of(mockCommandRunner));
+    when(mockCommandRunner.runCollectionCommand(any(), any(), anyLong()))
+        .thenReturn(new OverseerSolrResponse(new NamedList<>()));
+    mockQueryRequest = mock(SolrQueryRequest.class);
+    queryResponse = new SolrQueryResponse();
+    migrateReplicasAPI = new MigrateReplicasAPI(mockCoreContainer, mockQueryRequest, queryResponse);
+    messageCapturer = ArgumentCaptor.forClass(ZkNodeProps.class);
+
+    when(mockCoreContainer.isZooKeeperAware()).thenReturn(true);
+  }
+
+  @Test
+  public void testCreatesValidOverseerMessage() throws Exception {
+    MigrateReplicasAPI.MigrateReplicasRequestBody requestBody =
+        new MigrateReplicasAPI.MigrateReplicasRequestBody(
+            Set.of("demoSourceNode"), Set.of("demoTargetNode"), false, "async");
+    migrateReplicasAPI.migrateReplicas(requestBody);
+    verify(mockCommandRunner).runCollectionCommand(messageCapturer.capture(), any(), anyLong());
+
+    final ZkNodeProps createdMessage = messageCapturer.getValue();
+    final Map<String, Object> createdMessageProps = createdMessage.getProperties();
+    assertEquals(5, createdMessageProps.size());
+    assertEquals(Set.of("demoSourceNode"), createdMessageProps.get("sourceNodes"));
+    assertEquals(Set.of("demoTargetNode"), createdMessageProps.get("targetNodes"));
+    assertEquals(false, createdMessageProps.get("waitForFinalState"));
+    assertEquals("async", createdMessageProps.get("async"));
+    assertEquals("migrate_replicas", createdMessageProps.get("operation"));
+  }
+
+  @Test
+  public void testNoTargetNodes() throws Exception {
+    MigrateReplicasAPI.MigrateReplicasRequestBody requestBody =
+        new MigrateReplicasAPI.MigrateReplicasRequestBody(
+            Set.of("demoSourceNode"), null, null, null);
+    migrateReplicasAPI.migrateReplicas(requestBody);
+    verify(mockCommandRunner).runCollectionCommand(messageCapturer.capture(), any(), anyLong());
+
+    final ZkNodeProps createdMessage = messageCapturer.getValue();
+    final Map<String, Object> createdMessageProps = createdMessage.getProperties();
+    assertEquals(2, createdMessageProps.size());
+    assertEquals(Set.of("demoSourceNode"), createdMessageProps.get("sourceNodes"));
+    assertEquals("migrate_replicas", createdMessageProps.get("operation"));
+  }
+
+  @Test
+  public void testNoSourceNodesThrowsError() throws Exception {
+    MigrateReplicasAPI.MigrateReplicasRequestBody requestBody1 =
+        new MigrateReplicasAPI.MigrateReplicasRequestBody(
+            Collections.emptySet(), Set.of("demoTargetNode"), null, null);
+    assertThrows(SolrException.class, () -> migrateReplicasAPI.migrateReplicas(requestBody1));
+    MigrateReplicasAPI.MigrateReplicasRequestBody requestBody2 =
+        new MigrateReplicasAPI.MigrateReplicasRequestBody(
+            null, Set.of("demoTargetNode"), null, null);
+    assertThrows(SolrException.class, () -> migrateReplicasAPI.migrateReplicas(requestBody2));
+  }
+}
diff --git a/solr/solr-ref-guide/modules/deployment-guide/pages/cluster-node-management.adoc b/solr/solr-ref-guide/modules/deployment-guide/pages/cluster-node-management.adoc
index dc84a3239c3..265ce8bac75 100644
--- a/solr/solr-ref-guide/modules/deployment-guide/pages/cluster-node-management.adoc
+++ b/solr/solr-ref-guide/modules/deployment-guide/pages/cluster-node-management.adoc
@@ -368,6 +368,13 @@ At this point, if you run a query on a node having e.g., `rack=rack1`, Solr will
 
 Shuffle the replicas across the given set of Solr nodes until an equilibrium is reached.
 
+The configured xref:configuration-guide:replica-placement-plugins.adoc[Replica Placement Plugin]
+will be used to decide:
+
+* Which replicas should be moved for the balancing
+* Which nodes those replicas should be placed on
+* When the cluster has reached an "equilibrium"
+
 [example.tab-pane#v2balancereplicas]
 ====
 [.tab-label]*V2 API*
@@ -417,17 +424,17 @@ If `false`, the API will return when the bare minimum replicas are active, such
 +
 Request ID to track this action which will be xref:configuration-guide:collections-api.adoc#asynchronous-calls[processed asynchronously].
 
+=== BalanceReplicas Response
+
+The response will include the status of the request.
+If the status is anything other than "0", an error message will explain why the request failed.
+
 [IMPORTANT]
 ====
 This operation does not hold necessary locks on the replicas that belong to on the source node.
 So don't perform other collection operations in this period.
 ====
 
-=== BalanceReplicas Response
-
-The response will include the status of the request.
-If the status is anything other than "0", an error message will explain why the request failed.
-
 [[balanceshardunique]]
 == BALANCESHARDUNIQUE: Balance a Property Across Nodes
 
@@ -537,14 +544,106 @@ http://localhost:8983/solr/admin/collections?action=BALANCESHARDUNIQUE&collectio
 
 Examining the clusterstate after issuing this call should show exactly one replica in each shard that has this property.
 
+[[migratereplicas]]
+== Migrate Replicas
+
+Migrate all replicas off of a given set of source nodes.
+
+If more than one node is used as a targetNode (either explicitly, or by default), then the configured
+xref:configuration-guide:replica-placement-plugins.adoc[Replica Placement Plugin] will be used to determine
+which targetNode should be used for each migrated replica.
+
+[example.tab-pane#v2migratereplicas]
+====
+[.tab-label]*V2 API*
+
+[source,bash]
+----
+curl -X POST http://localhost:8983/api/cluster/replicas/migrate -H 'Content-Type: application/json' -d '
+  {
+    "sourceNodes": ["localhost:8983_solr", "localhost:8984_solr"],
+    "targetNodes": ["localhost:8985_solr", "localhost:8986_solr"],
+    "async": "migrate-replicas-1"
+  }
+'
+----
+====
+
+=== MigrateReplicas Parameters
+
+
+`sourceNodes`::
++
+[%autowidth,frame=none]
+|===
+|Required |Default: `[]`
+|===
++
+The nodes which all replicas will be migrated off of.
+Replicas that live on nodes outside this set will not be moved.
++
+This parameter is required; the request will fail if no source nodes are provided.
+
+`targetNodes`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: `[]`
+|===
++
+The nodes which the migrated replicas will be moved to.
+If none is provided, then the API will use all live nodes not provided in `sourceNodes`.
++
+If there is more than one node to migrate the replicas to, then the configured Replica Placement Plugin will select one of these nodes for each migrated replica.
+
+`waitForFinalState`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: `false`
+|===
++
+If `true`, the request will complete only when all affected replicas become active.
+If `false`, the API will return when the bare minimum replicas are active, such as the affected leader replicas.
+
+`async`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: none
+|===
++
+Request ID to track this action which will be xref:configuration-guide:collections-api.adoc#asynchronous-calls[processed asynchronously].
+
+=== MigrateReplicas Response
+
+The response will include the status of the request.
+If the status is anything other than "0", an error message will explain why the request failed.
+
+[IMPORTANT]
+====
+This operation does not hold necessary locks on the replicas that belong to the source nodes.
+So don't perform other collection operations in this period.
+====
+
 [[replacenode]]
 == REPLACENODE: Move All Replicas in a Node to Another
 
+[WARNING]
+====
+This API's functionality has been replaced and enhanced by <<migratereplicas>>. Please consider using the new
+API instead, as this API may be removed in a future version.
+====
+
 This command recreates replicas in one node (the source) on another node(s) (the target).
 After each replica is copied, the replicas in the source node are deleted.
 
 For source replicas that are also shard leaders the operation will wait for the number of seconds set with the `timeout` parameter to make sure there's an active replica that can become a leader, either an existing replica becoming a leader or the new replica completing recovery and becoming a leader).
 
+If no targetNode is provided, then the configured
+xref:configuration-guide:replica-placement-plugins.adoc[Replica Placement Plugin] will be used to determine
+which node each recreated replica should be placed on.
+
 [.dynamic-tabs]
 --
 [example.tab-pane#v1replacenode]
@@ -595,7 +694,9 @@ The source node from which the replicas need to be copied from.
 |===
 +
 The target node where replicas will be copied.
-If this parameter is not provided, Solr will identify nodes automatically based on policies or number of cores in each node.
+If this parameter is not provided, Solr will use all live nodes except for the `sourceNode`.
+The configured xref:configuration-guide:replica-placement-plugins.adoc[Replica Placement Plugin]
+will be used to determine which node will be used for each replica.
 
 `parallel`::
 +
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
index 516fdbfb919..73e2e9518a1 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
@@ -37,6 +37,8 @@ public interface CollectionParams {
 
   String SOURCE_NODE = "sourceNode";
   String TARGET_NODE = "targetNode";
+  String SOURCE_NODES = "sourceNodes";
+  String TARGET_NODES = "targetNodes";
 
   String NODES = "nodes";
   String MAX_BALANCE_SKEW = "maxBalanceSkew";
@@ -130,6 +132,8 @@ public interface CollectionParams {
     // TODO when we have a node level lock use it here
     REPLACENODE(true, LockLevel.NONE),
     // TODO when we have a node level lock use it here
+    MIGRATE_REPLICAS(true, LockLevel.NONE),
+    // TODO when we have a node level lock use it here
     BALANCE_REPLICAS(true, LockLevel.NONE),
     DELETENODE(true, LockLevel.NONE),
     MOCK_REPLICA_TASK(false, LockLevel.REPLICA),