You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@solr.apache.org by ho...@apache.org on 2023/06/15 16:17:56 UTC
[solr] branch main updated: SOLR-16806: Create a BalanceReplicas API (#1650)
This is an automated email from the ASF dual-hosted git repository.
houston pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new de5d1aaec7d SOLR-16806: Create a BalanceReplicas API (#1650)
de5d1aaec7d is described below
commit de5d1aaec7d5269ebd60ba8e73f06dc31a98887d
Author: Houston Putman <ho...@apache.org>
AuthorDate: Thu Jun 15 12:17:51 2023 -0400
SOLR-16806: Create a BalanceReplicas API (#1650)
- Introduce BalanceReplicasAPI
- Add computeReplicaBalancing() in Assign, and computeBalancing() in PlacementPlugin.
The default implementation returns an empty balancePlan (moving no replicas).
- Refactor all provided PlacementPlugins to implement OrderedNodePlacementPlugin,
so that they share a common way of computing placements and balancing.
Also fixes SOLR-16816, updating metrics when doing multi-shard/collection placements.
---
solr/CHANGES.txt | 14 +-
.../apache/solr/cloud/api/collections/Assign.java | 15 +
.../cloud/api/collections/BalanceReplicasCmd.java | 91 ++
.../solr/cloud/api/collections/CollApiCmds.java | 2 +
.../solr/cloud/api/collections/DeleteNodeCmd.java | 104 +-
.../solr/cloud/api/collections/ReplaceNodeCmd.java | 248 +---
...laceNodeCmd.java => ReplicaMigrationUtils.java} | 254 ++--
.../apache/solr/cluster/placement/BalancePlan.java | 46 +
.../solr/cluster/placement/BalancePlanFactory.java | 38 +
.../solr/cluster/placement/BalanceRequest.java | 48 +
.../solr/cluster/placement/PlacementContext.java | 3 +
.../solr/cluster/placement/PlacementPlugin.java | 16 +-
.../placement/impl/BalancePlanFactoryImpl.java | 34 +
.../cluster/placement/impl/BalancePlanImpl.java | 58 +
.../cluster/placement/impl/BalanceRequestImpl.java | 86 ++
.../impl/PlacementPluginAssignStrategy.java | 57 +
.../placement/impl/SimplePlacementContextImpl.java | 7 +
.../plugins/AffinityPlacementFactory.java | 1432 +++++++-------------
.../plugins/MinimizeCoresPlacementFactory.java | 159 +--
.../plugins/OrderedNodePlacementPlugin.java | 702 ++++++++++
.../placement/plugins/RandomPlacementFactory.java | 131 +-
.../placement/plugins/SimplePlacementFactory.java | 224 +--
.../solr/handler/admin/CollectionsHandler.java | 2 +
.../solr/handler/admin/api/BalanceReplicasAPI.java | 128 ++
.../org/apache/solr/cloud/BalanceReplicasTest.java | 193 +++
.../impl/PlacementPluginIntegrationTest.java | 6 +-
.../plugins/AbstractPlacementFactoryTest.java | 225 +++
.../plugins/AffinityPlacementFactoryTest.java | 503 +++++--
.../plugins/MinimizeCoresPlacementFactoryTest.java | 348 +++++
.../pages/cluster-node-management.adoc | 77 +-
.../java/org/apache/solr/common/cloud/Replica.java | 9 +
.../solr/common/params/CollectionParams.java | 5 +
.../apache/solr/cluster/placement/Builders.java | 27 +-
33 files changed, 3480 insertions(+), 1812 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 26a2b93a12b..d21bcfec809 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -60,11 +60,14 @@ New Features
* SOLR-16674: Introduced support for byte vector encoding in DenseVectorField and KnnQParser (Elia Porciani via Alessandro Benedetti).
-* SOLR-16719: AffinityPlacementFactory now supports spreading replicas across domains within the availablity zone and
+* SOLR-16719: AffinityPlacementFactory now supports spreading replicas across domains within the availability zone and
optionally fail the request if more than a configurable number of replicas need to be placed in a single domain. (Houston Putman, Tomás Fernández Löbbe)
* SOLR-16836: Introduced support for high dimensional vectors (Alessandro Benedetti).
+* SOLR-16806: Solr now provides a BalanceReplicas API at `POST /api/cluster/replicas/balance` (v2), to spread replicas
+ across a given set of nodes. No v1 API is available. (Houston Putman, Tomás Fernández Löbbe, Jason Gerlowski, Radu Gheorghe)
+
Improvements
---------------------
@@ -146,6 +149,9 @@ Improvements
* SOLR-9378: Internal shard requests no longer include the wasteful shard.url param. [shard] transformer now defaults to returning
only the shard id (based on luceneMatchVersion), but can be configured to return the legacy list of replicas. (hossman)
+* SOLR-16816: Update node metrics while making affinityPlacement selections, so that each selection accounts for the cluster
+ state expected after the previous selections are applied. (Houston Putman)
+
* SOLR-16392: The v2 "create shard" API has been tweaked to be more intuitive, by removing the top-level "create"
command specifier. The rest of the API remains unchanged. (Jason Gerlowski)
@@ -162,6 +168,12 @@ Improvements
specifier from the request body, and changing the path. The v2 functionality can now be accessed at:
`POST /api/collections/cName/balance-shard-unique {...}` (Jason Gerlowski)
+* SOLR-16806: The included PlacementPlugins (Random, Simple, MinimizeCores and Affinity) now all implement the
+ OrderedNodePlacementPlugin, which provides the implementation for computePlacements() and computeBalancing().
+ Each implementing PlacementPlugin provides a way of weighting Solr Nodes, and the OrderedNodePlacementPlugin
+ then uses the weights to decide the optimal strategy for placing new replicas or balancing existing replicas.
+ (Houston Putman, Tomás Fernández Löbbe, Jason Gerlowski, Radu Gheorghe)
+
Optimizations
---------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
index ff8b7c71b32..6304e522209 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
@@ -451,6 +451,21 @@ public class Assign {
SolrCloudManager solrCloudManager, List<AssignRequest> assignRequests)
throws AssignmentException, IOException, InterruptedException;
+    /**
+     * Computes how replicas should be moved to balance them across the given nodes.
+     *
+     * @param solrCloudManager current instance of {@link SolrCloudManager}.
+     * @param nodes the nodes to compute replica balancing across.
+     * @param maxBalanceSkew to ensure strictness of replica balancing.
+     * @return map from each Replica to the node it should be moved to; this default moves nothing.
+     * @throws AssignmentException when balance request cannot produce any valid assignments.
+     */
+    default Map<Replica, String> computeReplicaBalancing(
+        SolrCloudManager solrCloudManager, Set<String> nodes, int maxBalanceSkew)
+        throws AssignmentException, IOException, InterruptedException {
+      return Collections.<Replica, String>emptyMap();
+    }
+
/**
* Verify that deleting a collection doesn't violate the replica assignment constraints.
*
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/BalanceReplicasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/BalanceReplicasCmd.java
new file mode 100644
index 00000000000..7d84c7c6633
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/BalanceReplicasCmd.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CommonAdminParams;
+import org.apache.solr.common.util.NamedList;
+
+/**
+ * Collection API command that moves replicas to spread them more evenly across a set of nodes.
+ *
+ * <p>The movements are computed by the configured {@link Assign.AssignStrategy} and carried out by
+ * {@link ReplicaMigrationUtils#migrateReplicas}, which creates the new replicas before deleting the
+ * old ones.
+ */
+public class BalanceReplicasCmd implements CollApiCmds.CollectionApiCommand {
+  private final CollectionCommandContext ccc;
+
+  public BalanceReplicasCmd(CollectionCommandContext ccc) {
+    this.ccc = ccc;
+  }
+
+  /**
+   * Computes a replica balancing plan for the requested nodes and executes it.
+   *
+   * <p>Message properties read: {@link CollectionParams#NODES} (optional Set, Collection or
+   * comma-separated String), {@link CommonAdminParams#WAIT_FOR_FINAL_STATE},
+   * {@link CommonAdminParams#ASYNC}, {@code timeout} (seconds, default 10 minutes),
+   * {@code parallel} and {@link CollectionParams#MAX_BALANCE_SKEW} (default -1).
+   *
+   * @throws SolrException with {@code BAD_REQUEST} if the nodes value has an unsupported type or
+   *     names exactly one node
+   */
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList<Object> results)
+      throws Exception {
+    Set<String> nodes = parseNodes(message.get(CollectionParams.NODES));
+    boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
+    String async = message.getStr(ASYNC);
+    int timeout = message.getInt("timeout", 10 * 60); // 10 minutes
+    boolean parallel = message.getBool("parallel", false);
+
+    if (nodes.size() == 1) {
+      throw new SolrException(
+          SolrException.ErrorCode.BAD_REQUEST,
+          "Cannot balance across a single node: " + nodes.stream().findAny().get());
+    }
+
+    Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ccc.getCoreContainer());
+    Map<Replica, String> replicaMovements =
+        assignStrategy.computeReplicaBalancing(
+            ccc.getSolrCloudManager(),
+            nodes,
+            message.getInt(CollectionParams.MAX_BALANCE_SKEW, -1));
+
+    boolean migrationSuccessful =
+        ReplicaMigrationUtils.migrateReplicas(
+            ccc, replicaMovements, parallel, waitForFinalState, timeout, async, results);
+    if (migrationSuccessful) {
+      results.add(
+          "success",
+          "BalanceReplicas action completed successfully across nodes : ["
+              + String.join(", ", nodes)
+              + "]");
+    }
+  }
+
+  /**
+   * Normalizes the raw nodes property of the command message into a set of node names.
+   *
+   * <p>A comma-separated String is split, each entry is trimmed and blank entries are dropped.
+   * Duplicates are tolerated, unlike {@code Set.of}, which throws IllegalArgumentException.
+   *
+   * @param nodesRaw the raw property value; may be {@code null}
+   * @return the node names, or an empty set when none were given
+   * @throws SolrException with {@code BAD_REQUEST} if the value is not a Set, Collection or String
+   */
+  @SuppressWarnings({"unchecked"})
+  private static Set<String> parseNodes(Object nodesRaw) {
+    if (nodesRaw == null) {
+      return Collections.emptySet();
+    } else if (nodesRaw instanceof Set) {
+      return (Set<String>) nodesRaw;
+    } else if (nodesRaw instanceof Collection) {
+      return new HashSet<>((Collection<String>) nodesRaw);
+    } else if (nodesRaw instanceof String) {
+      Set<String> parsed = new HashSet<>();
+      for (String node : ((String) nodesRaw).split(",")) {
+        String trimmed = node.trim();
+        if (!trimmed.isEmpty()) {
+          parsed.add(trimmed);
+        }
+      }
+      return parsed;
+    }
+    throw new SolrException(
+        SolrException.ErrorCode.BAD_REQUEST,
+        "'nodes' was not passed as a correct type (Set/List/String): "
+            + nodesRaw.getClass().getName());
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollApiCmds.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollApiCmds.java
index 8538c059d93..e35023023b1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollApiCmds.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollApiCmds.java
@@ -35,6 +35,7 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.AD
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ALIASPROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.BACKUP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCE_REPLICAS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATEALIAS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
@@ -142,6 +143,7 @@ public class CollApiCmds {
commandMap =
Map.ofEntries(
Map.entry(REPLACENODE, new ReplaceNodeCmd(ccc)),
+ Map.entry(BALANCE_REPLICAS, new BalanceReplicasCmd(ccc)),
Map.entry(DELETENODE, new DeleteNodeCmd(ccc)),
Map.entry(BACKUP, new BackupCmd(ccc)),
Map.entry(RESTORE, new RestoreCmd(ccc)),
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java
index d4ae8154ec7..0715da370a5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java
@@ -17,31 +17,18 @@
package org.apache.solr.cloud.api.collections;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
-import java.util.Locale;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.NamedList;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class DeleteNodeCmd implements CollApiCmds.CollectionApiCommand {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
private final CollectionCommandContext ccc;
public DeleteNodeCmd(CollectionCommandContext ccc) {
@@ -53,7 +40,7 @@ public class DeleteNodeCmd implements CollApiCmds.CollectionApiCommand {
throws Exception {
CollectionHandlingUtils.checkRequired(message, "node");
String node = message.getStr("node");
- List<ZkNodeProps> sourceReplicas = ReplaceNodeCmd.getReplicasOfNode(node, state);
+ List<Replica> sourceReplicas = ReplicaMigrationUtils.getReplicasOfNode(node, state);
List<String> singleReplicas = verifyReplicaAvailability(sourceReplicas, state);
if (!singleReplicas.isEmpty()) {
results.add(
@@ -61,32 +48,25 @@ public class DeleteNodeCmd implements CollApiCmds.CollectionApiCommand {
"Can't delete the only existing non-PULL replica(s) on node "
+ node
+ ": "
- + singleReplicas.toString());
+ + singleReplicas);
} else {
- cleanupReplicas(results, state, sourceReplicas, ccc, node, message.getStr(ASYNC));
+ ReplicaMigrationUtils.cleanupReplicas(
+ results, state, sourceReplicas, ccc, message.getStr(ASYNC));
}
}
// collect names of replicas that cannot be deleted
- static List<String> verifyReplicaAvailability(
- List<ZkNodeProps> sourceReplicas, ClusterState state) {
+ static List<String> verifyReplicaAvailability(List<Replica> sourceReplicas, ClusterState state) {
List<String> res = new ArrayList<>();
- for (ZkNodeProps sourceReplica : sourceReplicas) {
- String coll = sourceReplica.getStr(COLLECTION_PROP);
- String shard = sourceReplica.getStr(SHARD_ID_PROP);
- String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
+ for (Replica sourceReplica : sourceReplicas) {
+ String coll = sourceReplica.getCollection();
+ String shard = sourceReplica.getShard();
+ String replicaName = sourceReplica.getName();
DocCollection collection = state.getCollection(coll);
Slice slice = collection.getSlice(shard);
if (slice.getReplicas().size() < 2) {
// can't delete the only replica in existence
- res.add(
- coll
- + "/"
- + shard
- + "/"
- + replicaName
- + ", type="
- + sourceReplica.getStr(ZkStateReader.REPLICA_TYPE));
+ res.add(coll + "/" + shard + "/" + replicaName + ", type=" + sourceReplica.getType());
} else { // check replica types
int otherNonPullReplicas = 0;
for (Replica r : slice.getReplicas()) {
@@ -96,72 +76,10 @@ public class DeleteNodeCmd implements CollApiCmds.CollectionApiCommand {
}
// can't delete - there are no other non-pull replicas
if (otherNonPullReplicas == 0) {
- res.add(
- coll
- + "/"
- + shard
- + "/"
- + replicaName
- + ", type="
- + sourceReplica.getStr(ZkStateReader.REPLICA_TYPE));
+ res.add(coll + "/" + shard + "/" + replicaName + ", type=" + sourceReplica.getType());
}
}
}
return res;
}
-
- static void cleanupReplicas(
- NamedList<Object> results,
- ClusterState clusterState,
- List<ZkNodeProps> sourceReplicas,
- CollectionCommandContext ccc,
- String node,
- String async)
- throws IOException, InterruptedException {
- CountDownLatch cleanupLatch = new CountDownLatch(sourceReplicas.size());
- for (ZkNodeProps sourceReplica : sourceReplicas) {
- String coll = sourceReplica.getStr(COLLECTION_PROP);
- String shard = sourceReplica.getStr(SHARD_ID_PROP);
- String type = sourceReplica.getStr(ZkStateReader.REPLICA_TYPE);
- log.info(
- "Deleting replica type={} for collection={} shard={} on node={}",
- type,
- coll,
- shard,
- node);
- NamedList<Object> deleteResult = new NamedList<>();
- try {
- if (async != null) sourceReplica = sourceReplica.plus(ASYNC, async);
- new DeleteReplicaCmd(ccc)
- .deleteReplica(
- clusterState,
- sourceReplica.plus("parallel", "true"),
- deleteResult,
- () -> {
- cleanupLatch.countDown();
- if (deleteResult.get("failure") != null) {
- synchronized (results) {
- results.add(
- "failure",
- String.format(
- Locale.ROOT,
- "Failed to delete replica for collection=%s shard=%s" + " on node=%s",
- coll,
- shard,
- node));
- }
- }
- });
- } catch (KeeperException e) {
- log.warn("Error deleting ", e);
- cleanupLatch.countDown();
- } catch (Exception e) {
- log.warn("Error deleting ", e);
- cleanupLatch.countDown();
- throw e;
- }
- }
- log.debug("Waiting for delete node action to complete");
- cleanupLatch.await(5, TimeUnit.MINUTES);
- }
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
index 52cba5b0fc1..fe47f96cc6f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
@@ -17,41 +17,25 @@
package org.apache.solr.cloud.api.collections;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
-import org.apache.solr.cloud.ActiveReplicaWatcher;
-import org.apache.solr.common.SolrCloseableLatch;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.CollectionStateWatcher;
-import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
-import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CommonAdminParams;
-import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.util.CollectionUtil;
import org.apache.solr.common.util.NamedList;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final CollectionCommandContext ccc;
@@ -89,41 +73,20 @@ public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
+ source
+ " are live, therefore replicas cannot be moved");
}
- List<ZkNodeProps> sourceReplicas = getReplicasOfNode(source, clusterState);
- // how many leaders are we moving? for these replicas we have to make sure that either:
- // * another existing replica can become a leader, or
- // * we wait until the newly created replica completes recovery (and can become the new leader)
- // If waitForFinalState=true we wait for all replicas
- int numLeaders = 0;
- for (ZkNodeProps props : sourceReplicas) {
- if (props.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) {
- numLeaders++;
- }
- }
- // map of collectionName_coreNodeName to watchers
- Map<String, CollectionStateWatcher> watchers = new HashMap<>();
- List<ZkNodeProps> createdReplicas = new ArrayList<>();
-
- AtomicBoolean anyOneFailed = new AtomicBoolean(false);
- SolrCloseableLatch countDownLatch =
- new SolrCloseableLatch(sourceReplicas.size(), ccc.getCloseableToLatchOn());
+ List<Replica> sourceReplicas = ReplicaMigrationUtils.getReplicasOfNode(source, clusterState);
+ Map<Replica, String> replicaMovements = CollectionUtil.newHashMap(sourceReplicas.size());
- SolrCloseableLatch replicasToRecover =
- new SolrCloseableLatch(numLeaders, ccc.getCloseableToLatchOn());
-
- List<ReplicaPosition> replicaPositions = null;
if (target == null || target.isEmpty()) {
List<Assign.AssignRequest> assignRequests = new ArrayList<>(sourceReplicas.size());
- for (ZkNodeProps sourceReplica : sourceReplicas) {
- Replica.Type replicaType =
- Replica.Type.get(sourceReplica.getStr(ZkStateReader.REPLICA_TYPE));
+ for (Replica sourceReplica : sourceReplicas) {
+ Replica.Type replicaType = sourceReplica.getType();
int numNrtReplicas = replicaType == Replica.Type.NRT ? 1 : 0;
int numTlogReplicas = replicaType == Replica.Type.TLOG ? 1 : 0;
int numPullReplicas = replicaType == Replica.Type.PULL ? 1 : 0;
Assign.AssignRequest assignRequest =
new Assign.AssignRequestBuilder()
- .forCollection(sourceReplica.getStr(COLLECTION_PROP))
- .forShard(Collections.singletonList(sourceReplica.getStr(SHARD_ID_PROP)))
+ .forCollection(sourceReplica.getCollection())
+ .forShard(Collections.singletonList(sourceReplica.getShard()))
.assignNrtReplicas(numNrtReplicas)
.assignTlogReplicas(numTlogReplicas)
.assignPullReplicas(numPullReplicas)
@@ -135,194 +98,25 @@ public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
assignRequests.add(assignRequest);
}
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ccc.getCoreContainer());
- replicaPositions = assignStrategy.assign(ccc.getSolrCloudManager(), assignRequests);
- }
- int replicaPositionIdx = 0;
- for (ZkNodeProps sourceReplica : sourceReplicas) {
- String sourceCollection = sourceReplica.getStr(COLLECTION_PROP);
- if (log.isInfoEnabled()) {
- log.info(
- "Going to create replica for collection={} shard={} on node={}",
- sourceCollection,
- sourceReplica.getStr(SHARD_ID_PROP),
- target);
+ List<ReplicaPosition> replicaPositions =
+ assignStrategy.assign(ccc.getSolrCloudManager(), assignRequests);
+ int position = 0;
+ for (Replica sourceReplica : sourceReplicas) {
+ replicaMovements.put(sourceReplica, replicaPositions.get(position++).node);
}
- String targetNode;
- // Use the assigned replica positions, if target is null or empty (checked above)
- if (replicaPositions != null) {
- targetNode = replicaPositions.get(replicaPositionIdx).node;
- replicaPositionIdx++;
- } else {
- targetNode = target;
- }
- ZkNodeProps msg =
- sourceReplica
- .plus("parallel", String.valueOf(parallel))
- .plus(CoreAdminParams.NODE, targetNode);
- if (async != null) msg.getProperties().put(ASYNC, async);
- NamedList<Object> nl = new NamedList<>();
- final ZkNodeProps addedReplica =
- new AddReplicaCmd(ccc)
- .addReplica(
- clusterState,
- msg,
- nl,
- () -> {
- countDownLatch.countDown();
- if (nl.get("failure") != null) {
- String errorString =
- String.format(
- Locale.ROOT,
- "Failed to create replica for collection=%s shard=%s" + " on node=%s",
- sourceCollection,
- sourceReplica.getStr(SHARD_ID_PROP),
- target);
- log.warn(errorString);
- // one replica creation failed. Make the best attempt to
- // delete all the replicas created so far in the target
- // and exit
- synchronized (results) {
- results.add("failure", errorString);
- anyOneFailed.set(true);
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug(
- "Successfully created replica for collection={} shard={} on node={}",
- sourceCollection,
- sourceReplica.getStr(SHARD_ID_PROP),
- target);
- }
- }
- })
- .get(0);
-
- if (addedReplica != null) {
- createdReplicas.add(addedReplica);
- if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) {
- String shardName = sourceReplica.getStr(SHARD_ID_PROP);
- String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
- String collectionName = sourceCollection;
- String key = collectionName + "_" + replicaName;
- CollectionStateWatcher watcher;
- if (waitForFinalState) {
- watcher =
- new ActiveReplicaWatcher(
- collectionName,
- null,
- Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)),
- replicasToRecover);
- } else {
- watcher =
- new LeaderRecoveryWatcher(
- collectionName,
- shardName,
- replicaName,
- addedReplica.getStr(ZkStateReader.CORE_NAME_PROP),
- replicasToRecover);
- }
- watchers.put(key, watcher);
- log.debug("--- adding {}, {}", key, watcher);
- zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
- } else {
- log.debug("--- not waiting for {}", addedReplica);
- }
- }
- }
-
- log.debug("Waiting for replicas to be added");
- if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
- log.info("Timed out waiting for replicas to be added");
- anyOneFailed.set(true);
} else {
- log.debug("Finished waiting for replicas to be added");
- }
-
- // now wait for leader replicas to recover
- log.debug("Waiting for {} leader replicas to recover", numLeaders);
- if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) {
- if (log.isInfoEnabled()) {
- log.info(
- "Timed out waiting for {} leader replicas to recover", replicasToRecover.getCount());
+ for (Replica sourceReplica : sourceReplicas) {
+ replicaMovements.put(sourceReplica, target);
}
- anyOneFailed.set(true);
- } else {
- log.debug("Finished waiting for leader replicas to recover");
}
- // remove the watchers, we're done either way
- for (Map.Entry<String, CollectionStateWatcher> e : watchers.entrySet()) {
- zkStateReader.removeCollectionStateWatcher(e.getKey(), e.getValue());
- }
- if (anyOneFailed.get()) {
- log.info("Failed to create some replicas. Cleaning up all replicas on target node");
- SolrCloseableLatch cleanupLatch =
- new SolrCloseableLatch(createdReplicas.size(), ccc.getCloseableToLatchOn());
- for (ZkNodeProps createdReplica : createdReplicas) {
- NamedList<Object> deleteResult = new NamedList<>();
- try {
- new DeleteReplicaCmd(ccc)
- .deleteReplica(
- zkStateReader.getClusterState(),
- createdReplica.plus("parallel", "true"),
- deleteResult,
- () -> {
- cleanupLatch.countDown();
- if (deleteResult.get("failure") != null) {
- synchronized (results) {
- results.add(
- "failure",
- "Could not cleanup, because of : " + deleteResult.get("failure"));
- }
- }
- });
- } catch (KeeperException e) {
- cleanupLatch.countDown();
- log.warn("Error deleting replica ", e);
- } catch (Exception e) {
- log.warn("Error deleting replica ", e);
- cleanupLatch.countDown();
- throw e;
- }
- }
- cleanupLatch.await(5, TimeUnit.MINUTES);
- return;
- }
-
- // we have reached this far means all replicas could be recreated
- // now cleanup the replicas in the source node
- DeleteNodeCmd.cleanupReplicas(results, state, sourceReplicas, ccc, source, async);
- results.add(
- "success",
- "REPLACENODE action completed successfully from : " + source + " to : " + target);
- }
- static List<ZkNodeProps> getReplicasOfNode(String source, ClusterState state) {
- List<ZkNodeProps> sourceReplicas = new ArrayList<>();
- for (Map.Entry<String, DocCollection> e : state.getCollectionsMap().entrySet()) {
- for (Slice slice : e.getValue().getSlices()) {
- for (Replica replica : slice.getReplicas()) {
- if (source.equals(replica.getNodeName())) {
- ZkNodeProps props =
- new ZkNodeProps(
- COLLECTION_PROP,
- e.getKey(),
- SHARD_ID_PROP,
- slice.getName(),
- ZkStateReader.CORE_NAME_PROP,
- replica.getCoreName(),
- ZkStateReader.REPLICA_PROP,
- replica.getName(),
- ZkStateReader.REPLICA_TYPE,
- replica.getType().name(),
- ZkStateReader.LEADER_PROP,
- String.valueOf(replica.equals(slice.getLeader())),
- CoreAdminParams.NODE,
- source);
- sourceReplicas.add(props);
- }
- }
- }
+ boolean migrationSuccessful =
+ ReplicaMigrationUtils.migrateReplicas(
+ ccc, replicaMovements, parallel, waitForFinalState, timeout, async, results);
+ if (migrationSuccessful) {
+ results.add(
+ "success",
+ "REPLACENODE action completed successfully from : " + source + " to : " + target);
}
- return sourceReplicas;
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java
similarity index 54%
copy from solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
copy to solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java
index 52cba5b0fc1..903d509da18 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java
@@ -17,12 +17,12 @@
package org.apache.solr.cloud.api.collections;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -30,73 +30,56 @@ import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
import org.apache.solr.cloud.ActiveReplicaWatcher;
import org.apache.solr.common.SolrCloseableLatch;
-import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CollectionParams;
-import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.NamedList;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
+public class ReplicaMigrationUtils {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final CollectionCommandContext ccc;
-
- public ReplaceNodeCmd(CollectionCommandContext ccc) {
- this.ccc = ccc;
- }
-
- @Override
- public void call(ClusterState state, ZkNodeProps message, NamedList<Object> results)
- throws Exception {
- ZkStateReader zkStateReader = ccc.getZkStateReader();
- String source = message.getStr(CollectionParams.SOURCE_NODE);
- String target = message.getStr(CollectionParams.TARGET_NODE);
- boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
- if (source == null) {
- throw new SolrException(
- SolrException.ErrorCode.BAD_REQUEST, "sourceNode is a required param");
- }
- String async = message.getStr(ASYNC);
- int timeout = message.getInt("timeout", 10 * 60); // 10 minutes
- boolean parallel = message.getBool("parallel", false);
- ClusterState clusterState = zkStateReader.getClusterState();
-
- if (!clusterState.liveNodesContain(source)) {
- throw new SolrException(
- SolrException.ErrorCode.BAD_REQUEST, "Source Node: " + source + " is not live");
- }
- if (target != null && !clusterState.liveNodesContain(target)) {
- throw new SolrException(
- SolrException.ErrorCode.BAD_REQUEST, "Target Node: " + target + " is not live");
- } else if (clusterState.getLiveNodes().size() <= 1) {
- throw new SolrException(
- SolrException.ErrorCode.BAD_REQUEST,
- "No nodes other than the source node: "
- + source
- + " are live, therefore replicas cannot be moved");
- }
- List<ZkNodeProps> sourceReplicas = getReplicasOfNode(source, clusterState);
+ /**
+ * Code to migrate replicas to already chosen nodes. This will create new replicas, and delete the
+ * old replicas after the creation is done.
+ *
+ * <p>If an error occurs during the creation of new replicas, all new replicas will be deleted.
+ *
+ * @param ccc The collection command context to use from the API that calls this method
+ * @param movements a map from replica to the new node that the replica should live on
+ * @param parallel whether the replica creations should be done in parallel
+ * @param waitForFinalState wait for the final state of all newly created replicas before
+ * continuing
+ * @param timeout the amount of time to wait for new replicas to be created
+ * @param asyncId If provided, the command will be run under the given asyncId
+ * @param results push results (successful and failure) onto this list
+ * @return whether the command was successful
+ */
+ static boolean migrateReplicas(
+ CollectionCommandContext ccc,
+ Map<Replica, String> movements,
+ boolean parallel,
+ boolean waitForFinalState,
+ int timeout,
+ String asyncId,
+ NamedList<Object> results)
+ throws IOException, InterruptedException, KeeperException {
// how many leaders are we moving? for these replicas we have to make sure that either:
// * another existing replica can become a leader, or
// * we wait until the newly created replica completes recovery (and can become the new leader)
// If waitForFinalState=true we wait for all replicas
int numLeaders = 0;
- for (ZkNodeProps props : sourceReplicas) {
- if (props.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) {
+ for (Replica replica : movements.keySet()) {
+ if (replica.isLeader() || waitForFinalState) {
numLeaders++;
}
}
@@ -106,60 +89,31 @@ public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
AtomicBoolean anyOneFailed = new AtomicBoolean(false);
SolrCloseableLatch countDownLatch =
- new SolrCloseableLatch(sourceReplicas.size(), ccc.getCloseableToLatchOn());
+ new SolrCloseableLatch(movements.size(), ccc.getCloseableToLatchOn());
SolrCloseableLatch replicasToRecover =
new SolrCloseableLatch(numLeaders, ccc.getCloseableToLatchOn());
- List<ReplicaPosition> replicaPositions = null;
- if (target == null || target.isEmpty()) {
- List<Assign.AssignRequest> assignRequests = new ArrayList<>(sourceReplicas.size());
- for (ZkNodeProps sourceReplica : sourceReplicas) {
- Replica.Type replicaType =
- Replica.Type.get(sourceReplica.getStr(ZkStateReader.REPLICA_TYPE));
- int numNrtReplicas = replicaType == Replica.Type.NRT ? 1 : 0;
- int numTlogReplicas = replicaType == Replica.Type.TLOG ? 1 : 0;
- int numPullReplicas = replicaType == Replica.Type.PULL ? 1 : 0;
- Assign.AssignRequest assignRequest =
- new Assign.AssignRequestBuilder()
- .forCollection(sourceReplica.getStr(COLLECTION_PROP))
- .forShard(Collections.singletonList(sourceReplica.getStr(SHARD_ID_PROP)))
- .assignNrtReplicas(numNrtReplicas)
- .assignTlogReplicas(numTlogReplicas)
- .assignPullReplicas(numPullReplicas)
- .onNodes(
- ccc.getSolrCloudManager().getClusterStateProvider().getLiveNodes().stream()
- .filter(node -> !node.equals(source))
- .collect(Collectors.toList()))
- .build();
- assignRequests.add(assignRequest);
- }
- Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ccc.getCoreContainer());
- replicaPositions = assignStrategy.assign(ccc.getSolrCloudManager(), assignRequests);
- }
- int replicaPositionIdx = 0;
- for (ZkNodeProps sourceReplica : sourceReplicas) {
- String sourceCollection = sourceReplica.getStr(COLLECTION_PROP);
+ ClusterState clusterState = ccc.getZkStateReader().getClusterState();
+
+ for (Map.Entry<Replica, String> movement : movements.entrySet()) {
+ Replica sourceReplica = movement.getKey();
+ String targetNode = movement.getValue();
+ String sourceCollection = sourceReplica.getCollection();
if (log.isInfoEnabled()) {
log.info(
"Going to create replica for collection={} shard={} on node={}",
sourceCollection,
- sourceReplica.getStr(SHARD_ID_PROP),
- target);
- }
- String targetNode;
- // Use the assigned replica positions, if target is null or empty (checked above)
- if (replicaPositions != null) {
- targetNode = replicaPositions.get(replicaPositionIdx).node;
- replicaPositionIdx++;
- } else {
- targetNode = target;
+ sourceReplica.getShard(),
+ targetNode);
}
+
ZkNodeProps msg =
sourceReplica
+ .toFullProps()
.plus("parallel", String.valueOf(parallel))
.plus(CoreAdminParams.NODE, targetNode);
- if (async != null) msg.getProperties().put(ASYNC, async);
+ if (asyncId != null) msg.getProperties().put(ASYNC, asyncId);
NamedList<Object> nl = new NamedList<>();
final ZkNodeProps addedReplica =
new AddReplicaCmd(ccc)
@@ -173,10 +127,10 @@ public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
String errorString =
String.format(
Locale.ROOT,
- "Failed to create replica for collection=%s shard=%s" + " on node=%s",
+ "Failed to create replica for collection=%s shard=%s on node=%s",
sourceCollection,
- sourceReplica.getStr(SHARD_ID_PROP),
- target);
+ sourceReplica.getShard(),
+ targetNode);
log.warn(errorString);
// one replica creation failed. Make the best attempt to
// delete all the replicas created so far in the target
@@ -190,8 +144,8 @@ public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
log.debug(
"Successfully created replica for collection={} shard={} on node={}",
sourceCollection,
- sourceReplica.getStr(SHARD_ID_PROP),
- target);
+ sourceReplica.getShard(),
+ targetNode);
}
}
})
@@ -199,23 +153,22 @@ public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
if (addedReplica != null) {
createdReplicas.add(addedReplica);
- if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) {
- String shardName = sourceReplica.getStr(SHARD_ID_PROP);
- String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
- String collectionName = sourceCollection;
- String key = collectionName + "_" + replicaName;
+ if (sourceReplica.isLeader() || waitForFinalState) {
+ String shardName = sourceReplica.getShard();
+ String replicaName = sourceReplica.getName();
+ String key = sourceCollection + "_" + replicaName;
CollectionStateWatcher watcher;
if (waitForFinalState) {
watcher =
new ActiveReplicaWatcher(
- collectionName,
+ sourceCollection,
null,
Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)),
replicasToRecover);
} else {
watcher =
new LeaderRecoveryWatcher(
- collectionName,
+ sourceCollection,
shardName,
replicaName,
addedReplica.getStr(ZkStateReader.CORE_NAME_PROP),
@@ -223,7 +176,7 @@ public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
}
watchers.put(key, watcher);
log.debug("--- adding {}, {}", key, watcher);
- zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
+ ccc.getZkStateReader().registerCollectionStateWatcher(sourceCollection, watcher);
} else {
log.debug("--- not waiting for {}", addedReplica);
}
@@ -251,10 +204,10 @@ public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
}
// remove the watchers, we're done either way
for (Map.Entry<String, CollectionStateWatcher> e : watchers.entrySet()) {
- zkStateReader.removeCollectionStateWatcher(e.getKey(), e.getValue());
+ ccc.getZkStateReader().removeCollectionStateWatcher(e.getKey(), e.getValue());
}
if (anyOneFailed.get()) {
- log.info("Failed to create some replicas. Cleaning up all replicas on target node");
+ log.info("Failed to create some replicas. Cleaning up all newly created replicas.");
SolrCloseableLatch cleanupLatch =
new SolrCloseableLatch(createdReplicas.size(), ccc.getCloseableToLatchOn());
for (ZkNodeProps createdReplica : createdReplicas) {
@@ -262,7 +215,7 @@ public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
try {
new DeleteReplicaCmd(ccc)
.deleteReplica(
- zkStateReader.getClusterState(),
+ ccc.getZkStateReader().getClusterState(),
createdReplica.plus("parallel", "true"),
deleteResult,
() -> {
@@ -285,40 +238,98 @@ public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
}
}
cleanupLatch.await(5, TimeUnit.MINUTES);
- return;
+ return false;
}
- // we have reached this far means all replicas could be recreated
- // now cleanup the replicas in the source node
- DeleteNodeCmd.cleanupReplicas(results, state, sourceReplicas, ccc, source, async);
- results.add(
- "success",
- "REPLACENODE action completed successfully from : " + source + " to : " + target);
+ // we have reached this far, meaning all replicas should have been recreated.
+ // now cleanup the original replicas
+ return cleanupReplicas(
+ results, ccc.getZkStateReader().getClusterState(), movements.keySet(), ccc, asyncId);
+ }
+
+ /**
+ * Deletes the given replicas and waits up to 5 minutes for all of the deletions to finish.
+ *
+ * <p>Each delete-replica command is sent with {@code parallel=true}. A {@link KeeperException}
+ * thrown while issuing a deletion is logged and skipped; any other exception is logged and
+ * rethrown. {@link #migrateReplicas} uses this to remove the original replicas once their
+ * replacements have been created.
+ *
+ * @param results a "failure" entry is added here for each replica whose deletion reports one
+ * @param clusterState the cluster state handed to each delete-replica command
+ * @param sourceReplicas the replicas to delete
+ * @param ccc the collection command context used to run the deletions
+ * @param async if non-null, the async id attached to each delete-replica command
+ * @return {@code true} if every deletion finished before the timeout (a deletion that reported a
+ * failure still counts as finished), {@code false} otherwise
+ */
+ static boolean cleanupReplicas(
+ NamedList<Object> results,
+ ClusterState clusterState,
+ Collection<Replica> sourceReplicas,
+ CollectionCommandContext ccc,
+ String async)
+ throws IOException, InterruptedException {
+ SolrCloseableLatch cleanupLatch =
+ new SolrCloseableLatch(sourceReplicas.size(), ccc.getCloseableToLatchOn());
+ for (Replica sourceReplica : sourceReplicas) {
+ String coll = sourceReplica.getCollection();
+ String shard = sourceReplica.getShard();
+ String type = sourceReplica.getType().toString();
+ String node = sourceReplica.getNodeName();
+ log.info(
+ "Deleting replica type={} for collection={} shard={} on node={}",
+ type,
+ coll,
+ shard,
+ node);
+ NamedList<Object> deleteResult = new NamedList<>();
+ try {
+ ZkNodeProps cmdMessage = sourceReplica.toFullProps();
+ if (async != null) cmdMessage = cmdMessage.plus(ASYNC, async);
+ new DeleteReplicaCmd(ccc)
+ .deleteReplica(
+ clusterState,
+ cmdMessage.plus("parallel", "true"),
+ deleteResult,
+ () -> {
+ // Record any failure before counting down, so the waiter sees it once released.
+ if (deleteResult.get("failure") != null) {
+ synchronized (results) {
+ results.add(
+ "failure",
+ String.format(
+ Locale.ROOT,
+ "Failed to delete replica for collection=%s shard=%s on node=%s",
+ coll,
+ shard,
+ node));
+ }
+ }
+ cleanupLatch.countDown();
+ });
+ } catch (KeeperException e) {
+ log.warn(
+ "Error deleting replica of collection={} shard={} on node={}", coll, shard, node, e);
+ cleanupLatch.countDown();
+ } catch (Exception e) {
+ log.warn(
+ "Error deleting replica of collection={} shard={} on node={}", coll, shard, node, e);
+ cleanupLatch.countDown();
+ throw e;
+ }
+ }
+ log.debug("Waiting for replica deletions to complete");
+ return cleanupLatch.await(5, TimeUnit.MINUTES);
}
- static List<ZkNodeProps> getReplicasOfNode(String source, ClusterState state) {
- List<ZkNodeProps> sourceReplicas = new ArrayList<>();
+ static List<Replica> getReplicasOfNode(String nodeName, ClusterState state) {
+ List<Replica> sourceReplicas = new ArrayList<>();
for (Map.Entry<String, DocCollection> e : state.getCollectionsMap().entrySet()) {
for (Slice slice : e.getValue().getSlices()) {
for (Replica replica : slice.getReplicas()) {
- if (source.equals(replica.getNodeName())) {
- ZkNodeProps props =
- new ZkNodeProps(
- COLLECTION_PROP,
- e.getKey(),
- SHARD_ID_PROP,
- slice.getName(),
- ZkStateReader.CORE_NAME_PROP,
- replica.getCoreName(),
- ZkStateReader.REPLICA_PROP,
- replica.getName(),
- ZkStateReader.REPLICA_TYPE,
- replica.getType().name(),
- ZkStateReader.LEADER_PROP,
- String.valueOf(replica.equals(slice.getLeader())),
- CoreAdminParams.NODE,
- source);
- sourceReplicas.add(props);
+ if (nodeName.equals(replica.getNodeName())) {
+ sourceReplicas.add(replica);
}
}
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/BalancePlan.java b/solr/core/src/java/org/apache/solr/cluster/placement/BalancePlan.java
new file mode 100644
index 00000000000..e543004fd9e
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/BalancePlan.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement;
+
+import java.util.Map;
+import org.apache.solr.cluster.Node;
+import org.apache.solr.cluster.Replica;
+
+/**
+ * A fully specified plan or instructions for replica balancing to be applied to the cluster.
+ *
+ * <p>Fully specified means the target {@link Node}s of all replica movements have been decided.
+ *
+ * <p>Instances are created by plugin code using {@link BalancePlanFactory}. The Solr side acts
+ * on the plan through {@link #getReplicaMovements()}: each replica in the returned map is to be
+ * moved to the {@link Node} it is mapped to, and replicas absent from the map are left where they
+ * are. An empty map therefore means that no replica is moved.
+ */
+public interface BalancePlan {
+ /**
+ * @return the {@link BalanceRequest} at the origin of this {@link BalancePlan}, as passed to the
+ * {@link BalancePlanFactory} method that created this instance.
+ */
+ BalanceRequest getRequest();
+
+ /**
+ * @return the map from each {@link Replica} to be moved to the {@link Node} it should be moved
+ * to; replicas not in the map are not moved.
+ */
+ Map<Replica, Node> getReplicaMovements();
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/BalancePlanFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/BalancePlanFactory.java
new file mode 100644
index 00000000000..d395df40870
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/BalancePlanFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement;
+
+import java.util.Map;
+import org.apache.solr.cluster.Node;
+import org.apache.solr.cluster.Replica;
+
+/**
+ * Allows plugins to create {@link BalancePlan}s telling the Solr layer how to balance replicas
+ * following the processing of a {@link BalanceRequest}. A {@link BalancePlan} is expected to
+ * conform to its {@link BalanceRequest}, e.g. by only moving replicas to nodes returned by {@link
+ * BalanceRequest#getNodes()}.
+ */
+public interface BalancePlanFactory {
+ /**
+ * Creates a {@link BalancePlan} for {@code request} from the given replica to node movements.
+ *
+ * <p>This is in support (directly or indirectly) of {@link
+ * org.apache.solr.cloud.api.collections.BalanceReplicasCmd}.
+ */
+ BalancePlan createBalancePlan(BalanceRequest request, Map<Replica, Node> replicaMovements);
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/BalanceRequest.java b/solr/core/src/java/org/apache/solr/cluster/placement/BalanceRequest.java
new file mode 100644
index 00000000000..f967b7fa808
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/BalanceRequest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement;
+
+import java.util.Set;
+import org.apache.solr.cluster.Cluster;
+import org.apache.solr.cluster.Node;
+
+/**
+ * A cluster related request that Solr asks a {@link PlacementPlugin} to resolve by computing a
+ * balancing plan for replicas that already exist across a set of {@link Node}s.
+ *
+ * <p>The set of {@link Node}s across which the replicas should be balanced is specified (defaults
+ * to being equal to the set returned by {@link Cluster#getLiveDataNodes()}).
+ *
+ * <p>NOTE(review): balancing is not tied to a single collection, yet this interface inherits
+ * {@code getCollection()} from {@link ModificationRequest}; {@code BalanceRequestImpl} returns
+ * {@code null} from it, so plugins should not rely on it -- TODO confirm the intended contract.
+ */
+public interface BalanceRequest extends ModificationRequest {
+
+ /**
+ * Replicas should only be balanced on nodes in the set returned by this method.
+ *
+ * <p>When Collection API calls do not specify a specific set of nodes, replicas can be balanced
+ * on all live nodes in the cluster. In such cases, this set will be equal to the set of all live
+ * nodes. The plugin placement code does not need to worry (or care) if a set of nodes was
+ * explicitly specified or not.
+ *
+ * @return never {@code null} and never empty set (if that set was to be empty for any reason, no
+ * balance would be possible and the Solr infrastructure driving the plugin code would detect
+ * the error itself rather than calling the plugin).
+ */
+ Set<Node> getNodes();
+
+ /**
+ * The maximum balance skew requested for this balancing.
+ *
+ * <p>NOTE(review): its exact meaning is left to the plugin and is not defined at this level.
+ * {@code BalanceRequestImpl} uses {@code -1} when no value is given (presumably "no limit") --
+ * TODO confirm and document.
+ */
+ int getMaximumBalanceSkew();
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementContext.java b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementContext.java
index 4ad577ea0b5..be7b0282a79 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementContext.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementContext.java
@@ -39,4 +39,7 @@ public interface PlacementContext {
/** Factory used to create instances of {@link PlacementPlan} to return computed decision. */
PlacementPlanFactory getPlacementPlanFactory();
+
+ /** Factory used to create the {@link BalancePlan} holding a computed balancing decision. */
+ BalancePlanFactory getBalancePlanFactory();
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlugin.java b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlugin.java
index a2b7c3c01c8..0739996e118 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlugin.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlugin.java
@@ -74,6 +74,20 @@ public interface PlacementPlugin {
Collection<PlacementRequest> placementRequests, PlacementContext placementContext)
throws PlacementException, InterruptedException;
+ /**
+ * Request to plugin code to compute a balancing of replicas. Note this method must be reentrant
+ * as a plugin instance may (read will) get multiple such calls in parallel.
+ *
+ * <p>Configuration is passed upon creation of a new instance of this class by {@link
+ * PlacementPluginFactory#createPluginInstance}.
+ *
+ * @param balanceRequest request describing the nodes across which replicas should be balanced.
+ * @param placementContext context giving access to the cluster and to the plan factories.
+ * @return plan describing which replicas should be moved, and to which nodes.
+ */
+ BalancePlan computeBalancing(BalanceRequest balanceRequest, PlacementContext placementContext)
+ throws PlacementException, InterruptedException;
+
/**
* Verify that a collection layout modification doesn't violate constraints on replica placements
* required by this plugin. Default implementation is a no-op (any modifications are allowed).
@@ -85,5 +99,5 @@ public interface PlacementPlugin {
*/
default void verifyAllowedModification(
ModificationRequest modificationRequest, PlacementContext placementContext)
- throws PlacementModificationException, InterruptedException {}
+ throws PlacementException, InterruptedException {}
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/BalancePlanFactoryImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/BalancePlanFactoryImpl.java
new file mode 100644
index 00000000000..05ba636e34d
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/BalancePlanFactoryImpl.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement.impl;
+
+import java.util.Map;
+import org.apache.solr.cluster.Node;
+import org.apache.solr.cluster.Replica;
+import org.apache.solr.cluster.placement.BalancePlan;
+import org.apache.solr.cluster.placement.BalancePlanFactory;
+import org.apache.solr.cluster.placement.BalanceRequest;
+
+/** Simple implementation of {@link BalancePlanFactory}; the movements map is not copied. */
+public class BalancePlanFactoryImpl implements BalancePlanFactory {
+ @Override
+ public BalancePlan createBalancePlan(
+ BalanceRequest request, Map<Replica, Node> replicaMovements) {
+ return new BalancePlanImpl(request, replicaMovements);
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/BalancePlanImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/BalancePlanImpl.java
new file mode 100644
index 00000000000..7bf75a891dc
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/BalancePlanImpl.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement.impl;
+
+import java.util.Map;
+import org.apache.solr.cluster.Node;
+import org.apache.solr.cluster.Replica;
+import org.apache.solr.cluster.placement.BalancePlan;
+import org.apache.solr.cluster.placement.BalanceRequest;
+
+/**
+ * Default {@link BalancePlan} implementation, as created by {@link BalancePlanFactoryImpl}.
+ *
+ * <p>The movements map is stored and returned as given, without a defensive copy.
+ * NOTE(review): callers presumably must not modify it after the plan is created -- confirm.
+ */
+class BalancePlanImpl implements BalancePlan {
+
+ final BalanceRequest request;
+ final Map<Replica, Node> replicaMovements;
+
+ BalancePlanImpl(BalanceRequest request, Map<Replica, Node> replicaMovements) {
+ this.request = request;
+ this.replicaMovements = replicaMovements;
+ }
+
+ @Override
+ public BalanceRequest getRequest() {
+ return request;
+ }
+
+ @Override
+ public Map<Replica, Node> getReplicaMovements() {
+ return replicaMovements;
+ }
+
+ /** Lists one "replicaName : nodeName" line per movement, in the map's iteration order. */
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("BalancePlan{");
+ for (Map.Entry<Replica, Node> movement : replicaMovements.entrySet()) {
+ sb.append("\n")
+ .append(movement.getKey().getReplicaName())
+ .append(" : ")
+ .append(movement.getValue().getName());
+ }
+ sb.append("\n}");
+ return sb.toString();
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/BalanceRequestImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/BalanceRequestImpl.java
new file mode 100644
index 00000000000..95454f9dbbe
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/BalanceRequestImpl.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement.impl;
+
+import java.util.Set;
+import org.apache.solr.cloud.api.collections.Assign;
+import org.apache.solr.cluster.Cluster;
+import org.apache.solr.cluster.Node;
+import org.apache.solr.cluster.SolrCollection;
+import org.apache.solr.cluster.placement.BalanceRequest;
+
+/**
+ * Default {@link BalanceRequest} implementation.
+ *
+ * <p>A balance request is not tied to a single collection, so {@link #getCollection()} always
+ * returns {@code null}.
+ */
+public class BalanceRequestImpl implements BalanceRequest {
+
+ private final SolrCollection solrCollection;
+ private final Set<Node> nodes;
+ private final int maximumBalanceSkew;
+
+ /** Creates a request to balance across {@code nodes}, with a maximum balance skew of -1. */
+ public BalanceRequestImpl(Set<Node> nodes) {
+ this(nodes, -1);
+ }
+
+ /** Creates a request to balance across {@code nodes} with the given maximum balance skew. */
+ public BalanceRequestImpl(Set<Node> nodes, int maximumBalanceSkew) {
+ this.nodes = nodes;
+ this.maximumBalanceSkew = maximumBalanceSkew;
+ this.solrCollection = null;
+ }
+
+ @Override
+ public Set<Node> getNodes() {
+ return nodes;
+ }
+
+ @Override
+ public int getMaximumBalanceSkew() {
+ return maximumBalanceSkew;
+ }
+
+ @Override
+ public SolrCollection getCollection() {
+ return solrCollection;
+ }
+
+ /**
+ * Returns a {@link BalanceRequest} that can be consumed by a plugin to balance replicas.
+ *
+ * @param cluster the cluster, used to look up its live data nodes
+ * @param nodeNames the nodes to balance across; when {@code null} or empty, all live data nodes
+ * @param maximumBalanceSkew passed unchanged to the created request
+ * @throws Assign.AssignmentException if exactly one node is given, if a given node is not a live
+ * data node, or if no nodes are given and the cluster has no live data nodes
+ */
+ static BalanceRequestImpl create(Cluster cluster, Set<String> nodeNames, int maximumBalanceSkew)
+ throws Assign.AssignmentException {
+ final Set<Node> nodes;
+ // If no nodes specified, use all live data nodes. If nodes are specified, use specified list.
+ if (nodeNames != null && !nodeNames.isEmpty()) {
+ if (nodeNames.size() == 1) {
+ throw new Assign.AssignmentException(
+ "Bad balance request: cannot balance across a single node: "
+ + nodeNames.iterator().next());
+ }
+ nodes = SimpleClusterAbstractionsImpl.NodeImpl.getNodes(nodeNames);
+
+ // Fetch the live data nodes once, rather than once per specified node.
+ final Set<Node> liveDataNodes = cluster.getLiveDataNodes();
+ for (Node n : nodes) {
+ if (!liveDataNodes.contains(n)) {
+ throw new Assign.AssignmentException(
+ "Bad balance request: specified node is not a live data hosting node: "
+ + n.getName());
+ }
+ }
+ } else {
+ nodes = cluster.getLiveDataNodes();
+ if (nodes.isEmpty()) {
+ throw new Assign.AssignmentException("Impossible balance request: no live data nodes");
+ }
+ }
+
+ return new BalanceRequestImpl(nodes, maximumBalanceSkew);
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginAssignStrategy.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginAssignStrategy.java
index d7d94644461..5fab7b2fe9c 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginAssignStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginAssignStrategy.java
@@ -18,13 +18,16 @@
package org.apache.solr.cluster.placement.impl;
import java.io.IOException;
+import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.cloud.api.collections.Assign;
import org.apache.solr.cluster.Node;
+import org.apache.solr.cluster.placement.BalanceRequest;
import org.apache.solr.cluster.placement.DeleteCollectionRequest;
import org.apache.solr.cluster.placement.DeleteReplicasRequest;
import org.apache.solr.cluster.placement.PlacementContext;
@@ -35,10 +38,16 @@ import org.apache.solr.cluster.placement.PlacementRequest;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.util.CollectionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** This assign strategy delegates placement computation to "plugin" code. */
public class PlacementPluginAssignStrategy implements Assign.AssignStrategy {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
private final PlacementPlugin plugin;
public PlacementPluginAssignStrategy(PlacementPlugin plugin) {
@@ -88,6 +97,54 @@ public class PlacementPluginAssignStrategy implements Assign.AssignStrategy {
return replicaPositions;
}
+  /**
+   * Asks the placement plugin for a balance plan covering {@code nodes} and translates each
+   * plugin-level replica movement into a cluster-state {@link Replica} mapped to the name of the
+   * node it should move to.
+   *
+   * @throws Assign.AssignmentException if the plugin fails with a {@link PlacementException}, or
+   *     if a replica named in the plan cannot be resolved against the cluster state
+   */
+  @Override
+  public Map<Replica, String> computeReplicaBalancing(
+      SolrCloudManager solrCloudManager, Set<String> nodes, int maxBalanceSkew)
+      throws Assign.AssignmentException, IOException, InterruptedException {
+    final PlacementContext context = new SimplePlacementContextImpl(solrCloudManager);
+    // Created before the try block, so its exceptions are not wrapped by the catch below.
+    final BalanceRequest request =
+        BalanceRequestImpl.create(context.getCluster(), nodes, maxBalanceSkew);
+    final Map<org.apache.solr.cluster.Replica, Node> planned;
+    try {
+      planned = plugin.computeBalancing(request, context).getReplicaMovements();
+    } catch (PlacementException pe) {
+      throw new Assign.AssignmentException(pe);
+    }
+
+    // Convert plugin-level replicas/nodes back into cluster-state replicas and node names.
+    final Map<Replica, String> result = CollectionUtil.newHashMap(planned.size());
+    for (Map.Entry<org.apache.solr.cluster.Replica, Node> entry : planned.entrySet()) {
+      org.apache.solr.cluster.Replica pluginReplica = entry.getKey();
+      Replica clusterReplica = findReplica(solrCloudManager, pluginReplica);
+      if (clusterReplica == null) {
+        throw new Assign.AssignmentException(
+            "Could not find replica when balancing: " + pluginReplica.toString());
+      }
+      result.put(clusterReplica, entry.getValue().getName());
+    }
+    return result;
+  }
+
+  /**
+   * Resolves a plugin-level replica to the matching {@link Replica} in the current cluster state,
+   * looked up by collection name, then shard name, then replica name.
+   *
+   * <p>Uses {@code getCollectionOrNull} rather than {@code getCollection}. The latter throws
+   * instead of returning null, which would bypass the null handling below and the caller's
+   * "Could not find replica" error.
+   *
+   * @return the matching replica, or {@code null} when the collection or shard is missing
+   *     (otherwise whatever {@link Slice#getReplica(String)} returns for the replica name)
+   * @throws Assign.AssignmentException if the cluster state cannot be loaded
+   */
+  private Replica findReplica(
+      SolrCloudManager solrCloudManager, org.apache.solr.cluster.Replica replica) {
+    final DocCollection collection;
+    try {
+      collection =
+          solrCloudManager
+              .getClusterState()
+              .getCollectionOrNull(replica.getShard().getCollection().getName());
+    } catch (IOException e) {
+      throw new Assign.AssignmentException(
+          "Could not load cluster state when balancing replicas", e);
+    }
+    if (collection == null) {
+      return null;
+    }
+    final Slice slice = collection.getSlice(replica.getShard().getShardName());
+    return (slice == null) ? null : slice.getReplica(replica.getReplicaName());
+  }
+
@Override
public void verifyDeleteCollection(SolrCloudManager solrCloudManager, DocCollection collection)
throws Assign.AssignmentException, IOException, InterruptedException {
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/SimplePlacementContextImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/SimplePlacementContextImpl.java
index 0e138568492..6c1cae4bc67 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/impl/SimplePlacementContextImpl.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/SimplePlacementContextImpl.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.cluster.Cluster;
import org.apache.solr.cluster.placement.AttributeFetcher;
+import org.apache.solr.cluster.placement.BalancePlanFactory;
import org.apache.solr.cluster.placement.PlacementContext;
import org.apache.solr.cluster.placement.PlacementPlanFactory;
@@ -32,6 +33,7 @@ public class SimplePlacementContextImpl implements PlacementContext {
private final Cluster cluster;
private final AttributeFetcher attributeFetcher;
private final PlacementPlanFactory placementPlanFactory = new PlacementPlanFactoryImpl();
+ private final BalancePlanFactory balancePlanFactory = new BalancePlanFactoryImpl();
public SimplePlacementContextImpl(SolrCloudManager solrCloudManager) throws IOException {
cluster = new SimpleClusterAbstractionsImpl.ClusterImpl(solrCloudManager);
@@ -52,4 +54,9 @@ public class SimplePlacementContextImpl implements PlacementContext {
public PlacementPlanFactory getPlacementPlanFactory() {
return placementPlanFactory;
}
+
+  /**
+   * Returns this context's {@link BalancePlanFactory}, a single instance created when the context
+   * is constructed.
+   */
+  @Override
+  public BalancePlanFactory getBalancePlanFactory() {
+    return this.balancePlanFactory;
+  }
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java
index 076fa4a02e6..80d2d43af96 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java
@@ -19,49 +19,37 @@ package org.apache.solr.cluster.placement.plugins;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
-import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import java.util.Random;
import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.solr.cluster.Cluster;
import org.apache.solr.cluster.Node;
import org.apache.solr.cluster.Replica;
-import org.apache.solr.cluster.Shard;
import org.apache.solr.cluster.SolrCollection;
import org.apache.solr.cluster.placement.AttributeFetcher;
import org.apache.solr.cluster.placement.AttributeValues;
+import org.apache.solr.cluster.placement.BalanceRequest;
import org.apache.solr.cluster.placement.DeleteCollectionRequest;
-import org.apache.solr.cluster.placement.DeleteReplicasRequest;
-import org.apache.solr.cluster.placement.DeleteShardsRequest;
-import org.apache.solr.cluster.placement.ModificationRequest;
import org.apache.solr.cluster.placement.PlacementContext;
import org.apache.solr.cluster.placement.PlacementException;
import org.apache.solr.cluster.placement.PlacementModificationException;
-import org.apache.solr.cluster.placement.PlacementPlan;
-import org.apache.solr.cluster.placement.PlacementPlanFactory;
import org.apache.solr.cluster.placement.PlacementPlugin;
import org.apache.solr.cluster.placement.PlacementPluginFactory;
-import org.apache.solr.cluster.placement.PlacementRequest;
-import org.apache.solr.cluster.placement.ReplicaPlacement;
+import org.apache.solr.cluster.placement.ReplicaMetric;
+import org.apache.solr.cluster.placement.ShardMetrics;
import org.apache.solr.cluster.placement.impl.NodeMetricImpl;
+import org.apache.solr.cluster.placement.impl.ReplicaMetricImpl;
+import org.apache.solr.common.util.CollectionUtil;
import org.apache.solr.common.util.StrUtils;
-import org.apache.solr.common.util.SuppressForbidden;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -109,42 +97,10 @@ import org.slf4j.LoggerFactory;
* prop), and avoid having more than one replica per shard on the same node.<br>
* Only after these constraints are satisfied do minimize cores per node or disk usage.</i>
*
- * <p>Overall strategy of this plugin:
- *
- * <ul>
- * <li>The set of nodes in the cluster is obtained and transformed into 3 independent sets (that
- * can overlap) of nodes accepting each of the three replica types.
- * <li>For each shard on which placing replicas is required and then for each replica type to
- * place (starting with NRT, then TLOG then PULL):
- * <ul>
- * <li>The set of candidates nodes corresponding to the replica type is used and from that
- * set are removed nodes that already have a replica (of any type) for that shard
- * <li>If there are not enough nodes, an error is thrown (this is checked further down
- * during processing).
- * <li>The number of (already existing) replicas of the current type on each Availability
- * Zone is collected.
- * <li>Separate the set of available nodes to as many subsets (possibly some are empty) as
- * there are Availability Zones defined for the candidate nodes
- * <li>In each AZ nodes subset, sort the nodes by increasing total number of cores count,
- * with possibly a condition that pushes nodes with low disk space to the end of the
- * list? Or a weighted combination of the relative importance of these two factors? Some
- * randomization? Marking as non available nodes with not enough disk space? These and
- * other are likely aspects to be played with once the plugin is tested or observed to
- * be running in prod, don't expect the initial code drop(s) to do all of that.
- * <li>Iterate over the number of replicas to place (for the current replica type for the
- * current shard):
- * <ul>
- * <li>Based on the number of replicas per AZ collected previously, pick the non empty
- * set of nodes having the lowest number of replicas. Then pick the first node in
- * that set. That's the node the replica is placed one. Remove the node from the
- * set of available nodes for the given AZ and increase the number of replicas
- * placed on that AZ.
- * </ul>
- * <li>During this process, the number of cores on the nodes in general is tracked to take
- * into account placement decisions so that not all shards decide to put their replicas
- * on the same nodes (they might though if these are the less loaded nodes).
- * </ul>
- * </ul>
+ * <p>This plugin achieves this by creating a {@link AffinityPlacementPlugin.AffinityNode} that
+ * weights nodes very high if they are unbalanced with respect to AvailabilityZone and SpreadDomain.
+ * See {@link AffinityPlacementPlugin.AffinityNode} for more information on how this weighting helps
+ * the plugin correctly place and balance replicas.
*
* <p>This code is a realistic placement computation, based on a few assumptions. The code is
* written in such a way to make it relatively easy to adapt it to (somewhat) different assumptions.
@@ -190,7 +146,7 @@ public class AffinityPlacementFactory implements PlacementPluginFactory<Affinity
* See {@link AffinityPlacementFactory} for instructions on how to configure a cluster to use this
* plugin and details on what the plugin does.
*/
- static class AffinityPlacementPlugin implements PlacementPlugin {
+ static class AffinityPlacementPlugin extends OrderedNodePlacementPlugin {
private final long minimalFreeDiskGB;
@@ -199,13 +155,10 @@ public class AffinityPlacementFactory implements PlacementPluginFactory<Affinity
// primary to secondary (1:1)
private final Map<String, String> withCollections;
// secondary to primary (1:N)
- private final Map<String, Set<String>> colocatedWith;
+ private final Map<String, Set<String>> collocatedWith;
private final Map<String, Set<String>> nodeTypes;
- private final Random replicaPlacementRandom =
- new Random(); // ok even if random sequence is predictable.
-
private final boolean spreadAcrossDomains;
/**
@@ -225,12 +178,12 @@ public class AffinityPlacementFactory implements PlacementPluginFactory<Affinity
this.spreadAcrossDomains = spreadAcrossDomains;
this.withCollections = withCollections;
if (withCollections.isEmpty()) {
- colocatedWith = Map.of();
+ collocatedWith = Map.of();
} else {
- colocatedWith = new HashMap<>();
+ collocatedWith = new HashMap<>();
withCollections.forEach(
(primary, secondary) ->
- colocatedWith.computeIfAbsent(secondary, s -> new HashSet<>()).add(primary));
+ collocatedWith.computeIfAbsent(secondary, s -> new HashSet<>()).add(primary));
}
if (collectionNodeTypes.isEmpty()) {
@@ -245,175 +198,21 @@ public class AffinityPlacementFactory implements PlacementPluginFactory<Affinity
}
});
}
-
- // We make things reproducible in tests by using test seed if any
- String seed = System.getProperty("tests.seed");
- if (seed != null) {
- replicaPlacementRandom.setSeed(seed.hashCode());
- }
- }
-
- @Override
- @SuppressForbidden(
- reason =
- "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.")
- public List<PlacementPlan> computePlacements(
- Collection<PlacementRequest> requests, PlacementContext placementContext)
- throws PlacementException {
- List<PlacementPlan> placementPlans = new ArrayList<>(requests.size());
- Set<Node> allNodes = new HashSet<>();
- for (PlacementRequest request : requests) {
- allNodes.addAll(request.getTargetNodes());
- }
-
- // Fetch attributes for a superset of all nodes requested amongst the placementRequests
- AttributeFetcher attributeFetcher = placementContext.getAttributeFetcher();
- attributeFetcher
- .requestNodeSystemProperty(AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP)
- .requestNodeSystemProperty(AffinityPlacementConfig.NODE_TYPE_SYSPROP)
- .requestNodeSystemProperty(AffinityPlacementConfig.REPLICA_TYPE_SYSPROP)
- .requestNodeSystemProperty(AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP);
- attributeFetcher
- .requestNodeMetric(NodeMetricImpl.NUM_CORES)
- .requestNodeMetric(NodeMetricImpl.FREE_DISK_GB);
- attributeFetcher.fetchFrom(allNodes);
- final AttributeValues attrValues = attributeFetcher.fetchAttributes();
- // Get the number of currently existing cores per node, so we can update as we place new cores
- // to not end up always selecting the same node(s). This is used across placement requests
- Map<Node, Integer> allCoresOnNodes = getCoreCountPerNode(allNodes, attrValues);
-
- boolean doSpreadAcrossDomains = shouldSpreadAcrossDomains(allNodes, attrValues);
-
- // Keep track with nodesWithReplicas across requests
- Map<String, Map<String, Set<Node>>> allNodesWithReplicas = new HashMap<>();
- for (PlacementRequest request : requests) {
- Set<Node> nodes = request.getTargetNodes();
- SolrCollection solrCollection = request.getCollection();
-
- // filter out nodes that don't meet the `withCollection` constraint
- nodes =
- filterNodesWithCollection(placementContext.getCluster(), request, attrValues, nodes);
- // filter out nodes that don't match the "node types" specified in the collection props
- nodes = filterNodesByNodeType(placementContext.getCluster(), request, attrValues, nodes);
-
- // Split the set of nodes into 3 sets of nodes accepting each replica type (sets can overlap
- // if nodes accept multiple replica types). These subsets sets are actually maps, because we
- // capture the number of cores (of any replica type) present on each node.
- EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes =
- getAvailableNodesForReplicaTypes(nodes, attrValues);
-
- // All available zones of live nodes. Due to some nodes not being candidates for placement,
- // and some existing replicas being one availability zones that might be offline (i.e. their
- // nodes are not live), this set might contain zones on which it is impossible to place
- // replicas. That's ok.
- Set<String> availabilityZones = getZonesFromNodes(nodes, attrValues);
-
- // Build the replica placement decisions here
- Set<ReplicaPlacement> replicaPlacements = new HashSet<>();
-
- // Let's now iterate on all shards to create replicas for and start finding home sweet homes
- // for the replicas
- for (String shardName : request.getShardNames()) {
- // Inventory nodes (if any) that already have a replica of any type for the shard, because
- // we can't be placing additional replicas on these. This data structure is updated after
- // each replica to node assign and is used to make sure different replica types are not
- // allocated to the same nodes (protecting same node assignments within a given replica
- // type is done "by construction" in makePlacementDecisions()).
- Set<Node> nodesWithReplicas =
- allNodesWithReplicas
- .computeIfAbsent(solrCollection.getName(), col -> new HashMap<>())
- .computeIfAbsent(
- shardName,
- s -> {
- Set<Node> newNodeSet = new HashSet<>();
- Shard shard = solrCollection.getShard(s);
- if (shard != null) {
- // Prefill the set with the existing replicas
- for (Replica r : shard.replicas()) {
- newNodeSet.add(r.getNode());
- }
- }
- return newNodeSet;
- });
-
- // Iterate on the replica types in the enum order. We place more strategic replicas first
- // (NRT is more strategic than TLOG more strategic than PULL). This is in case we
- // eventually decide that less strategic replica placement impossibility is not a problem
- // that should lead to replica placement computation failure. Current code does fail if
- // placement is impossible (constraint is at most one replica of a shard on any node).
- for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) {
- makePlacementDecisions(
- solrCollection,
- shardName,
- availabilityZones,
- replicaType,
- request.getCountReplicasToCreate(replicaType),
- attrValues,
- replicaTypeToNodes,
- nodesWithReplicas,
- allCoresOnNodes,
- placementContext.getPlacementPlanFactory(),
- replicaPlacements,
- doSpreadAcrossDomains);
- }
- }
- placementPlans.add(
- placementContext
- .getPlacementPlanFactory()
- .createPlacementPlan(request, replicaPlacements));
- }
-
- return placementPlans;
- }
-
- private boolean shouldSpreadAcrossDomains(Set<Node> allNodes, AttributeValues attrValues) {
- boolean doSpreadAcrossDomains =
- spreadAcrossDomains && spreadDomainPropPresent(allNodes, attrValues);
- if (spreadAcrossDomains && !doSpreadAcrossDomains) {
- log.warn(
- "AffinityPlacementPlugin configured to spread across domains, but there are nodes in the cluster without the {} system property. Ignoring spreadAcrossDomains.",
- AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP);
- }
- return doSpreadAcrossDomains;
- }
-
- private boolean spreadDomainPropPresent(Set<Node> allNodes, AttributeValues attrValues) {
- // We can only use spread domains if all nodes have the system property
- return allNodes.stream()
- .noneMatch(
- n ->
- attrValues
- .getSystemProperty(n, AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP)
- .isEmpty());
}
@Override
- public void verifyAllowedModification(
- ModificationRequest modificationRequest, PlacementContext placementContext)
- throws PlacementModificationException, InterruptedException {
- if (modificationRequest instanceof DeleteShardsRequest) {
- log.warn("DeleteShardsRequest not implemented yet, skipping: {}", modificationRequest);
- } else if (modificationRequest instanceof DeleteCollectionRequest) {
- verifyDeleteCollection((DeleteCollectionRequest) modificationRequest, placementContext);
- } else if (modificationRequest instanceof DeleteReplicasRequest) {
- verifyDeleteReplicas((DeleteReplicasRequest) modificationRequest, placementContext);
- } else {
- log.warn("unsupported request type, skipping: {}", modificationRequest);
- }
- }
-
- private void verifyDeleteCollection(
+ protected void verifyDeleteCollection(
DeleteCollectionRequest deleteCollectionRequest, PlacementContext placementContext)
- throws PlacementModificationException, InterruptedException {
+ throws PlacementModificationException {
Cluster cluster = placementContext.getCluster();
- Set<String> colocatedCollections =
- colocatedWith.getOrDefault(deleteCollectionRequest.getCollection().getName(), Set.of());
- for (String primaryName : colocatedCollections) {
+ Set<String> collocatedCollections =
+ collocatedWith.getOrDefault(deleteCollectionRequest.getCollection().getName(), Set.of());
+ for (String primaryName : collocatedCollections) {
try {
if (cluster.getCollection(primaryName) != null) {
// still exists
throw new PlacementModificationException(
- "colocated collection "
+ "collocated collection "
+ primaryName
+ " of "
+ deleteCollectionRequest.getCollection().getName()
@@ -421,788 +220,577 @@ public class AffinityPlacementFactory implements PlacementPluginFactory<Affinity
}
} catch (IOException e) {
throw new PlacementModificationException(
- "failed to retrieve colocated collection information", e);
+ "failed to retrieve collocated collection information", e);
}
}
}
- private void verifyDeleteReplicas(
- DeleteReplicasRequest deleteReplicasRequest, PlacementContext placementContext)
- throws PlacementModificationException, InterruptedException {
- Cluster cluster = placementContext.getCluster();
- SolrCollection secondaryCollection = deleteReplicasRequest.getCollection();
- Set<String> colocatedCollections = colocatedWith.get(secondaryCollection.getName());
- if (colocatedCollections == null) {
- return;
- }
- Map<Node, Map<String, AtomicInteger>> secondaryNodeShardReplicas = new HashMap<>();
- secondaryCollection
- .shards()
- .forEach(
- shard ->
- shard
- .replicas()
- .forEach(
- replica -> {
- secondaryNodeShardReplicas
- .computeIfAbsent(replica.getNode(), n -> new HashMap<>())
- .computeIfAbsent(
- replica.getShard().getShardName(), s -> new AtomicInteger())
- .incrementAndGet();
- }));
-
- // find the colocated-with collections
- Map<Node, Set<String>> colocatingNodes = new HashMap<>();
- try {
- for (String colocatedCollection : colocatedCollections) {
- SolrCollection coll = cluster.getCollection(colocatedCollection);
- coll.shards()
- .forEach(
- shard ->
- shard
- .replicas()
- .forEach(
- replica -> {
- colocatingNodes
- .computeIfAbsent(replica.getNode(), n -> new HashSet<>())
- .add(coll.getName());
- }));
- }
- } catch (IOException ioe) {
- throw new PlacementModificationException(
- "failed to retrieve colocated collection information", ioe);
- }
- PlacementModificationException exception = null;
- for (Replica replica : deleteReplicasRequest.getReplicas()) {
- if (!colocatingNodes.containsKey(replica.getNode())) {
- continue;
- }
- // check that there will be at least one replica remaining
- AtomicInteger secondaryCount =
- secondaryNodeShardReplicas
- .getOrDefault(replica.getNode(), Map.of())
- .getOrDefault(replica.getShard().getShardName(), new AtomicInteger());
- if (secondaryCount.get() > 1) {
- // we can delete it - record the deletion
- secondaryCount.decrementAndGet();
- continue;
- }
- // fail - this replica cannot be removed
- if (exception == null) {
- exception = new PlacementModificationException("delete replica(s) rejected");
- }
- exception.addRejectedModification(
- replica.toString(),
- "co-located with replicas of " + colocatingNodes.get(replica.getNode()));
- }
- if (exception != null) {
- throw exception;
- }
- }
-
- private Set<String> getZonesFromNodes(Set<Node> nodes, final AttributeValues attrValues) {
- Set<String> azs = new HashSet<>();
-
- for (Node n : nodes) {
- azs.add(getNodeAZ(n, attrValues));
- }
-
- return Collections.unmodifiableSet(azs);
- }
-
/**
- * Resolves the AZ of a node and takes care of nodes that have no defined AZ in system property
- * {@link AffinityPlacementConfig#AVAILABILITY_ZONE_SYSPROP} to then return {@link
- * AffinityPlacementConfig#UNDEFINED_AVAILABILITY_ZONE} as the AZ name.
+ * AffinityPlacementContext is used to share information across {@link AffinityNode} instances.
+ *
+ * <p>For instance, with SpreadDomains and AvailabilityZones, the weighting of a Node requires
+ * information on the contents of other Nodes. This class is how that information is shared.
+ *
+ * <p>One AffinityPlacementContext is used for each call to {@link
+ * #computePlacements(Collection, PlacementContext)} or {@link #computeBalancing(BalanceRequest,
+ * PlacementContext)}. The state of the context will be altered throughout the computation.
*/
- private String getNodeAZ(Node n, final AttributeValues attrValues) {
- Optional<String> nodeAz =
- attrValues.getSystemProperty(n, AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP);
- // All nodes with undefined AZ will be considered part of the same AZ. This also works for
- // deployments that do not care about AZ's
- return nodeAz.orElse(AffinityPlacementConfig.UNDEFINED_AVAILABILITY_ZONE);
+ private static final class AffinityPlacementContext {
+ private final Set<String> allSpreadDomains = new HashSet<>();
+ private final Map<String, Map<String, ReplicaSpread>> spreadDomainUsage = new HashMap<>();
+ private final Set<String> allAvailabilityZones = new HashSet<>();
+ private final Map<String, Map<String, Map<Replica.ReplicaType, ReplicaSpread>>>
+ availabilityZoneUsage = new HashMap<>();
+ private boolean doSpreadAcrossDomains;
}
- /**
- * This class captures an availability zone and the nodes that are legitimate targets for
- * replica placement in that Availability Zone. Instances are used as values in a {@link
- * java.util.TreeMap} in which the total number of already existing replicas in the AZ is the
- * key. This allows easily picking the set of nodes from which to select a node for placement in
- * order to balance the number of replicas per AZ. Picking one of the nodes from the set is done
- * using different criteria unrelated to the Availability Zone (picking the node is based on the
- * {@link CoresAndDiskComparator} ordering).
- */
- private static class AzWithNodes {
- final String azName;
- private final boolean useSpreadDomains;
- private boolean listIsSorted = false;
- private final Comparator<Node> nodeComparator;
- private final Random random;
- private final List<Node> availableNodesForPlacement;
- private final AttributeValues attributeValues;
- private TreeSet<SpreadDomainWithNodes> sortedSpreadDomains;
- private final Map<String, Integer> currentSpreadDomainUsageUsage;
- private int numNodesForPlacement;
-
- AzWithNodes(
- String azName,
- List<Node> availableNodesForPlacement,
- boolean useSpreadDomains,
- Comparator<Node> nodeComparator,
- Random random,
- AttributeValues attributeValues,
- Map<String, Integer> currentSpreadDomainUsageUsage) {
- this.azName = azName;
- this.availableNodesForPlacement = availableNodesForPlacement;
- this.useSpreadDomains = useSpreadDomains;
- this.nodeComparator = nodeComparator;
- this.random = random;
- this.attributeValues = attributeValues;
- this.currentSpreadDomainUsageUsage = currentSpreadDomainUsageUsage;
- this.numNodesForPlacement = availableNodesForPlacement.size();
+ @Override
+ protected Map<Node, WeightedNode> getBaseWeightedNodes(
+ PlacementContext placementContext,
+ Set<Node> nodes,
+ Iterable<SolrCollection> relevantCollections,
+ boolean skipNodesWithErrors)
+ throws PlacementException {
+ // Fetch attributes for a superset of all nodes requested amongst the placementRequests
+ AttributeFetcher attributeFetcher = placementContext.getAttributeFetcher();
+ attributeFetcher
+ .requestNodeSystemProperty(AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP)
+ .requestNodeSystemProperty(AffinityPlacementConfig.NODE_TYPE_SYSPROP)
+ .requestNodeSystemProperty(AffinityPlacementConfig.REPLICA_TYPE_SYSPROP)
+ .requestNodeSystemProperty(AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP);
+ attributeFetcher
+ .requestNodeMetric(NodeMetricImpl.NUM_CORES)
+ .requestNodeMetric(NodeMetricImpl.FREE_DISK_GB);
+ Set<ReplicaMetric<?>> replicaMetrics = Set.of(ReplicaMetricImpl.INDEX_SIZE_GB);
+ Set<String> requestedCollections = new HashSet<>();
+ for (SolrCollection collection : relevantCollections) {
+ if (requestedCollections.add(collection.getName())) {
+ attributeFetcher.requestCollectionMetrics(collection, replicaMetrics);
+ }
}
+ attributeFetcher.fetchFrom(nodes);
+ final AttributeValues attrValues = attributeFetcher.fetchAttributes();
- private boolean hasBeenSorted() {
- return (useSpreadDomains && sortedSpreadDomains != null)
- || (!useSpreadDomains && listIsSorted);
- }
+ AffinityPlacementContext affinityPlacementContext = new AffinityPlacementContext();
+ affinityPlacementContext.doSpreadAcrossDomains = spreadAcrossDomains;
- void ensureSorted() {
- if (!hasBeenSorted()) {
- sort();
+ Map<Node, WeightedNode> affinityNodeMap = CollectionUtil.newHashMap(nodes.size());
+ for (Node node : nodes) {
+ AffinityNode affinityNode =
+ newNodeFromMetrics(node, attrValues, affinityPlacementContext, skipNodesWithErrors);
+ if (affinityNode != null) {
+ affinityNodeMap.put(node, affinityNode);
}
}
- private void sort() {
- assert !listIsSorted && sortedSpreadDomains == null
- : "We shouldn't be sorting this list again";
+ // If there are not multiple spreadDomains, then there is nothing to spread across
+ if (affinityPlacementContext.allSpreadDomains.size() < 2) {
+ affinityPlacementContext.doSpreadAcrossDomains = false;
+ }
- // Make sure we do not tend to use always the same nodes (within an AZ) if all
- // conditions are identical (well, this likely is not the case since after having added
- // a replica to a node its number of cores increases for the next placement decision,
- // but let's be defensive here, given that multiple concurrent placement decisions might
- // see the same initial cluster state, and we want placement to be reasonable even in
- // that case without creating an unnecessary imbalance). For example, if all nodes have
- // 0 cores and same amount of free disk space, ideally we want to pick a random node for
- // placement, not always the same one due to some internal ordering.
- Collections.shuffle(availableNodesForPlacement, random);
+ return affinityNodeMap;
+ }
- if (useSpreadDomains) {
- // When we use spread domains, we don't just sort the list of nodes, instead we generate a
- // TreeSet of SpreadDomainWithNodes,
- // sorted by the number of times the domain has been used. Each
- // SpreadDomainWithNodes internally contains the list of nodes that belong to that
- // particular domain,
- // and it's sorted internally by the comparator passed to this
- // class (which is the same that's used when not using spread domains).
- // Whenever a node from a particular SpreadDomainWithNodes is selected as the best
- // candidate, the call to "removeBestNode" will:
- // 1. Remove the SpreadDomainWithNodes instance from the TreeSet
- // 2. Remove the best node from the list within the SpreadDomainWithNodes
- // 3. Increment the count of times the domain has been used
- // 4. Re-add the SpreadDomainWithNodes instance to the TreeSet if there are still nodes
- // available
- HashMap<String, List<Node>> spreadDomainToListOfNodesMap = new HashMap<>();
- for (Node node : availableNodesForPlacement) {
- spreadDomainToListOfNodesMap
- .computeIfAbsent(
- attributeValues
- .getSystemProperty(node, AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP)
- .get(),
- k -> new ArrayList<>())
- .add(node);
- }
- sortedSpreadDomains =
- new TreeSet<>(new SpreadDomainComparator(currentSpreadDomainUsageUsage));
+ AffinityNode newNodeFromMetrics(
+ Node node,
+ AttributeValues attrValues,
+ AffinityPlacementContext affinityPlacementContext,
+ boolean skipNodesWithErrors)
+ throws PlacementException {
+ Set<Replica.ReplicaType> supportedReplicaTypes =
+ attrValues.getSystemProperty(node, AffinityPlacementConfig.REPLICA_TYPE_SYSPROP).stream()
+ .flatMap(s -> Arrays.stream(s.split(",")))
+ .map(String::trim)
+ .map(s -> s.toUpperCase(Locale.ROOT))
+ .map(
+ s -> {
+ try {
+ return Replica.ReplicaType.valueOf(s);
+ } catch (IllegalArgumentException e) {
+ log.warn(
+ "Node {} has an invalid value for the {} systemProperty: {}",
+ node.getName(),
+ AffinityPlacementConfig.REPLICA_TYPE_SYSPROP,
+ s);
+ return null;
+ }
+ })
+ .collect(Collectors.toSet());
+ if (supportedReplicaTypes.isEmpty()) {
+ // If property not defined or is only whitespace on a node, assuming node can take any
+ // replica type
+ supportedReplicaTypes = Set.of(Replica.ReplicaType.values());
+ }
- int i = 0;
- for (Map.Entry<String, List<Node>> entry : spreadDomainToListOfNodesMap.entrySet()) {
- // Sort the nodes within the spread domain by the provided comparator
- entry.getValue().sort(nodeComparator);
- sortedSpreadDomains.add(
- new SpreadDomainWithNodes(entry.getKey(), entry.getValue(), i++, nodeComparator));
+ Set<String> nodeType;
+ Optional<String> nodePropOpt =
+ attrValues.getSystemProperty(node, AffinityPlacementConfig.NODE_TYPE_SYSPROP);
+ if (nodePropOpt.isEmpty()) {
+ nodeType = Collections.emptySet();
+ } else {
+ nodeType = new HashSet<>(StrUtils.splitSmart(nodePropOpt.get(), ','));
+ }
+
+ Optional<Double> nodeFreeDiskGB = attrValues.getNodeMetric(node, NodeMetricImpl.FREE_DISK_GB);
+ Optional<Integer> nodeNumCores = attrValues.getNodeMetric(node, NodeMetricImpl.NUM_CORES);
+ String az =
+ attrValues
+ .getSystemProperty(node, AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP)
+ .orElse(AffinityPlacementConfig.UNDEFINED_AVAILABILITY_ZONE);
+ affinityPlacementContext.allAvailabilityZones.add(az);
+ String spreadDomain;
+ if (affinityPlacementContext.doSpreadAcrossDomains) {
+ spreadDomain =
+ attrValues
+ .getSystemProperty(node, AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP)
+ .orElse(null);
+ if (spreadDomain == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(
+ "AffinityPlacementPlugin configured to spread across domains, but node {} does not have the {} system property. Ignoring spreadAcrossDomains.",
+ node.getName(),
+ AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP);
}
+ // In the context stop using spreadDomains, because we have a node without a spread
+ // domain.
+ affinityPlacementContext.doSpreadAcrossDomains = false;
+ affinityPlacementContext.allSpreadDomains.clear();
} else {
- availableNodesForPlacement.sort(nodeComparator);
- listIsSorted = true;
+ affinityPlacementContext.allSpreadDomains.add(spreadDomain);
}
+ } else {
+ spreadDomain = null;
}
-
- Node getBestNode() {
- assert hasBeenSorted();
- if (useSpreadDomains) {
- return sortedSpreadDomains.first().sortedNodesForPlacement.get(0);
- } else {
- return availableNodesForPlacement.get(0);
+ if (nodeFreeDiskGB.isEmpty() && skipNodesWithErrors) {
+ if (log.isWarnEnabled()) {
+ log.warn(
+ "Unknown free disk on node {}, excluding it from placement decisions.",
+ node.getName());
}
- }
-
- public Node removeBestNode() {
- assert hasBeenSorted();
- this.numNodesForPlacement--;
- if (useSpreadDomains) {
- // Since this SpreadDomainWithNodes needs to be re-sorted in the sortedSpreadDomains, we
- // remove it and then re-add it, once the best node has been removed.
- SpreadDomainWithNodes group = sortedSpreadDomains.pollFirst();
- Node n = group.sortedNodesForPlacement.remove(0);
- this.currentSpreadDomainUsageUsage.merge(group.spreadDomainName, 1, Integer::sum);
- if (!group.sortedNodesForPlacement.isEmpty()) {
- sortedSpreadDomains.add(group);
- }
- return n;
- } else {
- return availableNodesForPlacement.remove(0);
+ return null;
+ } else if (nodeNumCores.isEmpty() && skipNodesWithErrors) {
+ if (log.isWarnEnabled()) {
+ log.warn(
+ "Unknown number of cores on node {}, excluding it from placement decisions.",
+ node.getName());
}
- }
-
- public int numNodes() {
- return this.numNodesForPlacement;
+ return null;
+ } else {
+ return new AffinityNode(
+ node,
+ attrValues,
+ affinityPlacementContext,
+ supportedReplicaTypes,
+ nodeType,
+ nodeNumCores.orElse(0),
+ nodeFreeDiskGB.orElse(0D),
+ az,
+ spreadDomain);
}
}
/**
- * This class represents group of nodes with the same {@link
- * AffinityPlacementConfig#SPREAD_DOMAIN_SYSPROP} label.
+ * This implementation weights nodes in order to achieve balancing across AvailabilityZones and
+ * SpreadDomains, while trying to minimize the amount of replicas on a node and ensure a given
+ * disk space per node. This implementation also supports limiting the placement of certain
+ * replica types per node and co-locating collections.
+ *
+ * <p>The total weight of the AffinityNode is the sum of:
+ *
+ * <ul>
+ * <li>The number of replicas on the node
+ * <li>100 if the free disk space on the node < prioritizedFreeDiskGB, otherwise 0
+ * <li>If SpreadDomains are used:<br>
+ * 10,000 * the sum over each collection/shard:
+ * <ul>
+ * <li>(# of replicas in this node's spread domain - the minimum spreadDomain's
+ * replicaCount)^2 <br>
+ * <i>These are individually squared to penalize higher values when summing up all
+ * values</i>
+ * </ul>
+ * <li>If AvailabilityZones are used:<br>
+ * 1,000,000 * the sum over each collection/shard/replicaType:
+ * <ul>
+ * <li>(# of replicas in this node's AZ - the minimum AZ's replicaCount)^2 <br>
+ * <i>These are individually squared to penalize higher values when summing up all
+ * values</i>
+ * </ul>
+ * </ul>
+ *
+ * The weighting here ensures that the order of importance for nodes is:
+ *
+ * <ol>
+ * <li>Spread replicas of the same shard/replicaType across availabilityZones
+ * <li>Spread replicas of the same shard across spreadDomains
+ * <li>Make sure that replicas are not placed on nodes that have < prioritizedFreeDiskGB disk
+ * space available
+ * <li>Minimize the amount of replicas on the node
+ * </ol>
+ *
+ * <p>The "relevant" weight with a replica is the sum of:
+ *
+ * <ul>
+ * <li>The number of replicas on the node
+ * <li>100 if the projected free disk space on the node < prioritizedFreeDiskGB, otherwise 0
+ * <li>If SpreadDomains are used:<br>
+ * 10,000 * ( # of replicas for the replica's shard in this node's spread domain - the
+ * minimum spreadDomain's replicaCount )
+ * <li>If AvailabilityZones are used:<br>
+ * 1,000,000 * ( # of replicas for the replica's shard & replicaType in this node's AZ -
+ * the minimum AZ's replicaCount )
+ * </ul>
+ *
+ * <p>Multiple replicas of the same shard are not permitted to live on the same Node.
+ *
+ * <p>Users can specify withCollection, to ensure that co-placement of replicas is ensured when
+ * computing new replica placements or replica balancing.
*/
- static class SpreadDomainWithNodes implements Comparable<SpreadDomainWithNodes> {
+ private class AffinityNode extends WeightedNode {
- /**
- * This is the label that all nodes in this group have in {@link
- * AffinityPlacementConfig#SPREAD_DOMAIN_SYSPROP} label.
- */
- final String spreadDomainName;
+ private final AttributeValues attrValues;
- /**
- * The list of all nodes that contain the same {@link
- * AffinityPlacementConfig#SPREAD_DOMAIN_SYSPROP} label. They must be sorted before creating
- * this class.
- */
- private final List<Node> sortedNodesForPlacement;
+ private final AffinityPlacementContext affinityPlacementContext;
- /**
- * This is used for tie breaking the sort of {@link SpreadDomainWithNodes}, when the
- * nodeComparator between the top nodes of each group return 0.
- */
- private final int tieBreaker;
+ private final Set<Replica.ReplicaType> supportedReplicaTypes;
+ private final Set<String> nodeType;
- /**
- * This is the comparator that is used to compare the top nodes in the {@link
- * #sortedNodesForPlacement} lists. Must be the same that was used to sort {@link
- * #sortedNodesForPlacement}.
- */
- private final Comparator<Node> nodeComparator;
+ private int coresOnNode;
+ private double nodeFreeDiskGB;
- public SpreadDomainWithNodes(
- String spreadDomainName,
- List<Node> sortedNodesForPlacement,
- int tieBreaker,
- Comparator<Node> nodeComparator) {
- this.spreadDomainName = spreadDomainName;
- this.sortedNodesForPlacement = sortedNodesForPlacement;
- this.tieBreaker = tieBreaker;
- this.nodeComparator = nodeComparator;
+ private final String availabilityZone;
+ private final String spreadDomain;
+
+ AffinityNode(
+ Node node,
+ AttributeValues attrValues,
+ AffinityPlacementContext affinityPlacementContext,
+ Set<Replica.ReplicaType> supportedReplicaTypes,
+ Set<String> nodeType,
+ int coresOnNode,
+ double nodeFreeDiskGB,
+ String az,
+ String spreadDomain) {
+ super(node);
+ this.attrValues = attrValues;
+ this.affinityPlacementContext = affinityPlacementContext;
+ this.supportedReplicaTypes = supportedReplicaTypes;
+ this.nodeType = nodeType;
+ this.coresOnNode = coresOnNode;
+ this.nodeFreeDiskGB = nodeFreeDiskGB;
+ this.availabilityZone = az;
+ this.spreadDomain = spreadDomain;
}
@Override
- public int compareTo(SpreadDomainWithNodes o) {
- if (o == this) {
- return 0;
- }
- int result =
- nodeComparator.compare(
- this.sortedNodesForPlacement.get(0), o.sortedNodesForPlacement.get(0));
- if (result == 0) {
- return Integer.compare(this.tieBreaker, o.tieBreaker);
- }
- return result;
+ public int calcWeight() {
+ return coresOnNode
+ // Only add 100 if prioritizedFreeDiskGB was provided and the node's freeDisk is lower
+ // than it
+ + 100 * (prioritizedFreeDiskGB > 0 && nodeFreeDiskGB < prioritizedFreeDiskGB ? 1 : 0)
+ + 10000 * getSpreadDomainWeight()
+ + 1000000 * getAZWeight();
}
- }
- /**
- * Builds the number of existing cores on each node returned in the attrValues. Nodes for which
- * the number of cores is not available for whatever reason are excluded from acceptable
- * candidate nodes as it would not be possible to make any meaningful placement decisions.
- *
- * @param nodes all nodes on which this plugin should compute placement
- * @param attrValues attributes fetched for the nodes. This method uses system property {@link
- * AffinityPlacementConfig#REPLICA_TYPE_SYSPROP} as well as the number of cores on each
- * node.
- */
- private Map<Node, Integer> getCoreCountPerNode(
- Set<Node> nodes, final AttributeValues attrValues) {
- Map<Node, Integer> coresOnNodes = new HashMap<>();
-
- for (Node node : nodes) {
- attrValues
- .getNodeMetric(node, NodeMetricImpl.NUM_CORES)
- .ifPresent(count -> coresOnNodes.put(node, count));
+ @Override
+ public int calcRelevantWeightWithReplica(Replica replica) {
+ return coresOnNode
+ // Only add 100 if prioritizedFreeDiskGB was provided and the node's projected freeDisk
+ // is lower than it
+ + 100
+ * (prioritizedFreeDiskGB > 0
+ && nodeFreeDiskGB - getProjectedSizeOfReplica(replica)
+ < prioritizedFreeDiskGB
+ ? 1
+ : 0)
+ + 10000 * projectReplicaSpreadWeight(replica)
+ + 1000000 * projectAZWeight(replica);
}
- return coresOnNodes;
- }
-
- /**
- * Given the set of all nodes on which to do placement and fetched attributes, builds the sets
- * representing candidate nodes for placement of replicas of each replica type. These sets are
- * packaged and returned in an EnumMap keyed by replica type. Nodes for which the number of
- * cores is not available for whatever reason are excluded from acceptable candidate nodes as it
- * would not be possible to make any meaningful placement decisions.
- *
- * @param nodes all nodes on which this plugin should compute placement
- * @param attrValues attributes fetched for the nodes. This method uses system property {@link
- * AffinityPlacementConfig#REPLICA_TYPE_SYSPROP} as well as the number of cores on each
- * node.
- */
- private EnumMap<Replica.ReplicaType, Set<Node>> getAvailableNodesForReplicaTypes(
- Set<Node> nodes, final AttributeValues attrValues) {
- EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes =
- new EnumMap<>(Replica.ReplicaType.class);
-
- for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) {
- replicaTypeToNodes.put(replicaType, new HashSet<>());
+ @Override
+ public boolean canAddReplica(Replica replica) {
+ String collection = replica.getShard().getCollection().getName();
+ // By default, do not allow two replicas of the same shard on a node
+ return super.canAddReplica(replica)
+ // Filter out unsupported replica types
+ && supportedReplicaTypes.contains(replica.getType())
+ // Filter out unsupported node types
+ && Optional.ofNullable(nodeTypes.get(collection))
+ .map(s -> s.stream().anyMatch(nodeType::contains))
+ .orElse(true)
+ // Ensure any co-located collections already exist on the Node
+ && Optional.ofNullable(withCollections.get(collection))
+ .map(this::hasCollectionOnNode)
+ .orElse(true)
+ // Ensure the disk space will not go below the minimum if the replica is added
+ && (minimalFreeDiskGB <= 0
+ || nodeFreeDiskGB - getProjectedSizeOfReplica(replica) > minimalFreeDiskGB);
}
- for (Node node : nodes) {
- // Exclude nodes with unknown or too small disk free space
- if (attrValues.getNodeMetric(node, NodeMetricImpl.FREE_DISK_GB).isEmpty()) {
- if (log.isWarnEnabled()) {
- log.warn(
- "Unknown free disk on node {}, excluding it from placement decisions.",
- node.getName());
- }
- // We rely later on the fact that the free disk optional is present (see
- // CoresAndDiskComparator), be careful it you change anything here.
- continue;
- }
- if (attrValues.getNodeMetric(node, NodeMetricImpl.FREE_DISK_GB).get() < minimalFreeDiskGB) {
- if (log.isWarnEnabled()) {
- log.warn(
- "Node {} free disk ({}GB) lower than configured minimum {}GB, excluding it from placement decisions.",
- node.getName(),
- attrValues.getNodeMetric(node, NodeMetricImpl.FREE_DISK_GB).get(),
- minimalFreeDiskGB);
- }
- continue;
- }
-
- if (attrValues.getNodeMetric(node, NodeMetricImpl.NUM_CORES).isEmpty()) {
- if (log.isWarnEnabled()) {
- log.warn(
- "Unknown number of cores on node {}, excluding it from placement decisions.",
- node.getName());
+ /**
+ * Return any replicas that cannot be removed because there are collocated collections that
+ * require the replica to exist.
+ *
+ * @param replicas the replicas to remove
+ * @return any errors for replicas that cannot be removed
+ */
+ @Override
+ public Map<Replica, String> canRemoveReplicas(Collection<Replica> replicas) {
+ Map<Replica, String> replicaRemovalExceptions = new HashMap<>();
+ Map<String, Map<String, Set<Replica>>> removals = new HashMap<>();
+ for (Replica replica : replicas) {
+ SolrCollection collection = replica.getShard().getCollection();
+ Set<String> collocatedCollections = new HashSet<>();
+ Optional.ofNullable(collocatedWith.get(collection.getName()))
+ .ifPresent(collocatedCollections::addAll);
+ collocatedCollections.retainAll(getCollectionsOnNode());
+ if (collocatedCollections.isEmpty()) {
+ continue;
}
- // We rely later on the fact that the number of cores optional is present (see
- // CoresAndDiskComparator), be careful it you change anything here.
- continue;
- }
- String supportedReplicaTypes =
- attrValues
- .getSystemProperty(node, AffinityPlacementConfig.REPLICA_TYPE_SYSPROP)
- .isPresent()
- ? attrValues
- .getSystemProperty(node, AffinityPlacementConfig.REPLICA_TYPE_SYSPROP)
- .get()
- : null;
- // If property not defined or is only whitespace on a node, assuming node can take any
- // replica type
- if (supportedReplicaTypes == null || supportedReplicaTypes.isBlank()) {
- for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
- replicaTypeToNodes.get(rt).add(node);
- }
- } else {
- Set<String> acceptedTypes =
- Arrays.stream(supportedReplicaTypes.split(","))
- .map(String::trim)
- .map(s -> s.toLowerCase(Locale.ROOT))
- .collect(Collectors.toSet());
- for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
- if (acceptedTypes.contains(rt.name().toLowerCase(Locale.ROOT))) {
- replicaTypeToNodes.get(rt).add(node);
- }
+ // There are collocatedCollections for this shard, so make sure there is a replica of this
+ // shard left on the node after it is removed
+ Set<Replica> replicasRemovedForShard =
+ removals
+ .computeIfAbsent(
+ replica.getShard().getCollection().getName(), k -> new HashMap<>())
+ .computeIfAbsent(replica.getShard().getShardName(), k -> new HashSet<>());
+ replicasRemovedForShard.add(replica);
+
+ if (replicasRemovedForShard.size()
+ >= getReplicasForShardOnNode(replica.getShard()).size()) {
+ replicaRemovalExceptions.put(
+ replica, "co-located with replicas of " + collocatedCollections);
}
}
+ return replicaRemovalExceptions;
}
- return replicaTypeToNodes;
- }
- /**
- * Picks nodes from {@code targetNodes} for placing {@code numReplicas} replicas.
- *
- * <p>The criteria used in this method are, in this order:
- *
- * <ol>
- * <li>No more than one replica of a given shard on a given node (strictly enforced)
- * <li>Balance as much as possible replicas of a given {@link
- * org.apache.solr.cluster.Replica.ReplicaType} over available AZ's. This balancing takes
- * into account existing replicas <b>of the corresponding replica type</b>, if any.
- * <li>Place replicas if possible on nodes having more than a certain amount of free disk
- * space (note that nodes with a too small amount of free disk space were eliminated as
- * placement targets earlier, in {@link #getAvailableNodesForReplicaTypes(Set,
- * AttributeValues)}). There's a threshold here rather than sorting on the amount of free
- * disk space, because sorting on that value would in practice lead to never considering
- * the number of cores on a node.
- * <li>Place replicas on nodes having a smaller number of cores (the number of cores
- * considered for this decision includes previous placement decisions made during the
- * processing of the placement request)
- * </ol>
- */
- @SuppressForbidden(
- reason =
- "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.")
- private void makePlacementDecisions(
- SolrCollection solrCollection,
- String shardName,
- Set<String> availabilityZones,
- Replica.ReplicaType replicaType,
- int numReplicas,
- final AttributeValues attrValues,
- EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes,
- Set<Node> nodesWithReplicas,
- Map<Node, Integer> coresOnNodes,
- PlacementPlanFactory placementPlanFactory,
- Set<ReplicaPlacement> replicaPlacements,
- boolean doSpreadAcrossDomains)
- throws PlacementException {
- // Count existing replicas per AZ. We count only instances of the type of replica for which we
- // need to do placement. If we ever want to balance replicas of any type across AZ's (and not
- // each replica type balanced independently), we'd have to move this data structure to the
- // caller of this method so it can be reused across different replica type placements for a
- // given shard. Note then that this change would be risky. For example all NRT's and PULL
- // replicas for a shard my be correctly balanced over three AZ's, but then all NRT can end up
- // in the same AZ...
- Map<String, Integer> azToNumReplicas = new HashMap<>();
- for (String az : availabilityZones) {
- azToNumReplicas.put(az, 0);
+ @Override
+ protected boolean addProjectedReplicaWeights(Replica replica) {
+ nodeFreeDiskGB -= getProjectedSizeOfReplica(replica);
+ coresOnNode += 1;
+ return addReplicaToAzAndSpread(replica);
}
- // Build the set of candidate nodes for the placement, i.e. nodes that can accept the replica
- // type
- Set<Node> candidateNodes = new HashSet<>(replicaTypeToNodes.get(replicaType));
- // Remove nodes that already have a replica for the shard (no two replicas of same shard can
- // be put on same node)
- candidateNodes.removeAll(nodesWithReplicas);
+ @Override
+ protected void initReplicaWeights(Replica replica) {
+ addReplicaToAzAndSpread(replica);
+ }
- // This Map will include the affinity labels for the nodes that are currently hosting replicas
- // of this shard. It will be modified with new placement decisions.
- Map<String, Integer> spreadDomainsInUse = new HashMap<>();
- Shard shard = solrCollection.getShard(shardName);
- if (shard != null) {
- // shard is non null if we're adding replicas to an already existing collection.
- // If we're creating the collection, the shards do not exist yet.
- for (Replica replica : shard.replicas()) {
- // The node's AZ is counted as having a replica if it has a replica of the same type as
- // the one we need to place here.
- if (replica.getType() == replicaType) {
- final String az = getNodeAZ(replica.getNode(), attrValues);
- if (azToNumReplicas.containsKey(az)) {
- // We do not count replicas on AZ's for which we don't have any node to place on
- // because it would not help the placement decision. If we did want to do that, note
- // the dereferencing below can't be assumed as the entry will not exist in the map.
- azToNumReplicas.put(az, azToNumReplicas.get(az) + 1);
- }
- if (doSpreadAcrossDomains) {
- attrValues
- .getSystemProperty(
- replica.getNode(), AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP)
- .ifPresent(nodeDomain -> spreadDomainsInUse.merge(nodeDomain, 1, Integer::sum));
- }
- }
+ private boolean addReplicaToAzAndSpread(Replica replica) {
+ boolean needsResort = false;
+ // Only use AvailabilityZones if there are more than 1
+ if (affinityPlacementContext.allAvailabilityZones.size() > 1) {
+ needsResort |=
+ affinityPlacementContext
+ .availabilityZoneUsage
+ .computeIfAbsent(
+ replica.getShard().getCollection().getName(), k -> new HashMap<>())
+ .computeIfAbsent(replica.getShard().getShardName(), k -> new HashMap<>())
+ .computeIfAbsent(
+ replica.getType(),
+ k -> new ReplicaSpread(affinityPlacementContext.allAvailabilityZones))
+ .addReplica(availabilityZone);
}
+ // Only use SpreadDomains if they have been provided to all nodes and there are more than 1
+ if (affinityPlacementContext.doSpreadAcrossDomains) {
+ needsResort |=
+ affinityPlacementContext
+ .spreadDomainUsage
+ .computeIfAbsent(
+ replica.getShard().getCollection().getName(), k -> new HashMap<>())
+ .computeIfAbsent(
+ replica.getShard().getShardName(),
+ k -> new ReplicaSpread(affinityPlacementContext.allSpreadDomains))
+ .addReplica(spreadDomain);
+ }
+ return needsResort;
}
- // We now have the set of real candidate nodes, we've enforced "No more than one replica of a
- // given shard on a given node". We also counted for the shard and replica type under
- // consideration how many replicas were per AZ, so we can place (or try to place) replicas on
- // AZ's that have fewer replicas
-
- Map<String, List<Node>> nodesPerAz = new HashMap<>();
- if (availabilityZones.size() == 1) {
- // If AZs are not being used (all undefined for example) or a single AZ exists, we add all
- // nodes
- // to the same entry
- nodesPerAz.put(availabilityZones.iterator().next(), new ArrayList<>(candidateNodes));
- } else {
- // Get the candidate nodes per AZ in order to build (further down) a mapping of AZ to
- // placement candidates.
- for (Node node : candidateNodes) {
- String nodeAz = getNodeAZ(node, attrValues);
- List<Node> nodesForAz = nodesPerAz.computeIfAbsent(nodeAz, k -> new ArrayList<>());
- nodesForAz.add(node);
+ @Override
+ protected void removeProjectedReplicaWeights(Replica replica) {
+ nodeFreeDiskGB += getProjectedSizeOfReplica(replica);
+ coresOnNode -= 1;
+ // Only use AvailabilityZones if there are more than 1
+ if (affinityPlacementContext.allAvailabilityZones.size() > 1) {
+ Optional.ofNullable(
+ affinityPlacementContext.availabilityZoneUsage.get(
+ replica.getShard().getCollection().getName()))
+ .map(m -> m.get(replica.getShard().getShardName()))
+ .map(m -> m.get(replica.getType()))
+ .ifPresent(m -> m.removeReplica(availabilityZone));
+ }
+ // Only use SpreadDomains if they have been provided to all nodes and there are more than 1
+ if (affinityPlacementContext.doSpreadAcrossDomains) {
+ Optional.ofNullable(
+ affinityPlacementContext.spreadDomainUsage.get(
+ replica.getShard().getCollection().getName()))
+ .map(m -> m.get(replica.getShard().getShardName()))
+ .ifPresent(m -> m.removeReplica(spreadDomain));
}
}
- Comparator<Node> interGroupNodeComparator =
- new CoresAndDiskComparator(attrValues, coresOnNodes, prioritizedFreeDiskGB);
-
- // Build a treeMap sorted by the number of replicas per AZ and including candidates nodes
- // suitable for placement on the AZ, so we can easily select the next AZ to get a replica
- // assignment and quickly (constant time) decide if placement on this AZ is possible or not.
- Map<Integer, Set<AzWithNodes>> azByExistingReplicas =
- new TreeMap<>(Comparator.naturalOrder());
- for (Map.Entry<String, List<Node>> e : nodesPerAz.entrySet()) {
- azByExistingReplicas
- .computeIfAbsent(azToNumReplicas.get(e.getKey()), k -> new HashSet<>())
- .add(
- new AzWithNodes(
- e.getKey(),
- e.getValue(),
- doSpreadAcrossDomains,
- interGroupNodeComparator,
- replicaPlacementRandom,
- attrValues,
- spreadDomainsInUse));
+ private double getProjectedSizeOfReplica(Replica replica) {
+ return attrValues
+ .getCollectionMetrics(replica.getShard().getCollection().getName())
+ .flatMap(colMetrics -> colMetrics.getShardMetrics(replica.getShard().getShardName()))
+ .flatMap(ShardMetrics::getLeaderMetrics)
+ .flatMap(lrm -> lrm.getReplicaMetric(ReplicaMetricImpl.INDEX_SIZE_GB))
+ .orElse(0D);
}
- for (int i = 0; i < numReplicas; i++) {
- // We have for each AZ on which we might have a chance of placing a replica, the list of
- // candidate nodes for replicas (candidate: does not already have a replica of this shard
- // and is in the corresponding AZ). Among the AZ's with the minimal number of replicas of
- // the given replica type for the shard, we must pick the AZ that offers the best placement
- // (based on number of cores and free disk space). In order to do so, for these "minimal"
- // AZ's we sort the nodes from best to worst placement candidate (based on the number of
- // cores and free disk space) then pick the AZ that has the best best node. We don't sort
- // all AZ's because that will not necessarily be needed.
- int minNumberOfReplicasPerAz = 0; // This value never observed but compiler can't tell
- Set<Map.Entry<Integer, AzWithNodes>> candidateAzEntries = null;
- // Iterate over AZ's (in the order of increasing number of replicas on that AZ) and do two
- // things: 1. remove those AZ's that have no nodes, no use iterating over these again and
- // again (as we compute placement for more replicas), and 2. collect all those AZ with a
- // minimal number of replicas.
- for (Map.Entry<Integer, Set<AzWithNodes>> mapEntry : azByExistingReplicas.entrySet()) {
- Iterator<AzWithNodes> it = mapEntry.getValue().iterator();
- while (it.hasNext()) {
- Map.Entry<Integer, AzWithNodes> entry = Map.entry(mapEntry.getKey(), it.next());
- int numberOfNodes = entry.getValue().numNodes();
- if (numberOfNodes == 0) {
- it.remove();
- } else { // AZ does have node(s) for placement
- if (candidateAzEntries == null) {
- // First AZ with nodes that can take the replica. Initialize tracking structures
- minNumberOfReplicasPerAz = numberOfNodes;
- candidateAzEntries = new HashSet<>();
- }
- if (minNumberOfReplicasPerAz != numberOfNodes) {
- // AZ's with more replicas than the minimum number seen are not placement candidates
- break;
- }
- candidateAzEntries.add(entry);
- // We remove all entries that are candidates: the "winner" will be modified, all
- // entries
- // might also be sorted, so we'll insert back the updated versions later.
- it.remove();
- }
- }
+ /**
+ * If there are more than one spreadDomains given in the cluster, then return a weight for
+ * this node, given the number of replicas in its spreadDomain.
+ *
+ * <p>For each Collection & Shard, sum up the number of replicas this node's SpreadDomain has
+ * over the minimum SpreadDomain. Square each value before summing, to ensure that smaller
+ * number of higher values are penalized more than a larger number of smaller values.
+ *
+ * @return the weight
+ */
+ private int getSpreadDomainWeight() {
+ if (affinityPlacementContext.doSpreadAcrossDomains) {
+ return affinityPlacementContext.spreadDomainUsage.values().stream()
+ .flatMap(m -> m.values().stream())
+ .mapToInt(rs -> rs.overMinimum(spreadDomain))
+ .map(i -> i * i)
+ .sum();
+ } else {
+ return 0;
}
+ }
- if (candidateAzEntries == null) {
- // This can happen because not enough nodes for the placement request or already too many
- // nodes with replicas of the shard that can't accept new replicas or not enough nodes
- // with enough free disk space.
- throw new PlacementException(
- "Not enough eligible nodes to place "
- + numReplicas
- + " replica(s) of type "
- + replicaType
- + " for shard "
- + shardName
- + " of collection "
- + solrCollection.getName());
+ /**
+ * If there are more than one SpreadDomains given in the cluster, then return a projected
+ * SpreadDomain weight for this node and this replica.
+ *
+ * <p>For the new replica's Collection & Shard, project the number of replicas this node's
+ * SpreadDomain has over the minimum SpreadDomain.
+ *
+ * @return the weight
+ */
+ private int projectReplicaSpreadWeight(Replica replica) {
+ if (replica != null && affinityPlacementContext.doSpreadAcrossDomains) {
+ return Optional.ofNullable(
+ affinityPlacementContext.spreadDomainUsage.get(
+ replica.getShard().getCollection().getName()))
+ .map(m -> m.get(replica.getShard().getShardName()))
+ .map(rs -> rs.projectOverMinimum(spreadDomain, 1))
+ .orElse(0);
+ } else {
+ return 0;
}
+ }
- // Iterate over all candidate AZ's, sort them if needed and find the best one to use for
- // this placement
- Map.Entry<Integer, AzWithNodes> selectedAz = null;
- Node selectedAzBestNode = null;
- for (Map.Entry<Integer, AzWithNodes> candidateAzEntry : candidateAzEntries) {
- AzWithNodes azWithNodes = candidateAzEntry.getValue();
- azWithNodes.ensureSorted();
-
- // Which one is better, the new one or the previous best?
- if (selectedAz == null
- || interGroupNodeComparator.compare(azWithNodes.getBestNode(), selectedAzBestNode)
- < 0) {
- selectedAz = candidateAzEntry;
- selectedAzBestNode = azWithNodes.getBestNode();
- }
+ /**
+ * If there are more than one AvailabilityZones given in the cluster, then return a weight for
+ * this node, given the number of replicas in its availabilityZone.
+ *
+ * <p>For each Collection, Shard & ReplicaType, sum up the number of replicas this node's
+ * AvailabilityZone has over the minimum AvailabilityZone. Square each value before summing,
+ * to ensure that smaller number of higher values are penalized more than a larger number of
+ * smaller values.
+ *
+ * @return the weight
+ */
+ private int getAZWeight() {
+ if (affinityPlacementContext.allAvailabilityZones.size() < 2) {
+ return 0;
+ } else {
+ return affinityPlacementContext.availabilityZoneUsage.values().stream()
+ .flatMap(m -> m.values().stream())
+ .flatMap(m -> m.values().stream())
+ .mapToInt(rs -> rs.overMinimum(availabilityZone))
+ .map(i -> i * i)
+ .sum();
}
+ }
- // Now actually remove the selected node from the winning AZ
- AzWithNodes azWithNodes = selectedAz.getValue();
- Node assignTarget = azWithNodes.removeBestNode();
-
- // Insert back all the qualifying but non winning AZ's removed while searching for the one
- for (Map.Entry<Integer, AzWithNodes> removedAzs : candidateAzEntries) {
- if (removedAzs != selectedAz) {
- azByExistingReplicas
- .computeIfAbsent(removedAzs.getKey(), k -> new HashSet<>())
- .add(removedAzs.getValue());
- }
+ /**
+ * If there are more than one AvailabilityZones given in the cluster, then return a projected
+ * AvailabilityZone weight for this node and this replica.
+ *
+ * <p>For the new replica's Collection, Shard & ReplicaType, project the number of replicas
+ * this node's AvailabilityZone has over the minimum AvailabilityZone.
+ *
+ * @return the weight
+ */
+ private int projectAZWeight(Replica replica) {
+ if (replica == null || affinityPlacementContext.allAvailabilityZones.size() < 2) {
+ return 0;
+ } else {
+ return Optional.ofNullable(
+ affinityPlacementContext.availabilityZoneUsage.get(
+ replica.getShard().getCollection().getName()))
+ .map(m -> m.get(replica.getShard().getShardName()))
+ .map(m -> m.get(replica.getType()))
+ .map(rs -> rs.projectOverMinimum(availabilityZone, 1))
+ .orElse(0);
}
-
- // Insert back a corrected entry for the winning AZ: one more replica living there and one
- // less node that can accept new replicas (the remaining candidate node list might be empty,
- // in which case it will be cleaned up on the next iteration).
- azByExistingReplicas
- .computeIfAbsent(selectedAz.getKey() + 1, k -> new HashSet<>())
- .add(azWithNodes);
-
- // Do not assign that node again for replicas of other replica type for this shard (this
- // update of the set is not useful in the current execution of this method but for following
- // ones only)
- nodesWithReplicas.add(assignTarget);
-
- // Track that the node has one more core. These values are only used during the current run
- // of the plugin.
- coresOnNodes.merge(assignTarget, 1, Integer::sum);
-
- // Register the replica assignment just decided
- replicaPlacements.add(
- placementPlanFactory.createReplicaPlacement(
- solrCollection, shardName, assignTarget, replicaType));
}
}
- private Set<Node> filterNodesWithCollection(
- Cluster cluster,
- PlacementRequest request,
- AttributeValues attributeValues,
- Set<Node> initialNodes)
- throws PlacementException {
- // if there's a `withCollection` constraint for this collection then remove nodes
- // that are not eligible
- String withCollectionName = withCollections.get(request.getCollection().getName());
- if (withCollectionName == null) {
- return initialNodes;
- }
- SolrCollection withCollection;
- try {
- withCollection = cluster.getCollection(withCollectionName);
- } catch (Exception e) {
- throw new PlacementException(
- "Error getting info of withCollection=" + withCollectionName, e);
- }
- Set<Node> withCollectionNodes = new HashSet<>();
- withCollection
- .shards()
- .forEach(s -> s.replicas().forEach(r -> withCollectionNodes.add(r.getNode())));
- if (withCollectionNodes.isEmpty()) {
- throw new PlacementException(
- "Collection "
- + withCollection
- + " defined in `withCollection` has no replicas on eligible nodes.");
- }
- HashSet<Node> filteredNodes = new HashSet<>(initialNodes);
- filteredNodes.retainAll(withCollectionNodes);
- if (filteredNodes.isEmpty()) {
- throw new PlacementException(
- "Collection "
- + withCollection
- + " defined in `withCollection` has no replicas on eligible nodes.");
- }
- return filteredNodes;
- }
+ /**
+ * Tracks how many replicas are located under each spread key (for example an availability
+ * zone), together with the minimum replica count across the keys. Used to measure how far a
+ * key is "over the minimum", i.e. how unevenly replicas are spread.
+ */
+ private static class ReplicaSpread {
+ // Every spread key that replicas may be located under
+ private final Set<String> allKeys;
+ // Number of replicas located under each key; a key is removed once its count drops to 0
+ private final Map<String, Integer> spread;
+ // Minimum count across keys; only recomputed (in addReplica) once spread has an entry for as
+ // many keys as allKeys, so it stays 0 while keys are still missing
+ private int minReplicasLocated;
- private Set<Node> filterNodesByNodeType(
- Cluster cluster,
- PlacementRequest request,
- AttributeValues attributeValues,
- Set<Node> initialNodes)
- throws PlacementException {
- Set<String> collNodeTypes = nodeTypes.get(request.getCollection().getName());
- if (collNodeTypes == null) {
- // no filtering by node type
- return initialNodes;
+ private ReplicaSpread(Set<String> allKeys) {
+ this.allKeys = allKeys;
+ this.spread = new HashMap<>();
+ this.minReplicasLocated = 0;
}
- Set<Node> filteredNodes =
- initialNodes.stream()
- .filter(
- n -> {
- Optional<String> nodePropOpt =
- attributeValues.getSystemProperty(
- n, AffinityPlacementConfig.NODE_TYPE_SYSPROP);
- if (!nodePropOpt.isPresent()) {
- return false;
- }
- Set<String> nodeTypes =
- new HashSet<>(StrUtils.splitSmart(nodePropOpt.get(), ','));
- nodeTypes.retainAll(collNodeTypes);
- return !nodeTypes.isEmpty();
- })
- .collect(Collectors.toSet());
- if (filteredNodes.isEmpty()) {
- throw new PlacementException(
- "There are no nodes with types: "
- + collNodeTypes
- + " expected by collection "
- + request.getCollection().getName());
+
+ /** Returns how many more replicas the given key holds than the current minimum. */
+ int overMinimum(String key) {
+ return spread.getOrDefault(key, 0) - minReplicasLocated;
}
- return filteredNodes;
- }
- /**
- * Comparator implementing the placement strategy based on free space and number of cores: we
- * want to place new replicas on nodes with the less number of cores, but only if they do have
- * enough disk space (expressed as a threshold value).
- */
- static class CoresAndDiskComparator implements Comparator<Node> {
- private final AttributeValues attrValues;
- private final Map<Node, Integer> coresOnNodes;
- private final long prioritizedFreeDiskGB;
/**
- * The data we sort on is not part of the {@link Node} instances but has to be retrieved from
- * the attributes and configuration. The number of cores per node is passed in a map whereas
- * the free disk is fetched from the attributes due to the fact that we update the number of
- * cores per node as we do allocations, but we do not update the free disk. The attrValues
- * corresponding to the number of cores per node are the initial values, but we want to
- * compare the actual value taking into account placement decisions already made during the
- * current execution of the placement plugin.
+ * Try adding a replica for the given spread key, and return the {@link #overMinimum(String)}
+ * with it added. Remove the replica, so that the state is unchanged from when the method was
+ * called.
+ *
+ * <p>The simulation (add, measure, remove) only happens when the key is currently at the
+ * minimum and {@code replicaDelta} is positive, because only then can the minimum itself move.
+ * Otherwise the result is {@code max(0, overMinimum(key) + replicaDelta)}.
+ *
+ * <p>NOTE(review): in the simulated branch a single replica is added, whatever the value of
+ * {@code replicaDelta}.
*/
- CoresAndDiskComparator(
- AttributeValues attrValues, Map<Node, Integer> coresOnNodes, long prioritizedFreeDiskGB) {
- this.attrValues = attrValues;
- this.coresOnNodes = coresOnNodes;
- this.prioritizedFreeDiskGB = prioritizedFreeDiskGB;
- }
-
- @Override
- public int compare(Node a, Node b) {
- // Note all nodes do have free disk defined. This has been verified earlier.
- boolean aHasLowFreeSpace =
- attrValues.getNodeMetric(a, NodeMetricImpl.FREE_DISK_GB).get() < prioritizedFreeDiskGB;
- boolean bHasLowFreeSpace =
- attrValues.getNodeMetric(b, NodeMetricImpl.FREE_DISK_GB).get() < prioritizedFreeDiskGB;
- if (aHasLowFreeSpace != bHasLowFreeSpace) {
- // A node with low free space should be considered > node with high free space since it
- // needs to come later in sort order
- return Boolean.compare(aHasLowFreeSpace, bHasLowFreeSpace);
+ int projectOverMinimum(String key, int replicaDelta) {
+ int overMinimum = overMinimum(key);
+ if (overMinimum == 0 && replicaDelta > 0) {
+ addReplica(key);
+ int projected = overMinimum(key);
+ removeReplica(key);
+ return projected;
+ } else {
+ return Integer.max(0, overMinimum + replicaDelta);
}
- // The ordering on the number of cores is the natural order.
- return Integer.compare(coresOnNodes.get(a), coresOnNodes.get(b));
}
- }
-
- static class SpreadDomainComparator implements Comparator<SpreadDomainWithNodes> {
- private final Map<String, Integer> spreadDomainUsage;
- SpreadDomainComparator(Map<String, Integer> spreadDomainUsage) {
- this.spreadDomainUsage = spreadDomainUsage;
+ /**
+ * Add a replica for the given spread key, returning whether a full resorting is needed for
+ * AffinityNodes. Resorting is only needed if other nodes could possibly have a lower weight
+ * than before.
+ *
+ * @param key the spread key for the replica that should be added
+ * @return whether a re-sort is required
+ */
+ boolean addReplica(String key) {
+ int previous = spread.getOrDefault(key, 0);
+ spread.put(key, previous + 1);
+ // The minimum can only rise if every key has a count and the incremented key was at the
+ // minimum; in that case recompute it
+ if (allKeys.size() > 0
+ && spread.size() == allKeys.size()
+ && previous == minReplicasLocated) {
+ minReplicasLocated = spread.values().stream().mapToInt(Integer::intValue).min().orElse(0);
+ return true;
+ }
+ return false;
}
- @Override
- public int compare(SpreadDomainWithNodes group1, SpreadDomainWithNodes group2) {
- // This comparator will compare groups by:
- // 1. The number of usages for the domain they represent: We want groups that are
- // less used to be the best ones
- // 2. On equal number of usages, by the internal comparator (which uses core count and disk
- // space) on the best node for each group (which, since the list is sorted, it's always the
- // one in the position 0)
- Integer usagesLabel1 = spreadDomainUsage.getOrDefault(group1.spreadDomainName, 0);
- Integer usagesLabel2 = spreadDomainUsage.getOrDefault(group2.spreadDomainName, 0);
- if (usagesLabel1.equals(usagesLabel2)) {
- return group1.compareTo(group2);
+ /**
+ * Remove a replica for the given spread key. The key is dropped from the spread once it has
+ * no replicas left, and the minimum is lowered if this key falls below it.
+ */
+ void removeReplica(String key) {
+ Integer replicasLocated = spread.computeIfPresent(key, (k, v) -> v - 1 == 0 ? null : v - 1);
+ if (replicasLocated == null) {
+ replicasLocated = 0;
+ }
+ if (replicasLocated < minReplicasLocated) {
+ minReplicasLocated = replicasLocated;
}
- return usagesLabel1.compareTo(usagesLabel2);
}
}
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/MinimizeCoresPlacementFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/MinimizeCoresPlacementFactory.java
index 298c0d3687e..e00129094c5 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/MinimizeCoresPlacementFactory.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/MinimizeCoresPlacementFactory.java
@@ -17,16 +17,9 @@
package org.apache.solr.cluster.placement.plugins;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TreeMap;
-import java.util.stream.Collectors;
import org.apache.solr.cluster.Node;
import org.apache.solr.cluster.Replica;
import org.apache.solr.cluster.SolrCollection;
@@ -34,14 +27,9 @@ import org.apache.solr.cluster.placement.AttributeFetcher;
import org.apache.solr.cluster.placement.AttributeValues;
import org.apache.solr.cluster.placement.PlacementContext;
import org.apache.solr.cluster.placement.PlacementException;
-import org.apache.solr.cluster.placement.PlacementPlan;
-import org.apache.solr.cluster.placement.PlacementPlanFactory;
import org.apache.solr.cluster.placement.PlacementPlugin;
import org.apache.solr.cluster.placement.PlacementPluginFactory;
-import org.apache.solr.cluster.placement.PlacementRequest;
-import org.apache.solr.cluster.placement.ReplicaPlacement;
import org.apache.solr.cluster.placement.impl.NodeMetricImpl;
-import org.apache.solr.common.util.SuppressForbidden;
/**
* Factory for creating {@link MinimizeCoresPlacementPlugin}, a Placement plugin implementing
@@ -49,6 +37,8 @@ import org.apache.solr.common.util.SuppressForbidden;
* the same shard on the same node. This code is meant as an educational example of a placement
* plugin.
*
+ * <p>See {@link NodeWithCoreCount} for information on how this PlacementFactory weights nodes.
+ *
* <p>See {@link AffinityPlacementFactory} for a more realistic example and documentation.
*/
public class MinimizeCoresPlacementFactory
@@ -59,122 +49,69 @@ public class MinimizeCoresPlacementFactory
return new MinimizeCoresPlacementPlugin();
}
- private static class MinimizeCoresPlacementPlugin implements PlacementPlugin {
+ /**
+ * Placement plugin whose node weights are the nodes' core counts (see NodeWithCoreCount). The
+ * ordering logic inherited from {@link OrderedNodePlacementPlugin} places replicas onto nodes
+ * with lower weights, so nodes with fewer cores are favored.
+ */
+ private static class MinimizeCoresPlacementPlugin extends OrderedNodePlacementPlugin {
+ /**
+ * Creates a {@link NodeWithCoreCount} for each of the given nodes, seeded with the node's
+ * {@code NUM_CORES} metric.
+ *
+ * <p>NOTE(review): despite its name, when {@code skipNodesWithErrors} is true a node without a
+ * core-count metric makes this method throw a {@link PlacementException}. When it is false,
+ * such a node is kept with a core count of 0. Confirm that this is the intended contract.
+ */
@Override
- @SuppressForbidden(
- reason =
- "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.")
- public List<PlacementPlan> computePlacements(
- Collection<PlacementRequest> requests, PlacementContext placementContext)
+ protected Map<Node, WeightedNode> getBaseWeightedNodes(
+ PlacementContext placementContext,
+ Set<Node> nodes,
+ Iterable<SolrCollection> relevantCollections,
+ boolean skipNodesWithErrors)
throws PlacementException {
- List<PlacementPlan> placementPlans = new ArrayList<>(requests.size());
- Set<Node> allNodes = new HashSet<>();
- for (PlacementRequest request : requests) {
- allNodes.addAll(request.getTargetNodes());
- }
-
// Fetch attributes for a superset of all nodes requested amongst the placementRequests
AttributeFetcher attributeFetcher = placementContext.getAttributeFetcher();
attributeFetcher.requestNodeMetric(NodeMetricImpl.NUM_CORES);
- attributeFetcher.fetchFrom(allNodes);
+ attributeFetcher.fetchFrom(nodes);
AttributeValues attrValues = attributeFetcher.fetchAttributes();
- Map<String, Integer> coresPerNodeTotal = new HashMap<>();
- for (Node node : allNodes) {
- if (attrValues.getNodeMetric(node, NodeMetricImpl.NUM_CORES).isEmpty()) {
+ HashMap<Node, WeightedNode> nodeMap = new HashMap<>();
+ for (Node node : nodes) {
+ // Strict mode (skipNodesWithErrors == true): a missing core count is an error
+ if (skipNodesWithErrors
+ && attrValues.getNodeMetric(node, NodeMetricImpl.NUM_CORES).isEmpty()) {
throw new PlacementException("Can't get number of cores in " + node);
}
- coresPerNodeTotal.put(
- node.getName(), attrValues.getNodeMetric(node, NodeMetricImpl.NUM_CORES).get());
+ // Lenient mode: a node with an unknown core count is treated as having 0 cores
+ nodeMap.put(
+ node,
+ new NodeWithCoreCount(
+ node, attrValues.getNodeMetric(node, NodeMetricImpl.NUM_CORES).orElse(0)));
}
- for (PlacementRequest request : requests) {
- int totalReplicasPerShard = 0;
- for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
- totalReplicasPerShard += request.getCountReplicasToCreate(rt);
- }
-
- if (request.getTargetNodes().size() < totalReplicasPerShard) {
- throw new PlacementException("Cluster size too small for number of replicas per shard");
- }
-
- // Get number of cores on each Node
- Map<Integer, Set<Node>> nodesByCores = new TreeMap<>(Comparator.naturalOrder());
-
- Set<Node> nodes = request.getTargetNodes();
-
- // Get the number of cores on each node and sort the nodes by increasing number of cores
- for (Node node : nodes) {
- nodesByCores
- .computeIfAbsent(coresPerNodeTotal.get(node.getName()), k -> new HashSet<>())
- .add(node);
- }
-
- Set<ReplicaPlacement> replicaPlacements =
- new HashSet<>(totalReplicasPerShard * request.getShardNames().size());
-
- // Now place all replicas of all shards on nodes, by placing on nodes with the smallest
- // number of cores and taking into account replicas placed during this computation. Note
- // that for each shard we must place replicas on different nodes, when moving to the next
- // shard we use the nodes sorted by their updated number of cores (due to replica placements
- // for previous shards).
- for (String shardName : request.getShardNames()) {
- // Assign replicas based on the sort order of the nodesByCores tree multimap to put
- // replicas on nodes with fewer cores first. We only need totalReplicasPerShard nodes
- // given that's the number of replicas to place. We assign based on the passed
- // nodeEntriesToAssign list so the right nodes get replicas.
- List<Map.Entry<Integer, Node>> nodeEntriesToAssign =
- nodesByCores.entrySet().stream()
- .flatMap(e -> e.getValue().stream().map(n -> Map.entry(e.getKey(), n)))
- .limit(totalReplicasPerShard)
- .collect(Collectors.toList());
+ return nodeMap;
+ }
+ }
- // Update the number of cores each node will have once the assignments below got
- // executed so the next shard picks the lowest loaded nodes for its replicas.
- for (Map.Entry<Integer, Node> e : nodeEntriesToAssign) {
- int coreCount = e.getKey();
- Node node = e.getValue();
- nodesByCores.getOrDefault(coreCount, new HashSet<>()).remove(node);
- nodesByCores.computeIfAbsent(coreCount + 1, k -> new HashSet<>()).add(node);
- coresPerNodeTotal.put(node.getName(), coreCount + 1);
- }
+ /**
+ * This implementation weights nodes according to how many cores they contain. The weight of a
+ * node is just the count of cores on that node.
+ *
+ * <p>The count starts from the value given at construction and changes by one for each
+ * projected replica addition or removal. Existing replicas registered via initReplica do not
+ * change it, because initReplicaWeights is not overridden here.
+ *
+ * <p>Multiple replicas of the same shard are not permitted to live on the same Node.
+ */
+ private static class NodeWithCoreCount extends OrderedNodePlacementPlugin.WeightedNode {
+ // Current (projected) number of cores on this node
+ private int coreCount;
+
+ public NodeWithCoreCount(Node node, int coreCount) {
+ super(node);
+ this.coreCount = coreCount;
+ }
- for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) {
- placeReplicas(
- request.getCollection(),
- nodeEntriesToAssign,
- placementContext.getPlacementPlanFactory(),
- replicaPlacements,
- shardName,
- request,
- replicaType);
- }
- }
+ // The weight is simply the current core count
+ @Override
+ public int calcWeight() {
+ return coreCount;
+ }
- placementPlans.add(
- placementContext
- .getPlacementPlanFactory()
- .createPlacementPlan(request, replicaPlacements));
- }
- return placementPlans;
+ // Any replica, whatever its collection, shard or type, would add exactly one core
+ @Override
+ public int calcRelevantWeightWithReplica(Replica replica) {
+ return coreCount + 1;
}
- private void placeReplicas(
- SolrCollection solrCollection,
- List<Map.Entry<Integer, Node>> nodeEntriesToAssign,
- PlacementPlanFactory placementPlanFactory,
- Set<ReplicaPlacement> replicaPlacements,
- String shardName,
- PlacementRequest request,
- Replica.ReplicaType replicaType) {
- for (int replica = 0; replica < request.getCountReplicasToCreate(replicaType); replica++) {
- final Map.Entry<Integer, Node> entry = nodeEntriesToAssign.remove(0);
- final Node node = entry.getValue();
+ // Adding a core only changes this node's own weight, so no full re-sort is ever requested
+ @Override
+ public boolean addProjectedReplicaWeights(Replica replica) {
+ coreCount += 1;
+ return false;
+ }
- replicaPlacements.add(
- placementPlanFactory.createReplicaPlacement(
- solrCollection, shardName, node, replicaType));
- }
+ // Undo a projected addition: one core fewer on this node
+ @Override
+ public void removeProjectedReplicaWeights(Replica replica) {
+ coreCount -= 1;
}
}
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java
new file mode 100644
index 00000000000..a9d1f4ea048
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java
@@ -0,0 +1,702 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement.plugins;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntSupplier;
+import java.util.stream.Collectors;
+import org.apache.solr.cluster.Node;
+import org.apache.solr.cluster.Replica;
+import org.apache.solr.cluster.Shard;
+import org.apache.solr.cluster.SolrCollection;
+import org.apache.solr.cluster.placement.BalancePlan;
+import org.apache.solr.cluster.placement.BalanceRequest;
+import org.apache.solr.cluster.placement.DeleteCollectionRequest;
+import org.apache.solr.cluster.placement.DeleteReplicasRequest;
+import org.apache.solr.cluster.placement.DeleteShardsRequest;
+import org.apache.solr.cluster.placement.ModificationRequest;
+import org.apache.solr.cluster.placement.PlacementContext;
+import org.apache.solr.cluster.placement.PlacementException;
+import org.apache.solr.cluster.placement.PlacementModificationException;
+import org.apache.solr.cluster.placement.PlacementPlan;
+import org.apache.solr.cluster.placement.PlacementPlugin;
+import org.apache.solr.cluster.placement.PlacementRequest;
+import org.apache.solr.cluster.placement.ReplicaPlacement;
+import org.apache.solr.common.util.CollectionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class OrderedNodePlacementPlugin implements PlacementPlugin {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ /**
+ * Computes a placement plan for each request. The {@link WeightedNode}s are built once from the
+ * union of all requests' target nodes and reused for every request. As a result, replicas placed
+ * for an earlier request or shard are taken into account by later ones.
+ *
+ * <p>For every shard and replica type, the target nodes that accept a projected replica
+ * ({@link WeightedNode#canAddReplica}) are switched to sort by their weight with that replica
+ * ({@link WeightedNode#sortByRelevantWeightWithReplica}). They are put in a {@link
+ * PriorityQueue}, and replicas go to the nodes that sort first, i.e. the lowest weights. A node
+ * whose weight changed since it was queued is re-queued before being used. If the weight
+ * change of a chosen node requires it, the whole queue is re-sorted.
+ *
+ * <p>The nodes keep the last sort-weight calculator set here after this method returns.
+ *
+ * @throws PlacementException if the weighted nodes cannot be built, or if not all replicas of a
+ *     shard/replica type can be placed on eligible nodes
+ */
+ @Override
+ public List<PlacementPlan> computePlacements(
+ Collection<PlacementRequest> requests, PlacementContext placementContext)
+ throws PlacementException {
+ List<PlacementPlan> placementPlans = new ArrayList<>(requests.size());
+ Set<Node> allNodes = new HashSet<>();
+ Set<SolrCollection> allCollections = new HashSet<>();
+ for (PlacementRequest request : requests) {
+ allNodes.addAll(request.getTargetNodes());
+ allCollections.add(request.getCollection());
+ }
+ Collection<WeightedNode> weightedNodes =
+ getWeightedNodes(placementContext, allNodes, allCollections, true).values();
+ for (PlacementRequest request : requests) {
+ int totalReplicasPerShard = 0;
+ for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
+ totalReplicasPerShard += request.getCountReplicasToCreate(rt);
+ }
+
+ List<WeightedNode> nodesForRequest =
+ weightedNodes.stream()
+ .filter(wn -> request.getTargetNodes().contains(wn.getNode()))
+ .collect(Collectors.toList());
+
+ Set<ReplicaPlacement> replicaPlacements =
+ CollectionUtil.newHashSet(totalReplicasPerShard * request.getShardNames().size());
+
+ SolrCollection solrCollection = request.getCollection();
+ // Place all replicas of all shards, always on the eligible node that sorts first (lowest
+ // weight). This is deterministic, not random.
+ for (String shardName : request.getShardNames()) {
+ for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) {
+ int replicaCount = request.getCountReplicasToCreate(replicaType);
+ if (replicaCount == 0) {
+ continue;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Placing {} replicas for Collection: {}, Shard: {}, ReplicaType: {}",
+ replicaCount,
+ solrCollection.getName(),
+ shardName,
+ replicaType);
+ }
+ // A node-less stand-in replica, used only to check eligibility and compute sort weights
+ Replica pr = createProjectedReplica(solrCollection, shardName, replicaType, null);
+ PriorityQueue<WeightedNode> nodesForReplicaType = new PriorityQueue<>();
+ nodesForRequest.stream()
+ .filter(n -> n.canAddReplica(pr))
+ .forEach(
+ n -> {
+ n.sortByRelevantWeightWithReplica(pr);
+ n.addToSortedCollection(nodesForReplicaType);
+ });
+
+ int replicasPlaced = 0;
+ while (!nodesForReplicaType.isEmpty() && replicasPlaced < replicaCount) {
+ WeightedNode node = nodesForReplicaType.poll();
+ if (!node.canAddReplica(pr)) {
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Node can no longer accept replica, removing from selection list: {}",
+ node.getNode());
+ }
+ continue;
+ }
+ if (node.hasWeightChangedSinceSort()) {
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Node's sort is out-of-date, adding back to selection list: {}",
+ node.getNode());
+ }
+ node.addToSortedCollection(nodesForReplicaType);
+ // The node will be re-sorted,
+ // so go back to the top of the loop to get the new lowest-sorted node
+ continue;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Node chosen to host replica: {}", node.getNode());
+ }
+
+ boolean needsToResortAll =
+ node.addReplica(
+ createProjectedReplica(solrCollection, shardName, replicaType, node.getNode()));
+ replicasPlaced += 1;
+ replicaPlacements.add(
+ placementContext
+ .getPlacementPlanFactory()
+ .createReplicaPlacement(
+ solrCollection, shardName, node.getNode(), replicaType));
+ // Only update the priorityQueue if there are still replicas to be placed
+ if (replicasPlaced < replicaCount) {
+ if (needsToResortAll) {
+ if (log.isDebugEnabled()) {
+ log.debug("Replica addition requires re-sorting of entire selection list");
+ }
+ List<WeightedNode> nodeList = new ArrayList<>(nodesForReplicaType);
+ nodesForReplicaType.clear();
+ nodeList.forEach(n -> n.addToSortedCollection(nodesForReplicaType));
+ }
+ // Add the chosen node back to the list if it can accept another replica of the
+ // shard/replicaType.
+ // The default implementation of "canAddReplica()" returns false for replicas
+ // of shards that the node already contains, so this will usually be false.
+ if (node.canAddReplica(pr)) {
+ nodesForReplicaType.add(node);
+ }
+ }
+ }
+
+ if (replicasPlaced < replicaCount) {
+ throw new PlacementException(
+ String.format(
+ Locale.ROOT,
+ "Not enough eligible nodes to place %d replica(s) of type %s for shard %s of collection %s. Only able to place %d replicas.",
+ replicaCount,
+ replicaType,
+ shardName,
+ solrCollection.getName(),
+ replicasPlaced));
+ }
+ }
+ }
+
+ placementPlans.add(
+ placementContext
+ .getPlacementPlanFactory()
+ .createPlacementPlan(request, replicaPlacements));
+ }
+ return placementPlans;
+ }
+
+ /**
+ * Computes replica movements that even out the weights of the requested nodes.
+ *
+ * <p>The nodes are kept in a {@link TreeSet} using {@link WeightedNode}'s natural ordering.
+ * Each iteration takes the lowest-weighted node. It then examines the highest-weighted nodes
+ * one at a time, as long as they weigh more than the lowest weight plus one. The search stops at
+ * the first replica that can be removed from the heavier node and added to the lighter one
+ * without over-correcting (see the acceptance rule below). A lowest-weighted node that cannot
+ * receive any replica is not re-added, which drops it from further consideration. The loop ends
+ * when fewer than two nodes remain, or when the first and last nodes no longer differ in weight.
+ *
+ * @param balanceRequest the request naming the nodes to balance (may name no nodes)
+ * @param placementContext access to the cluster state and to the balance plan factory
+ * @return a {@link BalancePlan} with every chosen replica movement (possibly none)
+ * @throws PlacementException propagated from {@link #getWeightedNodes}
+ */
+ @Override
+ public BalancePlan computeBalancing(
+ BalanceRequest balanceRequest, PlacementContext placementContext) throws PlacementException {
+ Map<Replica, Node> replicaMovements = new HashMap<>();
+ TreeSet<WeightedNode> orderedNodes = new TreeSet<>();
+ Collection<WeightedNode> weightedNodes =
+ getWeightedNodes(
+ placementContext,
+ balanceRequest.getNodes(),
+ placementContext.getCluster().collections(),
+ true)
+ .values();
+ // This is critical to store the last sort weight for this node
+ weightedNodes.forEach(
+ node -> {
+ node.sortWithoutChanges();
+ node.addToSortedCollection(orderedNodes);
+ });
+
+ // While the node with the lowest weight still has room to take a replica from the node with the
+ // highest weight, loop
+ Map<Replica, Node> newReplicaMovements = new HashMap<>();
+ // Math.max guards against an empty node set: ArrayList rejects a negative initial capacity,
+ // which previously made balancing zero nodes throw IllegalArgumentException.
+ ArrayList<WeightedNode> traversedHighNodes =
+ new ArrayList<>(Math.max(0, orderedNodes.size() - 1));
+ while (orderedNodes.size() > 1
+ && orderedNodes.first().calcWeight() < orderedNodes.last().calcWeight()) {
+ WeightedNode lowestWeight = orderedNodes.pollFirst();
+ if (lowestWeight == null) {
+ break;
+ }
+ if (lowestWeight.hasWeightChangedSinceSort()) {
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Re-sorting lowest weighted node: {}, sorting weight is out-of-date.",
+ lowestWeight.getNode().getName());
+ }
+ // Re-sort this node and go back to find the lowest weight
+ lowestWeight.addToSortedCollection(orderedNodes);
+ continue;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Lowest weighted node: {}, weight: {}",
+ lowestWeight.getNode().getName(),
+ lowestWeight.calcWeight());
+ }
+
+ newReplicaMovements.clear();
+ // If a compatible node was found to move replicas, break and find the lowest weighted node
+ // again
+ while (newReplicaMovements.isEmpty()
+ && !orderedNodes.isEmpty()
+ && orderedNodes.last().calcWeight() > lowestWeight.calcWeight() + 1) {
+ WeightedNode highestWeight = orderedNodes.pollLast();
+ if (highestWeight == null) {
+ break;
+ }
+ if (highestWeight.hasWeightChangedSinceSort()) {
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Re-sorting highest weighted node: {}, sorting weight is out-of-date.",
+ highestWeight.getNode().getName());
+ }
+ // Re-sort this node and go back to find the highest weight
+ highestWeight.addToSortedCollection(orderedNodes);
+ continue;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Highest weighted node: {}, weight: {}",
+ highestWeight.getNode().getName(),
+ highestWeight.calcWeight());
+ }
+
+ traversedHighNodes.add(highestWeight);
+ // Select a replica from the node with the highest weight to move to the node with the
+ // lowest weight (replicas are tried in replica-name order, for determinism)
+ List<Replica> availableReplicasToMove =
+ highestWeight.getAllReplicasOnNode().stream()
+ .sorted(Comparator.comparing(Replica::getReplicaName))
+ .collect(Collectors.toList());
+ int combinedNodeWeights = highestWeight.calcWeight() + lowestWeight.calcWeight();
+ for (Replica r : availableReplicasToMove) {
+ // Only continue if the replica can be removed from the old node and moved to the new node
+ if (!highestWeight.canRemoveReplicas(Set.of(r)).isEmpty()
+ || !lowestWeight.canAddReplica(r)) {
+ continue;
+ }
+ lowestWeight.addReplica(r);
+ highestWeight.removeReplica(r);
+ int lowestWeightWithReplica = lowestWeight.calcWeight();
+ int highestWeightWithoutReplica = highestWeight.calcWeight();
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Replica: {}, toNode weight with replica: {}, fromNode weight without replica: {}",
+ r.getReplicaName(),
+ lowestWeightWithReplica,
+ highestWeightWithoutReplica);
+ }
+
+ // If the combined weight of both nodes is lower after the move, make the move.
+ // Otherwise, make the move if it doesn't cause the weight of the higher node to
+ // go below the weight of the lower node, because that is over-correction.
+ if (highestWeightWithoutReplica + lowestWeightWithReplica >= combinedNodeWeights
+ && highestWeightWithoutReplica < lowestWeightWithReplica) {
+ // Undo the move
+ lowestWeight.removeReplica(r);
+ highestWeight.addReplica(r);
+ continue;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Replica Movement chosen. From: {}, To: {}, Replica: {}",
+ highestWeight.getNode().getName(),
+ lowestWeight.getNode().getName(),
+ r);
+ }
+ newReplicaMovements.put(r, lowestWeight.getNode());
+
+ // Do not go beyond here, do another loop and see if other nodes can move replicas.
+ // It might end up being the same nodes in the next loop that end up moving another
+ // replica, but that's ok.
+ break;
+ }
+ }
+ // For now we do not have any way to see if there are out-of-date nodes in the middle of the
+ // TreeSet. Therefore, we need to re-sort this list after every selection. In the future, we
+ // should find a way to re-sort the out-of-date nodes without having to sort all nodes.
+ traversedHighNodes.addAll(orderedNodes);
+ orderedNodes.clear();
+
+ // Re-sort and add back every remaining node, including the traversed high nodes:
+ // they might have replicas to move to the next lowest-weighted node
+ traversedHighNodes.forEach(n -> n.addToSortedCollection(orderedNodes));
+ traversedHighNodes.clear();
+ if (!newReplicaMovements.isEmpty()) {
+ replicaMovements.putAll(newReplicaMovements);
+ // A replica was moved to lowestWeight, so add it back, since it may be able to take more.
+ // When no replica could be moved to it, it is deliberately not re-added, which removes it
+ // from the loop.
+ lowestWeight.addToSortedCollection(orderedNodes);
+ }
+ }
+
+ return placementContext
+ .getBalancePlanFactory()
+ .createBalancePlan(balanceRequest, replicaMovements);
+ }
+
+ /**
+ * Builds the weighted nodes for {@code nodes} with {@link #getBaseWeightedNodes}. It then
+ * registers, via {@link WeightedNode#initReplica}, every replica in the cluster that lives on one
+ * of them.
+ *
+ * <p>Replicas are registered from all collections of the cluster, not only from {@code
+ * relevantCollections}. That argument is only forwarded to {@link #getBaseWeightedNodes}.
+ */
+ protected Map<Node, WeightedNode> getWeightedNodes(
+ PlacementContext placementContext,
+ Set<Node> nodes,
+ Iterable<SolrCollection> relevantCollections,
+ boolean skipNodesWithErrors)
+ throws PlacementException {
+ final Map<Node, WeightedNode> weightedByNode =
+ getBaseWeightedNodes(placementContext, nodes, relevantCollections, skipNodesWithErrors);
+
+ for (SolrCollection coll : placementContext.getCluster().collections()) {
+ for (Shard shard : coll.shards()) {
+ for (Replica r : shard.replicas()) {
+ WeightedNode hostingNode = weightedByNode.get(r.getNode());
+ if (hostingNode == null) {
+ // The replica lives on a node that is not being weighted
+ continue;
+ }
+ hostingNode.initReplica(r);
+ }
+ }
+ }
+ return weightedByNode;
+ }
+
+ /**
+ * Creates the {@link WeightedNode} for each of the given nodes. No existing replica is
+ * registered on them yet; {@link #getWeightedNodes} registers the cluster's replicas on the
+ * returned nodes afterwards.
+ *
+ * @param placementContext access to the cluster and to the attribute fetcher
+ * @param nodes the nodes to build weighted nodes for
+ * @param relevantCollections the collections relevant to the current operation (the requested
+ *     collections when placing, all cluster collections when balancing or verifying deletions)
+ * @param skipNodesWithErrors NOTE(review): the exact meaning is left to implementations; it is
+ *     true when placing/balancing and false when verifying replica deletions. Confirm the
+ *     intended contract.
+ * @return a map from node to weighted node; nodes missing from the map are ignored when
+ *     registering existing replicas
+ * @throws PlacementException if the weighted nodes cannot be built
+ */
+ protected abstract Map<Node, WeightedNode> getBaseWeightedNodes(
+ PlacementContext placementContext,
+ Set<Node> nodes,
+ Iterable<SolrCollection> relevantCollections,
+ boolean skipNodesWithErrors)
+ throws PlacementException;
+
+ /**
+ * Dispatches the modification request to the matching verification method. Shard deletions and
+ * unsupported request types are not verified: a warning is logged and no exception is thrown.
+ */
+ @Override
+ public void verifyAllowedModification(
+ ModificationRequest modificationRequest, PlacementContext placementContext)
+ throws PlacementException {
+ if (modificationRequest instanceof DeleteShardsRequest) {
+ log.warn("DeleteShardsRequest not implemented yet, skipping: {}", modificationRequest);
+ return;
+ }
+ if (modificationRequest instanceof DeleteCollectionRequest) {
+ verifyDeleteCollection((DeleteCollectionRequest) modificationRequest, placementContext);
+ return;
+ }
+ if (modificationRequest instanceof DeleteReplicasRequest) {
+ verifyDeleteReplicas((DeleteReplicasRequest) modificationRequest, placementContext);
+ return;
+ }
+ log.warn("unsupported request type, skipping: {}", modificationRequest);
+ }
+
+ /**
+ * Verifies that the given collection may be deleted. This default implementation does nothing,
+ * so every collection deletion is allowed. Subclasses may override it to reject some.
+ *
+ * @throws PlacementException if an overriding implementation rejects the deletion
+ */
+ protected void verifyDeleteCollection(
+ DeleteCollectionRequest deleteCollectionRequest, PlacementContext placementContext)
+ throws PlacementException {
+ // NO-OP
+ }
+
+ /**
+ * Rejects the deletion of every replica that its node refuses to give up, as reported by
+ * {@link WeightedNode#canRemoveReplicas}. Replicas on nodes for which no weighted node could be
+ * built are rejected as well.
+ *
+ * @throws PlacementException a {@code PlacementModificationException} listing every rejected
+ *     replica with its reason, if at least one replica is rejected
+ */
+ protected void verifyDeleteReplicas(
+ DeleteReplicasRequest deleteReplicasRequest, PlacementContext placementContext)
+ throws PlacementException {
+ Map<Node, List<Replica>> replicasByNode =
+ deleteReplicasRequest.getReplicas().stream()
+ .collect(Collectors.groupingBy(Replica::getNode));
+
+ Map<Node, WeightedNode> weightedNodes =
+ getWeightedNodes(
+ placementContext,
+ replicasByNode.keySet(),
+ placementContext.getCluster().collections(),
+ false);
+
+ PlacementModificationException rejection =
+ new PlacementModificationException("delete replica(s) rejected");
+ for (Map.Entry<Node, List<Replica>> nodeAndReplicas : replicasByNode.entrySet()) {
+ Node node = nodeAndReplicas.getKey();
+ List<Replica> replicasToDelete = nodeAndReplicas.getValue();
+ WeightedNode weightedNode = weightedNodes.get(node);
+ if (weightedNode != null) {
+ weightedNode
+ .canRemoveReplicas(replicasToDelete)
+ .forEach(
+ (replica, reason) ->
+ rejection.addRejectedModification(replica.toString(), reason));
+ continue;
+ }
+ // Without a weighted node there is nothing to check against, so reject every replica
+ String reason = "could not load information for node: " + node.getName();
+ for (Replica replica : replicasToDelete) {
+ rejection.addRejectedModification(replica.toString(), reason);
+ }
+ }
+ if (!rejection.getRejectedModifications().isEmpty()) {
+ throw rejection;
+ }
+ }
+
+ /**
+ * A class that determines the weight of a given node and the replicas that reside on it.
+ *
+ * <p>The {@link OrderedNodePlacementPlugin} uses the weights determined here to place and balance
+ * replicas across the cluster. Replicas will be placed onto WeightedNodes with lower weights, and
+ * be taken off of WeightedNodes with higher weights.
+ *
+ * @lucene.experimental
+ */
+ public abstract static class WeightedNode implements Comparable<WeightedNode> {
+ private final Node node;
+ private final Map<String, Map<String, Set<Replica>>> replicas;
+ private IntSupplier sortWeightCalculator;
+ private int lastSortedWeight;
+
+ /**
+ * Creates a weighted node for {@code node} with no registered replicas. Its sort-weight
+ * calculator starts out as {@link #calcWeight()}.
+ */
+ public WeightedNode(Node node) {
+ this.node = node;
+ this.sortWeightCalculator = this::calcWeight;
+ this.lastSortedWeight = 0;
+ this.replicas = new HashMap<>();
+ }
+
+ /**
+ * Selects {@link #calcRelevantWeightWithReplica(Replica)} for {@code replica} as this node's
+ * sort-weight calculator.
+ *
+ * @param replica the replica whose prospective placement drives the sort weight
+ */
+ public void sortByRelevantWeightWithReplica(Replica replica) {
+ IntSupplier projectedWeight = () -> calcRelevantWeightWithReplica(replica);
+ sortWeightCalculator = projectedWeight;
+ }
+
+ /** Selects {@link #calcWeight()} as this node's sort-weight calculator. */
+ public void sortWithoutChanges() {
+ IntSupplier currentWeight = this::calcWeight;
+ sortWeightCalculator = currentWeight;
+ }
+
+ /** Returns the cluster node this weighted node stands for. */
+ public Node getNode() {
+ return this.node;
+ }
+
+ /**
+ * Returns a new set with every replica registered on this node, across all collections and
+ * shards.
+ */
+ public Set<Replica> getAllReplicasOnNode() {
+ Set<Replica> allReplicas = new HashSet<>();
+ for (Map<String, Set<Replica>> shardsOfCollection : replicas.values()) {
+ for (Set<Replica> shardReplicas : shardsOfCollection.values()) {
+ allReplicas.addAll(shardReplicas);
+ }
+ }
+ return allReplicas;
+ }
+
+ /**
+ * Returns the names of the collections registered on this node, as a read-only view. Callers
+ * must not be able to edit this node's replica bookkeeping directly; changes go through
+ * addReplica/initReplica.
+ */
+ public Set<String> getCollectionsOnNode() {
+ return Collections.unmodifiableSet(replicas.keySet());
+ }
+
+ /** Returns whether the given collection is registered on this node. */
+ public boolean hasCollectionOnNode(String collection) {
+ return replicas.containsKey(collection);
+ }
+
+ /**
+ * Returns the names of the shards of {@code collection} registered on this node, as a read-only
+ * view (empty if the collection is not registered here).
+ */
+ public Set<String> getShardsOnNode(String collection) {
+ return Collections.unmodifiableSet(
+ replicas.getOrDefault(collection, Collections.emptyMap()).keySet());
+ }
+
+ /** Returns whether the given shard is registered on this node. */
+ public boolean hasShardOnNode(Shard shard) {
+ return replicas
+ .getOrDefault(shard.getCollection().getName(), Collections.emptyMap())
+ .containsKey(shard.getShardName());
+ }
+
+ /**
+ * Returns the replicas of the given shard registered on this node, as a read-only view (empty
+ * if there are none).
+ */
+ public Set<Replica> getReplicasForShardOnNode(Shard shard) {
+ return Optional.ofNullable(replicas.get(shard.getCollection().getName()))
+ .map(m -> m.get(shard.getShardName()))
+ .map(Collections::unmodifiableSet)
+ .orElseGet(Collections::emptySet);
+ }
+
+ public void addToSortedCollection(Collection<WeightedNode> collection) {
+ stashSortedWeight();
+ collection.add(this);
+ }
+
+ public abstract int calcWeight();
+
+ public abstract int calcRelevantWeightWithReplica(Replica replica);
+
+ public boolean canAddReplica(Replica replica) {
+ // By default, do not allow two replicas of the same shard on a node
+ return getReplicasForShardOnNode(replica.getShard()).isEmpty();
+ }
+
+ private boolean addReplicaToInternalState(Replica replica) {
+ return replicas
+ .computeIfAbsent(replica.getShard().getCollection().getName(), k -> new HashMap<>())
+ .computeIfAbsent(replica.getShard().getShardName(), k -> CollectionUtil.newHashSet(1))
+ .add(replica);
+ }
+
+ public final void initReplica(Replica replica) {
+ if (addReplicaToInternalState(replica)) {
+ initReplicaWeights(replica);
+ }
+ }
+
+ protected void initReplicaWeights(Replica replica) {
+ // Defaults to a NO-OP
+ }
+
+ public final boolean addReplica(Replica replica) {
+ if (addReplicaToInternalState(replica)) {
+ return addProjectedReplicaWeights(replica);
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Add the weights for the given replica to this node
+ *
+ * @param replica the replica to add weights for
+ * @return a whether the ordered list of nodes needs a resort afterwords.
+ */
+ protected abstract boolean addProjectedReplicaWeights(Replica replica);
+
+ /**
+ * Determine if the given replicas can be removed from the node.
+ *
+ * @param replicas the replicas to remove
+ * @return a mapping from replicas that cannot be removed to the reason why they can't be
+ * removed.
+ */
+ public Map<Replica, String> canRemoveReplicas(Collection<Replica> replicas) {
+ return Collections.emptyMap();
+ }
+
+ public final void removeReplica(Replica replica) {
+ // Only remove the projected replicaWeight if the node has this replica
+ AtomicBoolean hasReplica = new AtomicBoolean(false);
+ replicas.computeIfPresent(
+ replica.getShard().getCollection().getName(),
+ (col, shardReps) -> {
+ shardReps.computeIfPresent(
+ replica.getShard().getShardName(),
+ (shard, reps) -> {
+ if (reps.remove(replica)) {
+ hasReplica.set(true);
+ }
+ return reps.isEmpty() ? null : reps;
+ });
+ return shardReps.isEmpty() ? null : shardReps;
+ });
+ if (hasReplica.get()) {
+ removeProjectedReplicaWeights(replica);
+ }
+ }
+
+ protected abstract void removeProjectedReplicaWeights(Replica replica);
+
+ private void stashSortedWeight() {
+ lastSortedWeight = sortWeightCalculator.getAsInt();
+ }
+
+ protected boolean hasWeightChangedSinceSort() {
+ return lastSortedWeight != sortWeightCalculator.getAsInt();
+ }
+
+ @SuppressWarnings({"rawtypes"})
+ protected Comparable getTiebreaker() {
+ return node.getName();
+ }
+
+ @Override
+ @SuppressWarnings({"unchecked"})
+ public int compareTo(WeightedNode o) {
+ int comp = Integer.compare(this.lastSortedWeight, o.lastSortedWeight);
+ if (comp == 0 && !equals(o)) {
+ // TreeSets do not like a 0 comp for non-equal members.
+ comp = getTiebreaker().compareTo(o.getTiebreaker());
+ }
+ return comp;
+ }
+
+ @Override
+ public int hashCode() {
+ return node.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof WeightedNode)) {
+ return false;
+ } else {
+ WeightedNode on = (WeightedNode) o;
+ if (this.node == null) {
+ return on.node == null;
+ } else {
+ return this.node.equals(on.node);
+ }
+ }
+ }
+ }
+
+ /**
+ * Create a fake replica to be used when computing placements for new Replicas. The new replicas
+ * need to be added to the projected state, even though they don't exist.
+ *
+ * <p>The fake replica's shard only knows its name and collection: it exposes no replicas (empty
+ * iteration rather than null), no leader and no state. The replica itself reports state {@code
+ * DOWN} and empty replica/core names.
+ *
+ * @param collection the existing collection that the replica will be created for
+ * @param shardName the name of the new replica's shard
+ * @param type the ReplicaType for the new replica
+ * @param node the Solr node on which the replica will be placed
+ * @return a fake replica to use until the new replica is created
+ */
+ static Replica createProjectedReplica(
+ final SolrCollection collection,
+ final String shardName,
+ final Replica.ReplicaType type,
+ final Node node) {
+ final Shard shard =
+ new Shard() {
+ @Override
+ public String getShardName() {
+ return shardName;
+ }
+
+ @Override
+ public SolrCollection getCollection() {
+ return collection;
+ }
+
+ @Override
+ public Replica getReplica(String name) {
+ return null;
+ }
+
+ @Override
+ public Iterator<Replica> iterator() {
+ // Empty instead of null so that callers iterating the fake shard do not hit an NPE
+ return Collections.emptyIterator();
+ }
+
+ @Override
+ public Iterable<Replica> replicas() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public Replica getLeader() {
+ return null;
+ }
+
+ @Override
+ public ShardState getState() {
+ return null;
+ }
+ };
+ return new Replica() {
+ @Override
+ public Shard getShard() {
+ return shard;
+ }
+
+ @Override
+ public ReplicaType getType() {
+ return type;
+ }
+
+ @Override
+ public ReplicaState getState() {
+ return ReplicaState.DOWN;
+ }
+
+ @Override
+ public String getReplicaName() {
+ return "";
+ }
+
+ @Override
+ public String getCoreName() {
+ return "";
+ }
+
+ @Override
+ public Node getNode() {
+ return node;
+ }
+ };
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/RandomPlacementFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/RandomPlacementFactory.java
index 47f76a03a65..1e0f6a2f5ba 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/RandomPlacementFactory.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/RandomPlacementFactory.java
@@ -17,30 +17,25 @@
package org.apache.solr.cluster.placement.plugins;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.solr.cluster.Node;
import org.apache.solr.cluster.Replica;
import org.apache.solr.cluster.SolrCollection;
import org.apache.solr.cluster.placement.PlacementContext;
-import org.apache.solr.cluster.placement.PlacementException;
-import org.apache.solr.cluster.placement.PlacementPlan;
-import org.apache.solr.cluster.placement.PlacementPlanFactory;
import org.apache.solr.cluster.placement.PlacementPlugin;
import org.apache.solr.cluster.placement.PlacementPluginFactory;
-import org.apache.solr.cluster.placement.PlacementRequest;
-import org.apache.solr.cluster.placement.ReplicaPlacement;
-import org.apache.solr.common.util.CollectionUtil;
/**
* Factory for creating {@link RandomPlacementPlugin}, a placement plugin implementing random
* placement for new collection creation while preventing two replicas of same shard from being
* placed on the same node.
*
+ * <p>See {@link RandomNode} for information on how this PlacementFactory weights nodes.
+ *
* <p>See {@link AffinityPlacementFactory} for a more realistic example and documentation.
*/
public class RandomPlacementFactory
@@ -51,7 +46,7 @@ public class RandomPlacementFactory
return new RandomPlacementPlugin();
}
- public static class RandomPlacementPlugin implements PlacementPlugin {
+ public static class RandomPlacementPlugin extends OrderedNodePlacementPlugin {
private final Random replicaPlacementRandom =
new Random(); // ok even if random sequence is predictable.
@@ -64,65 +59,69 @@ public class RandomPlacementFactory
}
@Override
- public List<PlacementPlan> computePlacements(
- Collection<PlacementRequest> requests, PlacementContext placementContext)
- throws PlacementException {
- List<PlacementPlan> placementPlans = new ArrayList<>(requests.size());
- for (PlacementRequest request : requests) {
- int totalReplicasPerShard = 0;
- for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
- totalReplicasPerShard += request.getCountReplicasToCreate(rt);
- }
-
- if (request.getTargetNodes().size() < totalReplicasPerShard) {
- throw new PlacementException("Cluster size too small for number of replicas per shard");
- }
-
- Set<ReplicaPlacement> replicaPlacements =
- CollectionUtil.newHashSet(totalReplicasPerShard * request.getShardNames().size());
-
- // Now place randomly all replicas of all shards on available nodes
- for (String shardName : request.getShardNames()) {
- // Shuffle the nodes for each shard so that replicas for a shard are placed on distinct
- // yet random nodes
- ArrayList<Node> nodesToAssign = new ArrayList<>(request.getTargetNodes());
- Collections.shuffle(nodesToAssign, replicaPlacementRandom);
-
- for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
- placeForReplicaType(
- request.getCollection(),
- nodesToAssign,
- placementContext.getPlacementPlanFactory(),
- replicaPlacements,
- shardName,
- request,
- rt);
- }
- }
-
- placementPlans.add(
- placementContext
- .getPlacementPlanFactory()
- .createPlacementPlan(request, replicaPlacements));
+ protected Map<Node, WeightedNode> getBaseWeightedNodes(
+ PlacementContext placementContext,
+ Set<Node> nodes,
+ Iterable<SolrCollection> relevantCollections,
+ boolean skipNodesWithErrors) {
+ // One RandomNode per requested node, all sharing this plugin's Random for their tiebreakers.
+ // No cluster state is read here, so placementContext, relevantCollections and
+ // skipNodesWithErrors are unused.
+ HashMap<Node, WeightedNode> nodeMap = new HashMap<>();
+
+ for (Node node : nodes) {
+ nodeMap.put(node, new RandomNode(node, replicaPlacementRandom));
}
- return placementPlans;
+
+ return nodeMap;
}
+ }
- private void placeForReplicaType(
- SolrCollection solrCollection,
- ArrayList<Node> nodesToAssign,
- PlacementPlanFactory placementPlanFactory,
- Set<ReplicaPlacement> replicaPlacements,
- String shardName,
- PlacementRequest request,
- Replica.ReplicaType replicaType) {
- for (int replica = 0; replica < request.getCountReplicasToCreate(replicaType); replica++) {
- Node node = nodesToAssign.remove(0);
-
- replicaPlacements.add(
- placementPlanFactory.createReplicaPlacement(
- solrCollection, shardName, node, replicaType));
- }
+ /**
+ * A node whose weight is always zero, so every node is considered equally good. The order among
+ * nodes is decided by a random tiebreaker that is re-drawn each time the node is added to a
+ * sorted collection.
+ *
+ * <p>Multiple replicas of the same shard are not permitted to live on the same Node (the default
+ * {@code canAddReplica} behavior is kept).
+ */
+ private static class RandomNode extends OrderedNodePlacementPlugin.WeightedNode {
+ private final Random rng;
+ private int tiebreak;
+
+ public RandomNode(Node node, Random random) {
+ super(node);
+ rng = random;
+ tiebreak = rng.nextInt();
+ }
+
+ @Override
+ public int calcWeight() {
+ return 0;
+ }
+
+ @Override
+ public int calcRelevantWeightWithReplica(Replica replica) {
+ // The weight does not depend on the replica being placed
+ return calcWeight();
+ }
+
+ // Replicas carry no weight for this node type, so there is nothing to add or remove.
+ @Override
+ protected boolean addProjectedReplicaWeights(Replica replica) {
+ return false;
+ }
+
+ @Override
+ protected void removeProjectedReplicaWeights(Replica replica) {}
+
+ @Override
+ @SuppressWarnings({"rawtypes"})
+ public Comparable getTiebreaker() {
+ return tiebreak;
+ }
+
+ @Override
+ public void addToSortedCollection(
+ Collection<OrderedNodePlacementPlugin.WeightedNode> collection) {
+ // Draw a fresh tiebreaker before the base class stashes the sort weight and inserts this node
+ tiebreak = rng.nextInt();
+ super.addToSortedCollection(collection);
}
}
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SimplePlacementFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SimplePlacementFactory.java
index 3242d25aa6c..52f30c367b0 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SimplePlacementFactory.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SimplePlacementFactory.java
@@ -17,32 +17,24 @@
package org.apache.solr.cluster.placement.plugins;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.solr.cluster.Node;
import org.apache.solr.cluster.Replica;
-import org.apache.solr.cluster.Shard;
import org.apache.solr.cluster.SolrCollection;
import org.apache.solr.cluster.placement.PlacementContext;
-import org.apache.solr.cluster.placement.PlacementException;
-import org.apache.solr.cluster.placement.PlacementPlan;
import org.apache.solr.cluster.placement.PlacementPlugin;
import org.apache.solr.cluster.placement.PlacementPluginFactory;
-import org.apache.solr.cluster.placement.PlacementRequest;
-import org.apache.solr.cluster.placement.ReplicaPlacement;
-import org.apache.solr.common.util.CollectionUtil;
/**
* Factory for creating {@link SimplePlacementPlugin}, a placement plugin implementing the logic
* from the old <code>LegacyAssignStrategy</code>. This chooses nodes with the fewest cores
* (especially cores of the same collection).
*
+ * <p>See {@link SameCollWeightedNode} for information on how this PlacementFactory weights nodes.
+ *
* <p>See {@link AffinityPlacementFactory} for a more realistic example and documentation.
*/
public class SimplePlacementFactory
@@ -53,118 +45,140 @@ public class SimplePlacementFactory
return new SimplePlacementPlugin();
}
- public static class SimplePlacementPlugin implements PlacementPlugin {
- @Override
- public List<PlacementPlan> computePlacements(
- Collection<PlacementRequest> requests, PlacementContext placementContext)
- throws PlacementException {
- List<PlacementPlan> placementPlans = new ArrayList<>(requests.size());
- Map<Node, ReplicaCount> nodeVsShardCount = getNodeVsShardCount(placementContext);
- for (PlacementRequest request : requests) {
- int totalReplicasPerShard = 0;
- for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
- totalReplicasPerShard += request.getCountReplicasToCreate(rt);
- }
-
- Set<ReplicaPlacement> replicaPlacements =
- CollectionUtil.newHashSet(totalReplicasPerShard * request.getShardNames().size());
-
- Collection<ReplicaCount> replicaCounts = nodeVsShardCount.values();
-
- if (request.getTargetNodes().size() < replicaCounts.size()) {
- replicaCounts =
- replicaCounts.stream()
- .filter(rc -> request.getTargetNodes().contains(rc.node()))
- .collect(Collectors.toList());
- }
-
- for (String shard : request.getShardNames()) {
- // Reset the ordering of the nodes for each shard, using the replicas added in the
- // previous shards and assign requests
- List<Node> nodeList =
- replicaCounts.stream()
- .sorted(
- Comparator.<ReplicaCount>comparingInt(
- rc -> rc.weight(request.getCollection().getName()))
- .thenComparing(ReplicaCount::nodeName))
- .map(ReplicaCount::node)
- .collect(Collectors.toList());
- int replicaNumOfShard = 0;
- for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) {
- for (int i = 0; i < request.getCountReplicasToCreate(replicaType); i++) {
- Node assignedNode = nodeList.get(replicaNumOfShard++ % nodeList.size());
-
- replicaPlacements.add(
- placementContext
- .getPlacementPlanFactory()
- .createReplicaPlacement(
- request.getCollection(), shard, assignedNode, replicaType));
-
- ReplicaCount replicaCount =
- nodeVsShardCount.computeIfAbsent(assignedNode, ReplicaCount::new);
- replicaCount.totalReplicas++;
- replicaCount.collectionReplicas.merge(
- request.getCollection().getName(), 1, Integer::sum);
- }
- }
- }
-
- placementPlans.add(
- placementContext
- .getPlacementPlanFactory()
- .createPlacementPlan(request, replicaPlacements));
- }
- return placementPlans;
- }
-
- private Map<Node, ReplicaCount> getNodeVsShardCount(PlacementContext placementContext) {
- HashMap<Node, ReplicaCount> nodeVsShardCount = new HashMap<>();
+ /**
+ * Placement plugin that orders nodes using {@link SameCollWeightedNode} weights, which penalize
+ * nodes already hosting replicas of the same collection, and much more heavily those hosting
+ * replicas of the same shard.
+ */
+ public static class SimplePlacementPlugin extends OrderedNodePlacementPlugin {
- for (Node s : placementContext.getCluster().getLiveDataNodes()) {
- nodeVsShardCount.computeIfAbsent(s, ReplicaCount::new);
+ @Override
+ protected Map<Node, WeightedNode> getBaseWeightedNodes(
+ PlacementContext placementContext,
+ Set<Node> nodes,
+ Iterable<SolrCollection> relevantCollections,
+ boolean skipNodesWithErrors) {
+ // One SameCollWeightedNode per requested node. placementContext, relevantCollections and
+ // skipNodesWithErrors are not used here.
+ HashMap<Node, WeightedNode> nodeVsShardCount = new HashMap<>();
+
+ for (Node n : nodes) {
+ nodeVsShardCount.computeIfAbsent(n, SameCollWeightedNode::new);
}
- // if we get here we were not given a createNodeList, build a map with real counts.
- for (SolrCollection collection : placementContext.getCluster().collections()) {
- // identify suitable nodes by checking the no:of cores in each of them
- for (Shard shard : collection.shards()) {
- for (Replica replica : shard.replicas()) {
- ReplicaCount count = nodeVsShardCount.get(replica.getNode());
- if (count != null) {
- count.addReplica(collection.getName(), shard.getShardName());
- }
- }
- }
- }
return nodeVsShardCount;
}
}
- static class ReplicaCount {
- public final Node node;
+ /**
+ * This implementation weights nodes according to how many replicas of the same collection and
+ * shard reside on the node. The implementation tries to spread replicas of the same
+ * collection/shard across different nodes, so nodes that contain more of the same
+ * collection/shard will be weighted higher than nodes that only contain replicas for unique
+ * collections/shards.
+ *
+ * <p>The total weight of the SameCollWeightedNode is the sum of:
+ *
+ * <ul>
+ * <li>The number of replicas on the node
+ * <li>5 * for each collection, the sum of:
+ * <ul>
+ * <li>(the number of replicas for that collection - 1)^2
+ * </ul>
+ * <li>1000 * for each shard, the sum of:
+ * <ul>
+ * <li>(the number of replicas for that shard - 1)^2
+ * </ul>
+ * </ul>
+ *
+ * <p>The count of overlapping replicas for collections/shards must be squared, since we want higher
+ * values to be penalized more than lower values. If a node has 2 collections with 3 replicas
+ * each, it should be weighted less than a node with 1 collection that has 5 replicas placed
+ * there. Without squaring, the weight for the first node would be 26, and the weight of the
+ * second node would be 25. So node #2 would be weighted lower even though it is considered to be
+ * violating the constraints more. When we square the overlapping replica counts, the weight of
+ * the first node would be 46 and the weight of the second node would be 85. This is the preferred
+ * order.
+ *
+ * <p>The "relevant" weight with a replica is the sum of:
+ *
+ * <ul>
+ * <li>The number of replicas on the node, plus 1 for the new replica
+ * <li>5 * the number of replicas already on the node for that replica's collection
+ * <li>1000 * the number of replicas already on the node for that replica's shard
+ * </ul>
+ *
+ * <p>Multiple replicas of the same shard are permitted to live on the same Node, but as shown
+ * above, the weight penalty for such is very high.
+ */
+ private static class SameCollWeightedNode extends OrderedNodePlacementPlugin.WeightedNode {
+ private static final int SAME_COL_MULT = 5;
+ private static final int SAME_SHARD_MULT = 1000;
public Map<String, Integer> collectionReplicas;
- public int totalReplicas = 0;
+ public int totalWeight = 0;
- ReplicaCount(Node node) {
- this.node = node;
+ SameCollWeightedNode(Node node) {
+ super(node);
this.collectionReplicas = new HashMap<>();
}
- public int weight(String collection) {
- return (collectionReplicas.getOrDefault(collection, 0) * 5) + totalReplicas;
+ @Override
+ public int calcWeight() {
+ return totalWeight;
}
- public void addReplica(String collection, String shard) {
- // Used to "weigh" whether this node should be used later.
- collectionReplicas.merge(collection, 1, Integer::sum);
+ @Override
+ public int calcRelevantWeightWithReplica(Replica replica) {
+ // Don't add 1 to the individual replica Counts, because 1 is subtracted from each when
+ // calculating weights.
+ // So since 1 would be added to each for the new replica, we can just use the original number
+ // to calculate the weights.
+ int colReplicaCount =
+ collectionReplicas.getOrDefault(replica.getShard().getCollection().getName(), 0);
+ int shardReplicaCount = getReplicasForShardOnNode(replica.getShard()).size();
+ return getAllReplicasOnNode().size()
+ + 1
+ + colReplicaCount * SAME_COL_MULT
+ + shardReplicaCount * SAME_SHARD_MULT;
}
- public Node node() {
- return node;
+ @Override
+ public boolean canAddReplica(Replica replica) {
+ return true;
}
- public String nodeName() {
- return node.getName();
+ @Override
+ protected boolean addProjectedReplicaWeights(Replica replica) {
+ // Called after the replica was added to the internal state, so these counts include it
+ int colReplicaCountWith =
+ collectionReplicas.merge(replica.getShard().getCollection().getName(), 1, Integer::sum);
+ int shardReplicaCountWith = getReplicasForShardOnNode(replica.getShard()).size();
+ totalWeight +=
+ addedWeightOfAdditionalReplica(colReplicaCountWith - 1, shardReplicaCountWith - 1);
+ return false;
+ }
+
+ @Override
+ protected void initReplicaWeights(Replica replica) {
+ addProjectedReplicaWeights(replica);
+ }
+
+ @Override
+ protected void removeProjectedReplicaWeights(Replica replica) {
+ // Called after the replica was removed from the internal state, so these counts exclude it
+ int colReplicaCountWithout =
+ Optional.ofNullable(
+ collectionReplicas.computeIfPresent(
+ replica.getShard().getCollection().getName(), (k, v) -> v - 1))
+ .orElse(0);
+ int shardReplicaCountWithout = getReplicasForShardOnNode(replica.getShard()).size();
+ totalWeight -=
+ addedWeightOfAdditionalReplica(colReplicaCountWithout, shardReplicaCountWithout);
+ }
+
+ /**
+ * The weight added to this node by one more replica of a collection/shard.
+ *
+ * @param colReplicaCountWithout replicas of the collection on this node, excluding the new one
+ * @param shardReplicaCountWithout replicas of the shard on this node, excluding the new one
+ * @return 1 for the replica itself plus the increase of the squared collection/shard penalties
+ */
+ private int addedWeightOfAdditionalReplica(
+ int colReplicaCountWithout, int shardReplicaCountWithout) {
+ int additionalWeight = 1;
+ if (colReplicaCountWithout > 0) {
+ // x * 2 - 1 === x^2 - (x - 1)^2
+ additionalWeight += SAME_COL_MULT * (colReplicaCountWithout * 2 - 1);
+ }
+ if (shardReplicaCountWithout > 0) {
+ // x * 2 - 1 === x^2 - (x - 1)^2
+ // The shard penalty must be based on the shard count, not the collection count
+ additionalWeight += SAME_SHARD_MULT * (shardReplicaCountWithout * 2 - 1);
+ }
+ return additionalWeight;
}
}
}
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 6659b65e3aa..9413389ca92 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -163,6 +163,7 @@ import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.handler.admin.api.AddReplicaPropertyAPI;
import org.apache.solr.handler.admin.api.AdminAPIBase;
import org.apache.solr.handler.admin.api.AliasPropertyAPI;
+import org.apache.solr.handler.admin.api.BalanceReplicasAPI;
import org.apache.solr.handler.admin.api.BalanceShardUniqueAPI;
import org.apache.solr.handler.admin.api.CollectionPropertyAPI;
import org.apache.solr.handler.admin.api.CollectionStatusAPI;
@@ -1381,6 +1382,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
ReloadCollectionAPI.class,
RenameCollectionAPI.class,
ReplaceNodeAPI.class,
+ BalanceReplicasAPI.class,
RestoreCollectionAPI.class,
SyncShardAPI.class,
CollectionPropertyAPI.class,
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/BalanceReplicasAPI.java b/solr/core/src/java/org/apache/solr/handler/admin/api/BalanceReplicasAPI.java
new file mode 100644
index 00000000000..0a92fcea962
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/admin/api/BalanceReplicasAPI.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.admin.api;
+
+import static org.apache.solr.client.solrj.impl.BinaryResponseParser.BINARY_CONTENT_TYPE_V2;
+import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
+import static org.apache.solr.common.params.CollectionParams.NODES;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
+import static org.apache.solr.handler.admin.CollectionsHandler.DEFAULT_COLLECTION_OP_TIMEOUT;
+import static org.apache.solr.security.PermissionNameProvider.Name.COLL_EDIT_PERM;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.media.Schema;
+import io.swagger.v3.oas.annotations.parameters.RequestBody;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import javax.inject.Inject;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.params.CollectionParams.CollectionAction;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.handler.admin.CollectionsHandler;
+import org.apache.solr.jersey.JacksonReflectMapWriter;
+import org.apache.solr.jersey.PermissionName;
+import org.apache.solr.jersey.SolrJerseyResponse;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+
+/**
+ * V2 API that asks the Collection API to balance the replicas that already exist across a set of
+ * nodes.
+ *
+ * <p>Exposed as {@code POST cluster/replicas/balance}; the request is submitted as a {@link
+ * CollectionAction#BALANCE_REPLICAS} command.
+ */
+@Path("cluster/replicas/balance")
+public class BalanceReplicasAPI extends AdminAPIBase {
+
+ @Inject
+ public BalanceReplicasAPI(
+ CoreContainer coreContainer,
+ SolrQueryRequest solrQueryRequest,
+ SolrQueryResponse solrQueryResponse) {
+ super(coreContainer, solrQueryRequest, solrQueryResponse);
+ }
+
+ @POST
+ @Produces({"application/json", "application/xml", BINARY_CONTENT_TYPE_V2})
+ @PermissionName(COLL_EDIT_PERM)
+ @Operation(summary = "Balance Replicas across the given set of Nodes.")
+ public SolrJerseyResponse balanceReplicas(
+ @RequestBody(description = "Contains user provided parameters")
+ BalanceReplicasRequestBody requestBody)
+ throws Exception {
+ final SolrJerseyResponse jerseyResponse = instantiateJerseyResponse(SolrJerseyResponse.class);
+ final CoreContainer zkContainer = fetchAndValidateZooKeeperAwareCoreContainer();
+ // TODO Record node for log and tracing
+ final ZkNodeProps balanceCommand = createRemoteMessage(requestBody);
+ final SolrResponse commandResponse =
+ CollectionsHandler.submitCollectionApiCommand(
+ zkContainer,
+ zkContainer.getDistributedCollectionCommandRunner(),
+ balanceCommand,
+ CollectionAction.BALANCE_REPLICAS,
+ DEFAULT_COLLECTION_OP_TIMEOUT);
+ // Surface any failure reported by the Collection API command to the caller
+ if (commandResponse.getException() != null) {
+ throw commandResponse.getException();
+ }
+
+ disableResponseCaching();
+ return jerseyResponse;
+ }
+
+ /**
+ * Build the Collection API message for this request. The nodes, waitForFinalState and async
+ * values are copied from the body via {@code insertIfNotNull} (presumably skipping null values —
+ * confirm in AdminAPIBase); the queue operation is always set.
+ */
+ public ZkNodeProps createRemoteMessage(BalanceReplicasRequestBody requestBody) {
+ final Map<String, Object> messageProps = new HashMap<>();
+ if (requestBody != null) {
+ insertIfNotNull(messageProps, NODES, requestBody.nodes);
+ insertIfNotNull(messageProps, WAIT_FOR_FINAL_STATE, requestBody.waitForFinalState);
+ insertIfNotNull(messageProps, ASYNC, requestBody.async);
+ }
+ messageProps.put(QUEUE_OPERATION, CollectionAction.BALANCE_REPLICAS.toLower());
+ return new ZkNodeProps(messageProps);
+ }
+
+ /** User-provided parameters for the balance-replicas request. */
+ public static class BalanceReplicasRequestBody implements JacksonReflectMapWriter {
+
+ @Schema(
+ description =
+ "The set of nodes across which replicas will be balanced. Defaults to all live data nodes.")
+ @JsonProperty(value = "nodes")
+ public Set<String> nodes;
+
+ @Schema(
+ description =
+ "If true, the request will complete only when all affected replicas become active. "
+ + "If false, the API will return the status of the single action, which may be "
+ + "before the new replica is online and active.")
+ @JsonProperty("waitForFinalState")
+ public Boolean waitForFinalState = false;
+
+ @Schema(description = "Request ID to track this action which will be processed asynchronously.")
+ @JsonProperty("async")
+ public String async;
+
+ public BalanceReplicasRequestBody() {}
+
+ public BalanceReplicasRequestBody(Set<String> nodes, Boolean waitForFinalState, String async) {
+ this.nodes = nodes;
+ this.waitForFinalState = waitForFinalState;
+ this.async = async;
+ }
+ }
+}
diff --git a/solr/core/src/test/org/apache/solr/cloud/BalanceReplicasTest.java b/solr/core/src/test/org/apache/solr/cloud/BalanceReplicasTest.java
new file mode 100644
index 00000000000..82a75027e68
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/BalanceReplicasTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.util.EntityUtils;
+import org.apache.solr.client.solrj.impl.CloudLegacySolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.admin.api.BalanceReplicasAPI;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.noggit.JSONParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BalanceReplicasTest extends SolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ @BeforeClass
+ public static void setupCluster() {
+ System.setProperty("metricsEnabled", "true");
+ }
+
+ @Before
+ public void clearPreviousCluster() throws Exception {
+ // Clear the previous cluster before each test, since they use different numbers of nodes.
+ shutdownCluster();
+ }
+
+ @Test
+ public void testAllNodes() throws Exception {
+ configureCluster(6)
+ .addConfig(
+ "conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
+ .configure();
+ String coll = "balancereplicastest_allnodes_coll";
+ if (log.isInfoEnabled()) {
+ log.info("total_jettys: {}", cluster.getJettySolrRunners().size());
+ }
+
+ CloudSolrClient cloudClient = cluster.getSolrClient();
+ Set<String> liveNodes = cloudClient.getClusterState().getLiveNodes();
+ ArrayList<String> l = new ArrayList<>(liveNodes);
+ Collections.shuffle(l, random());
+ CollectionAdminRequest.Create create;
+
+ create =
+ pickRandom(
+ CollectionAdminRequest.createCollection(coll, "conf1", 3, 2, 0, 0),
+ // check also replicationFactor 1
+ CollectionAdminRequest.createCollection(coll, "conf1", 6, 1, 0, 0));
+ create.setCreateNodeSet(StrUtils.join(l.subList(0, 2), ','));
+ cloudClient.request(create);
+
+ cluster.waitForActiveCollection(
+ coll,
+ create.getNumShards(),
+ create.getNumShards()
+ * (create.getNumNrtReplicas()
+ + create.getNumPullReplicas()
+ + create.getNumTlogReplicas()));
+
+ DocCollection collection = cloudClient.getClusterState().getCollection(coll);
+ log.debug("### Before balancing: {}", collection);
+
+ postDataAndGetResponse(
+ cluster.getSolrClient(),
+ "/api/cluster/replicas/balance",
+ BalanceReplicasAPI.BalanceReplicasRequestBody.EMPTY);
+
+ collection = cloudClient.getClusterState().getCollectionOrNull(coll, false);
+ log.debug("### After balancing: {}", collection);
+ Set<String> replicaNodes =
+ collection.getReplicas().stream().map(Replica::getNodeName).collect(Collectors.toSet());
+ assertEquals("Incorrect nodes for replicas after balancing", liveNodes, replicaNodes);
+ }
+
+ @Test
+ public void testSomeNodes() throws Exception {
+ configureCluster(5)
+ .addConfig(
+ "conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
+ .configure();
+ String coll = "balancereplicastest_somenodes_coll";
+ if (log.isInfoEnabled()) {
+ log.info("total_jettys: {}", cluster.getJettySolrRunners().size());
+ }
+
+ CloudSolrClient cloudClient = cluster.getSolrClient();
+ Set<String> liveNodes = cloudClient.getClusterState().getLiveNodes();
+ ArrayList<String> l = new ArrayList<>(liveNodes);
+ Collections.shuffle(l, random());
+ CollectionAdminRequest.Create create;
+
+ create =
+ pickRandom(
+ CollectionAdminRequest.createCollection(coll, "conf1", 3, 2, 0, 0),
+ // check also replicationFactor 1
+ CollectionAdminRequest.createCollection(coll, "conf1", 6, 1, 0, 0));
+ create.setCreateNodeSet(StrUtils.join(l.subList(0, 2), ','));
+ cloudClient.request(create);
+
+ cluster.waitForActiveCollection(
+ coll,
+ create.getNumShards(),
+ create.getNumShards()
+ * (create.getNumNrtReplicas()
+ + create.getNumPullReplicas()
+ + create.getNumTlogReplicas()));
+
+ DocCollection collection = cloudClient.getClusterState().getCollection(coll);
+ log.debug("### Before balancing: {}", collection);
+
+ postDataAndGetResponse(
+ cluster.getSolrClient(),
+ "/api/cluster/replicas/balance",
+ new BalanceReplicasAPI.BalanceReplicasRequestBody(
+ new HashSet<>(l.subList(1, 4)), true, null));
+
+ collection = cloudClient.getClusterState().getCollectionOrNull(coll, false);
+ log.debug("### After balancing: {}", collection);
+ Set<String> replicaNodes =
+ collection.getReplicas().stream().map(Replica::getNodeName).collect(Collectors.toSet());
+ assertEquals("Incorrect nodes for replicas after balancing", 4, replicaNodes.size());
+ assertTrue(
+ "A non-balanced node lost replicas during balancing", replicaNodes.contains(l.get(0)));
+ assertFalse(
+ "A non-balanced node gained replicas during balancing", replicaNodes.contains(l.get(4)));
+ }
+
+ public Map<?, ?> postDataAndGetResponse(CloudSolrClient cloudClient, String uri, Object body)
+ throws IOException {
+ HttpEntityEnclosingRequestBase httpRequest = null;
+ HttpEntity entity;
+ String response = null;
+ Map<?, ?> m = null;
+
+ uri = cluster.getJettySolrRunners().get(0).getBaseUrl().toString().replace("/solr", "") + uri;
+ try {
+ httpRequest = new HttpPost(uri);
+
+ httpRequest.setEntity(new ByteArrayEntity(Utils.toJSON(body), ContentType.APPLICATION_JSON));
+ httpRequest.setHeader("Accept", "application/json");
+ entity =
+ ((CloudLegacySolrClient) cloudClient).getHttpClient().execute(httpRequest).getEntity();
+ try {
+ response = EntityUtils.toString(entity, UTF_8);
+ m = (Map<?, ?>) Utils.fromJSONString(response);
+ } catch (JSONParser.ParseException e) {
+ log.error("err response: {}", response);
+ throw new AssertionError(e);
+ }
+ } finally {
+ httpRequest.releaseConnection();
+ }
+ return m;
+ }
+}
diff --git a/solr/core/src/test/org/apache/solr/cluster/placement/impl/PlacementPluginIntegrationTest.java b/solr/core/src/test/org/apache/solr/cluster/placement/impl/PlacementPluginIntegrationTest.java
index fb7d769d1c9..ecbf3fb5de4 100644
--- a/solr/core/src/test/org/apache/solr/cluster/placement/impl/PlacementPluginIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cluster/placement/impl/PlacementPluginIntegrationTest.java
@@ -335,7 +335,7 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
.process(cluster.getSolrClient());
fail("should have failed: " + rsp);
} catch (Exception e) {
- assertTrue(e.toString(), e.toString().contains("colocated collection"));
+ assertTrue(e.toString(), e.toString().contains("collocated collection"));
}
}
@@ -383,8 +383,8 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
fail("should have failed due to no nodes with the types: " + rsp);
} catch (Exception e) {
assertTrue(
- "should contain 'no nodes with types':" + e,
- e.toString().contains("no nodes with types"));
+ "should contain 'Not enough eligible nodes to place':" + e,
+ e.toString().contains("Not enough eligible nodes to place"));
}
System.setProperty(AffinityPlacementConfig.NODE_TYPE_SYSPROP, "type_0");
CollectionAdminResponse rsp =
diff --git a/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AbstractPlacementFactoryTest.java b/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AbstractPlacementFactoryTest.java
new file mode 100644
index 00000000000..7e03b52aad1
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AbstractPlacementFactoryTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement.plugins;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.cluster.Node;
+import org.apache.solr.cluster.Replica;
+import org.apache.solr.cluster.placement.BalancePlan;
+import org.apache.solr.cluster.placement.Builders;
+import org.apache.solr.cluster.placement.PlacementPlan;
+import org.apache.solr.cluster.placement.ReplicaPlacement;
+
+/**
+ * Base class for placement plugin unit tests. It provides assertions that compare the placements
+ * ({@link PlacementPlan}) and replica movements ({@link BalancePlan}) computed by a plugin with
+ * compact string descriptions of the expected result.
+ */
+public abstract class AbstractPlacementFactoryTest extends SolrTestCaseJ4 {
+
+  /**
+   * Verifies that a computed set of placements does match the expected placement on nodes.
+   *
+   * @param expectedPlacements a set of strings of the form {@code "1 NRT 3"} where 1 would be the
+   *     shard index, NRT the replica type and 3 the node on which the replica is placed. Shards are
+   *     1-based. Nodes 0-based.
+   *     <p>Read carefully: <b>shard index</b> and not shard name. Index in the <b>order</b> of
+   *     shards as defined for the collection in the call to {@code
+   *     Builders.CollectionBuilder#customCollectionSetup(List, List)}
+   * @param placementPlan the plan computed by the plugin under test
+   * @param shardBuilders the shard builders are passed here to get the shard names by index
+   *     (1-based) rather than by parsing the shard names (which would break if we change the shard
+   *     naming scheme).
+   * @param liveNodes the nodes whose 0-based positions define the node numbers used in {@code
+   *     expectedPlacements}
+   */
+  public static void verifyPlacements(
+      Set<String> expectedPlacements,
+      PlacementPlan placementPlan,
+      List<Builders.ShardBuilder> shardBuilders,
+      List<Node> liveNodes) {
+    Set<ReplicaPlacement> computedPlacements = placementPlan.getReplicaPlacements();
+    Map<String, Integer> shardNumbering = numberShards(shardBuilders);
+    Map<Node, Integer> nodeNumbering = numberNodes(liveNodes);
+
+    if (expectedPlacements.size() != computedPlacements.size()) {
+      fail(
+          "Wrong number of placements, expected "
+              + expectedPlacements.size()
+              + " computed "
+              + computedPlacements.size()
+              + ". "
+              + getExpectedVsComputedPlacement(
+                  expectedPlacements, computedPlacements, shardNumbering, nodeNumbering));
+    }
+
+    Set<String> expected = new HashSet<>(expectedPlacements);
+    for (ReplicaPlacement p : computedPlacements) {
+      String lookUpPlacementResult = describePlacement(p, shardNumbering, nodeNumbering);
+      if (!expected.remove(lookUpPlacementResult)) {
+        fail(
+            "Computed placement ["
+                + lookUpPlacementResult
+                + "] not expected. "
+                + getExpectedVsComputedPlacement(
+                    expectedPlacements, computedPlacements, shardNumbering, nodeNumbering));
+      }
+    }
+  }
+
+  /**
+   * Verifies that the replica movements of a computed balance plan match the expected movements.
+   *
+   * @param expectedMovements a set of strings of the form {@code "COL 1 NRT 3 -> 4"} where COL is
+   *     the name of the collection, 1 would be the shard index, NRT the replica type and {@code 3
+   *     -> 4} represents the replica moving from the 3rd node to the 4th node. Shards are 1-based.
+   *     Nodes 0-based.
+   *     <p>Read carefully: <b>shard index</b> and not shard name. Index in the <b>order</b> of
+   *     shards as defined for the collection in the call to {@code
+   *     Builders.CollectionBuilder#customCollectionSetup(List, List)}. Shard indices are looked up
+   *     by shard name only, so every collection with moved replicas must use the same shard names
+   *     as {@code shardBuilders}.
+   * @param balancePlan the plan computed by the plugin under test; its movements must not be null
+   * @param shardBuilders the shard builders are passed here to get the shard names by index
+   *     (1-based) rather than by parsing the shard names (which would break if we change the shard
+   *     naming scheme).
+   * @param liveNodes the nodes whose 0-based positions define the node numbers used in {@code
+   *     expectedMovements}
+   */
+  public static void verifyBalancing(
+      Set<String> expectedMovements,
+      BalancePlan balancePlan,
+      List<Builders.ShardBuilder> shardBuilders,
+      List<Node> liveNodes) {
+    Map<Replica, Node> computedMovements = balancePlan.getReplicaMovements();
+
+    assertNotNull(
+        "Replica movements returned from balancePlan should not be null", computedMovements);
+
+    Map<String, Integer> shardNumbering = numberShards(shardBuilders);
+    Map<Node, Integer> nodeNumbering = numberNodes(liveNodes);
+
+    if (expectedMovements.size() != computedMovements.size()) {
+      fail(
+          "Wrong number of replica movements, expected "
+              + expectedMovements.size()
+              + " computed "
+              + computedMovements.size()
+              + ". "
+              + getExpectedVsComputedMovement(
+                  expectedMovements, computedMovements, shardNumbering, nodeNumbering));
+    }
+
+    Set<String> expected = new HashSet<>(expectedMovements);
+    for (Map.Entry<Replica, Node> movement : computedMovements.entrySet()) {
+      String lookUpMovementResult =
+          describeMovement(movement.getKey(), movement.getValue(), shardNumbering, nodeNumbering);
+      if (!expected.remove(lookUpMovementResult)) {
+        fail(
+            "Computed movement ["
+                + lookUpMovementResult
+                + "] not expected. "
+                + getExpectedVsComputedMovement(
+                    expectedMovements, computedMovements, shardNumbering, nodeNumbering));
+      }
+    }
+  }
+
+  /** Maps each shard name to its 1-based position in {@code shardBuilders}. */
+  private static Map<String, Integer> numberShards(List<Builders.ShardBuilder> shardBuilders) {
+    Map<String, Integer> shardNumbering = new HashMap<>();
+    int index = 1; // first shard is 1 not 0
+    for (Builders.ShardBuilder sb : shardBuilders) {
+      shardNumbering.put(sb.getShardName(), index++);
+    }
+    return shardNumbering;
+  }
+
+  /** Maps each node to its 0-based position in {@code liveNodes}. */
+  private static Map<Node, Integer> numberNodes(List<Node> liveNodes) {
+    Map<Node, Integer> nodeNumbering = new HashMap<>();
+    int index = 0;
+    for (Node n : liveNodes) {
+      nodeNumbering.put(n, index++);
+    }
+    return nodeNumbering;
+  }
+
+  /** Formats a placement as {@code "<shard index> <replica type> <node index>"}. */
+  private static String describePlacement(
+      ReplicaPlacement placement,
+      Map<String, Integer> shardNumbering,
+      Map<Node, Integer> nodeNumbering) {
+    return shardNumbering.get(placement.getShardName())
+        + " "
+        + placement.getReplicaType().name()
+        + " "
+        + nodeNumbering.get(placement.getNode());
+  }
+
+  /**
+   * Formats a movement as {@code "<collection> <shard index> <replica type> <source node index> ->
+   * <target node index>"}.
+   */
+  private static String describeMovement(
+      Replica replica,
+      Node targetNode,
+      Map<String, Integer> shardNumbering,
+      Map<Node, Integer> nodeNumbering) {
+    return replica.getShard().getCollection().getName()
+        + " "
+        + shardNumbering.get(replica.getShard().getShardName())
+        + " "
+        + replica.getType().name()
+        + " "
+        + nodeNumbering.get(replica.getNode())
+        + " -> "
+        + nodeNumbering.get(targetNode);
+  }
+
+  /** Renders expected and computed placements side by side for failure messages. */
+  private static String getExpectedVsComputedPlacement(
+      Set<String> expectedPlacements,
+      Set<ReplicaPlacement> computedPlacements,
+      Map<String, Integer> shardNumbering,
+      Map<Node, Integer> nodeNumbering) {
+
+    StringBuilder sb = new StringBuilder("Expected placement: ");
+    for (String placement : expectedPlacements) {
+      sb.append("[").append(placement).append("] ");
+    }
+
+    sb.append("Computed placement: ");
+    for (ReplicaPlacement placement : computedPlacements) {
+      sb.append("[")
+          .append(describePlacement(placement, shardNumbering, nodeNumbering))
+          .append("] ");
+    }
+
+    return sb.toString();
+  }
+
+  /** Renders expected and computed movements side by side for failure messages. */
+  private static String getExpectedVsComputedMovement(
+      Set<String> expectedMovements,
+      Map<Replica, Node> computedMovements,
+      Map<String, Integer> shardNumbering,
+      Map<Node, Integer> nodeNumbering) {
+
+    StringBuilder sb = new StringBuilder("Expected movement: ");
+    for (String movement : expectedMovements) {
+      sb.append("[").append(movement).append("] ");
+    }
+
+    sb.append("Computed movement: ");
+    for (Map.Entry<Replica, Node> movement : computedMovements.entrySet()) {
+      sb.append("[")
+          .append(
+              describeMovement(
+                  movement.getKey(), movement.getValue(), shardNumbering, nodeNumbering))
+          .append("] ");
+    }
+
+    return sb.toString();
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java b/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java
index 7cbf0448571..c71c823e59b 100644
--- a/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java
+++ b/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java
@@ -18,8 +18,7 @@
package org.apache.solr.cluster.placement.plugins;
import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Comparator;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -31,20 +30,22 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
-import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cluster.Cluster;
import org.apache.solr.cluster.Node;
import org.apache.solr.cluster.Replica;
import org.apache.solr.cluster.Shard;
import org.apache.solr.cluster.SolrCollection;
import org.apache.solr.cluster.placement.AttributeValues;
+import org.apache.solr.cluster.placement.BalancePlan;
import org.apache.solr.cluster.placement.Builders;
import org.apache.solr.cluster.placement.DeleteReplicasRequest;
import org.apache.solr.cluster.placement.PlacementContext;
import org.apache.solr.cluster.placement.PlacementException;
import org.apache.solr.cluster.placement.PlacementPlan;
import org.apache.solr.cluster.placement.PlacementPlugin;
+import org.apache.solr.cluster.placement.PlacementRequest;
import org.apache.solr.cluster.placement.ReplicaPlacement;
+import org.apache.solr.cluster.placement.impl.BalanceRequestImpl;
import org.apache.solr.cluster.placement.impl.ModificationRequestImpl;
import org.apache.solr.cluster.placement.impl.PlacementRequestImpl;
import org.apache.solr.common.util.Pair;
@@ -55,7 +56,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Unit test for {@link AffinityPlacementFactory} */
-public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
+public class AffinityPlacementFactoryTest extends AbstractPlacementFactoryTest {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private PlacementPlugin plugin;
@@ -251,6 +252,7 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
List.of("NRT 0", "TLOG 0", "NRT 3"), // shard 1
List.of("NRT 1", "NRT 3", "TLOG 2")); // shard 2
collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+ clusterBuilder.addCollection(collectionBuilder);
SolrCollection solrCollection = collectionBuilder.build();
List<Node> liveNodes = clusterBuilder.buildLiveNodes();
@@ -359,6 +361,7 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
List.of("NRT " + AZ1_NRT_HIGHCORES, "TLOG " + AZ3_TLOGPULL), // shard 1
List.of("TLOG " + AZ2_TLOGPULL)); // shard 2
collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+ clusterBuilder.addCollection(collectionBuilder);
SolrCollection solrCollection = collectionBuilder.build();
List<Node> liveNodes = clusterBuilder.buildLiveNodes();
@@ -426,6 +429,7 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
List<List<String>> shardsReplicas = List.of(List.of());
collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+ clusterBuilder.addCollection(collectionBuilder);
SolrCollection solrCollection = collectionBuilder.build();
List<Node> liveNodes = clusterBuilder.buildLiveNodes();
@@ -489,6 +493,7 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
List.of("NRT 10", "TLOG 11"), // shard 1
List.of()); // shard 2
collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+ clusterBuilder.addCollection(collectionBuilder);
SolrCollection solrCollection = collectionBuilder.build();
List<Node> liveNodes = clusterBuilder.buildLiveNodes();
@@ -523,95 +528,6 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
verifyPlacements(expectedPlacements, pp, collectionBuilder.getShardBuilders(), liveNodes);
}
- /**
- * Verifies that a computed set of placements does match the expected placement on nodes.
- *
- * @param expectedPlacements a set of strings of the form {@code "1 NRT 3"} where 1 would be the
- * shard index, NRT the replica type and 3 the node on which the replica is placed. Shards are
- * 1-based. Nodes 0-based.
- * <p>Read carefully: <b>shard index</b> and not shard name. Index in the <b>order</b> of
- * shards as defined for the collection in the call to {@link
- * org.apache.solr.cluster.placement.Builders.CollectionBuilder#customCollectionSetup(List,
- * List)}
- * @param shardBuilders the shard builders are passed here to get the shard names by index
- * (1-based) rather than by parsing the shard names (which would break if we change the shard
- * naming scheme).
- */
- private static void verifyPlacements(
- Set<String> expectedPlacements,
- PlacementPlan placementPlan,
- List<Builders.ShardBuilder> shardBuilders,
- List<Node> liveNodes) {
- Set<ReplicaPlacement> computedPlacements = placementPlan.getReplicaPlacements();
-
- // Prepare structures for looking up shard name index and node index
- Map<String, Integer> shardNumbering = new HashMap<>();
- int index = 1; // first shard is 1 not 0
- for (Builders.ShardBuilder sb : shardBuilders) {
- shardNumbering.put(sb.getShardName(), index++);
- }
- Map<Node, Integer> nodeNumbering = new HashMap<>();
- index = 0;
- for (Node n : liveNodes) {
- nodeNumbering.put(n, index++);
- }
-
- if (expectedPlacements.size() != computedPlacements.size()) {
- fail(
- "Wrong number of placements, expected "
- + expectedPlacements.size()
- + " computed "
- + computedPlacements.size()
- + ". "
- + getExpectedVsComputedPlacement(
- expectedPlacements, computedPlacements, shardNumbering, nodeNumbering));
- }
-
- Set<String> expected = new HashSet<>(expectedPlacements);
- for (ReplicaPlacement p : computedPlacements) {
- String lookUpPlacementResult =
- shardNumbering.get(p.getShardName())
- + " "
- + p.getReplicaType().name()
- + " "
- + nodeNumbering.get(p.getNode());
- if (!expected.remove(lookUpPlacementResult)) {
- fail(
- "Computed placement ["
- + lookUpPlacementResult
- + "] not expected. "
- + getExpectedVsComputedPlacement(
- expectedPlacements, computedPlacements, shardNumbering, nodeNumbering));
- }
- }
- }
-
- private static String getExpectedVsComputedPlacement(
- Set<String> expectedPlacements,
- Set<ReplicaPlacement> computedPlacements,
- Map<String, Integer> shardNumbering,
- Map<Node, Integer> nodeNumbering) {
-
- StringBuilder sb = new StringBuilder("Expected placement: ");
- for (String placement : expectedPlacements) {
- sb.append("[").append(placement).append("] ");
- }
-
- sb.append("Computed placement: ");
- for (ReplicaPlacement placement : computedPlacements) {
- String lookUpPlacementResult =
- shardNumbering.get(placement.getShardName())
- + " "
- + placement.getReplicaType().name()
- + " "
- + nodeNumbering.get(placement.getNode());
-
- sb.append("[").append(lookUpPlacementResult).append("] ");
- }
-
- return sb.toString();
- }
-
@Test
public void testAvailabilityZones() throws Exception {
String collectionName = "azCollection";
@@ -811,6 +727,72 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
}
}
+  /**
+   * Verifies that placement requests are rejected with a {@link PlacementException} when the
+   * replicas they add would push the projected index size over the nodes' free disk.
+   */
+  @Test
+  public void testFreeDiskConstraintsWithNewReplicas() throws Exception {
+    String collectionName = "freeDiskWithReplicasCollection";
+    int numNodes = 3;
+    Builders.ClusterBuilder clusterBuilder =
+        Builders.newClusterBuilder().initializeLiveNodes(numNodes);
+    // Every node starts with no cores and 100 GB of free disk.
+    for (int i = 0; i < numNodes; i++) {
+      Builders.NodeBuilder nodeBuilder = clusterBuilder.getLiveNodeBuilders().get(i);
+      nodeBuilder.setCoreCount(0);
+      nodeBuilder.setFreeDiskGB(100.0);
+    }
+
+    // NOTE(review): the arguments appear to mean 3 shards with 1 NRT replica each (0 TLOG, 0 PULL)
+    // and per-shard index sizes of 33, 33 and 60; confirm against Builders.
+    Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
+    collectionBuilder.initializeShardsReplicas(
+        3, 1, 0, 0, clusterBuilder.getLiveNodeBuilders(), List.of(33, 33, 60));
+    clusterBuilder.addCollection(collectionBuilder);
+
+    PlacementContext placementContext = clusterBuilder.buildPlacementContext();
+    Cluster cluster = placementContext.getCluster();
+
+    SolrCollection solrCollection = cluster.getCollection(collectionName);
+
+    // Test when an additional replicaType makes the projected indexSize go over the limit
+    // Add two replicas (different types) to the first shard, the second replica should fail
+    PlacementRequest badReplicaPlacementRequest =
+        new PlacementRequestImpl(
+            solrCollection,
+            StreamSupport.stream(solrCollection.shards().spliterator(), false)
+                .map(Shard::getShardName)
+                .findFirst()
+                .map(Set::of)
+                .orElseGet(Collections::emptySet),
+            cluster.getLiveNodes(),
+            0,
+            1,
+            1);
+
+    assertThrows(
+        PlacementException.class,
+        () -> plugin.computePlacement(badReplicaPlacementRequest, placementContext));
+
+    // Test when an additional shard makes the projected indexSize go over the limit
+    // Add one replica to each shard, the third shard should fail
+    PlacementRequest badShardPlacementRequest =
+        new PlacementRequestImpl(
+            solrCollection,
+            StreamSupport.stream(solrCollection.shards().spliterator(), false)
+                .map(Shard::getShardName)
+                .collect(Collectors.toSet()),
+            cluster.getLiveNodes(),
+            0,
+            1,
+            1);
+
+    assertThrows(
+        PlacementException.class,
+        () -> plugin.computePlacement(badShardPlacementRequest, placementContext));
+  }
+
@Test
public void testWithCollectionPlacement() throws Exception {
AffinityPlacementConfig config =
@@ -1197,11 +1179,12 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
int perNode = TOTAL_REPLICAS > numNodes ? TOTAL_REPLICAS / numNodes : 1;
replicasPerNode.forEach(
(node, count) -> {
- assertEquals(perNode, count.get());
+ assertEquals(
+ "Wrong number of replicas for node: " + node.getName(), perNode, count.get());
});
shardsPerNode.forEach(
(node, names) -> {
- assertEquals(perNode, names.size());
+ assertEquals("Wrong number of shards for node: " + node.getName(), perNode, names.size());
});
replicasPerShard.forEach(
@@ -1368,60 +1351,6 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
}
}
- @Test
- @SuppressWarnings("SelfComparison")
- public void testCompareSpreadDomainWithNodes() {
- Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(3);
- final List<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
- nodeBuilders.get(0).setNodeName("nodeA");
- nodeBuilders.get(1).setNodeName("nodeB");
- nodeBuilders.get(2).setNodeName("nodeC");
-
- Cluster cluster = clusterBuilder.build();
- Node nodeA =
- cluster.getLiveNodes().stream()
- .filter((n) -> n.getName().equals("nodeA"))
- .findFirst()
- .get();
- Node nodeB =
- cluster.getLiveNodes().stream()
- .filter((n) -> n.getName().equals("nodeB"))
- .findFirst()
- .get();
- Node nodeC =
- cluster.getLiveNodes().stream()
- .filter((n) -> n.getName().equals("nodeC"))
- .findFirst()
- .get();
-
- Comparator<Node> nodeComparator = Comparator.comparing(Node::getName);
- List<Node> listInGroup1 = new ArrayList<>(List.of(nodeC, nodeA));
- AffinityPlacementFactory.AffinityPlacementPlugin.SpreadDomainWithNodes group1 =
- new AffinityPlacementFactory.AffinityPlacementPlugin.SpreadDomainWithNodes(
- "foo", listInGroup1, 0, nodeComparator);
- AffinityPlacementFactory.AffinityPlacementPlugin.SpreadDomainWithNodes group2 =
- new AffinityPlacementFactory.AffinityPlacementPlugin.SpreadDomainWithNodes(
- "bar", List.of(nodeB), 1, nodeComparator);
- assertEquals("Comparing to itself should return 0", 0, group1.compareTo(group1));
- assertEquals(
- "group 1 should be greater, since 'nodeC' is greater than 'nodeB",
- 1,
- group1.compareTo(group2));
- assertEquals(
- "group 1 should be greater, since 'nodeC' is greater than 'nodeB",
- -1,
- group2.compareTo(group1));
- listInGroup1.remove(0);
- assertEquals(
- "group 1 should be greater, since 'nodeB' is greater than 'nodeA",
- -1,
- group1.compareTo(group2));
- listInGroup1.remove(0);
- listInGroup1.add(nodeB);
- assertEquals(
- "group 1 should be greater because of the tie breaker", -1, group1.compareTo(group2));
- }
-
@Test
public void testSpreadDomainsWithDownNode() throws Exception {
defaultConfig.spreadAcrossDomains = true;
@@ -1462,4 +1391,280 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
assertEquals(liveNodes.get(1), rp.getNode());
}
}
+
+ /** Tests replica balancing across all nodes, with no core counts explicitly set on the nodes */
+ @Test
+ public void testBalancingBareMetrics() throws Exception {
+ // Cluster nodes and their attributes
+ Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(5);
+ List<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+
+ // The collection already exists with shards and replicas
+ Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder("a");
+ // Note that the collection as defined below is in a state that would NOT be returned by the
+ // placement plugin: shard 1 has two replicas on node 0. The plugin should still be able to
+ // balance the replicas as long as the resulting placement doesn't break the rules.
+ List<List<String>> shardsReplicas =
+ List.of(
+ List.of("NRT 0", "TLOG 0", "NRT 2"), // shard 1
+ List.of("NRT 1", "NRT 4", "TLOG 3")); // shard 2
+ collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+ clusterBuilder.addCollection(collectionBuilder);
+
+ // Add another collection; collectionBuilder now refers to "b", whose shards are verified below
+ collectionBuilder = Builders.newCollectionBuilder("b");
+ shardsReplicas =
+ List.of(
+ List.of("NRT 1", "TLOG 0", "NRT 3"), // shard 1
+ List.of("NRT 1", "NRT 3", "TLOG 0")); // shard 2
+ collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+ clusterBuilder.addCollection(collectionBuilder);
+
+ BalanceRequestImpl balanceRequest =
+ new BalanceRequestImpl(new HashSet<>(clusterBuilder.buildLiveNodes()));
+ BalancePlan balancePlan =
+ plugin.computeBalancing(balanceRequest, clusterBuilder.buildPlacementContext());
+
+ // Each expected movement is represented as a string
+ // "col shard replica-type fromNode -> toNode"
+ Set<String> expectedPlacements = Set.of("b 1 TLOG 0 -> 2", "b 1 NRT 3 -> 4");
+ verifyBalancing(
+ expectedPlacements,
+ balancePlan,
+ collectionBuilder.getShardBuilders(),
+ clusterBuilder.buildLiveNodes());
+ }
+
+ /** Tests replica balancing across all nodes, with core counts of 10 to 50 preset on the nodes */
+ @Test
+ public void testBalancingExistingMetrics() throws Exception {
+ // Cluster nodes and their attributes
+ Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(5);
+ List<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+ int coresOnNode = 10;
+ for (Builders.NodeBuilder nodeBuilder : nodeBuilders) {
+ nodeBuilder.setCoreCount(coresOnNode);
+ coresOnNode += 10;
+ }
+
+ // The collection already exists with shards and replicas
+ Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder("a");
+ // Note that the collection as defined below is in a state that would NOT be returned by the
+ // placement plugin: shard 1 has two replicas on node 0. The plugin should still be able to
+ // balance the replicas as long as the resulting placement doesn't break the rules.
+ List<List<String>> shardsReplicas =
+ List.of(
+ List.of("NRT 0", "TLOG 0", "NRT 3"), // shard 1
+ List.of("NRT 1", "NRT 3", "TLOG 2")); // shard 2
+ collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+ clusterBuilder.addCollection(collectionBuilder);
+
+ BalanceRequestImpl balanceRequest =
+ new BalanceRequestImpl(new HashSet<>(clusterBuilder.buildLiveNodes()));
+ BalancePlan balancePlan =
+ plugin.computeBalancing(balanceRequest, clusterBuilder.buildPlacementContext());
+
+ // Each expected movement is represented as a string
+ // "col shard replica-type fromNode -> toNode"
+ Set<String> expectedPlacements = Set.of("a 1 NRT 3 -> 1", "a 2 NRT 3 -> 0");
+ verifyBalancing(
+ expectedPlacements,
+ balancePlan,
+ collectionBuilder.getShardBuilders(),
+ clusterBuilder.buildLiveNodes());
+ }
+
+ /** Tests that balancing works across a subset of nodes */
+ @Test
+ public void testBalancingWithNodeSubset() throws Exception {
+ // Cluster nodes and their attributes
+ Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(5);
+ List<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+ int coresOnNode = 10;
+ for (Builders.NodeBuilder nodeBuilder : nodeBuilders) {
+ nodeBuilder.setCoreCount(coresOnNode);
+ coresOnNode += 10;
+ }
+
+ // The collection already exists with shards and replicas
+ Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder("a");
+ // Note that the collection as defined below is in a state that would NOT be returned by the
+ // placement plugin: shard 1 has two replicas on node 0. The plugin should still be able to
+ // balance the replicas as long as the resulting placement doesn't break the rules.
+ List<List<String>> shardsReplicas =
+ List.of(
+ List.of("NRT 0", "TLOG 0", "NRT 3"), // shard 1
+ List.of("NRT 1", "NRT 3", "TLOG 2")); // shard 2
+ collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+ clusterBuilder.addCollection(collectionBuilder);
+
+ // Only balance over nodes 1 to 4 (node 0 is excluded from the balancing)
+ List<Node> overNodes = clusterBuilder.buildLiveNodes();
+ overNodes.remove(0);
+
+ BalanceRequestImpl balanceRequest = new BalanceRequestImpl(new HashSet<>(overNodes));
+ BalancePlan balancePlan =
+ plugin.computeBalancing(balanceRequest, clusterBuilder.buildPlacementContext());
+
+ // Each expected movement is represented as a string
+ // "col shard replica-type fromNode -> toNode"
+ Set<String> expectedPlacements = Set.of("a 1 NRT 3 -> 1");
+ verifyBalancing(
+ expectedPlacements,
+ balancePlan,
+ collectionBuilder.getShardBuilders(),
+ clusterBuilder.buildLiveNodes());
+ }
+
+ /**
+ * Tests balancing with multiple criteria: Replica type restricted nodes, Availability zones +
+ * existing collection
+ */
+ @Test
+ public void testBalancingMultiCriteria() throws Exception {
+ String collectionName = "a";
+
+ // Note: node numbering intentionally does not follow the AZ structure
+ final int AZ1_NRT_LOWCORES = 0;
+ final int AZ1_NRT_HIGHCORES = 3;
+ final int AZ1_TLOGPULL_LOWFREEDISK = 5;
+
+ final int AZ2_NRT_MEDCORES = 2;
+ final int AZ2_NRT_HIGHCORES = 1;
+ final int AZ2_TLOGPULL = 7;
+
+ final int AZ3_NRT_LOWCORES = 4;
+ final int AZ3_NRT_HIGHCORES = 6;
+ final int AZ3_TLOGPULL = 8;
+
+ final String AZ1 = "AZ1";
+ final String AZ2 = "AZ2";
+ final String AZ3 = "AZ3";
+
+ final int LOW_CORES = 10;
+ final int MED_CORES = 50;
+ final int HIGH_CORES = 100;
+
+ final String TLOG_PULL_REPLICA_TYPE = "TLOG, PULL";
+ final String NRT_REPLICA_TYPE = "Nrt";
+
+ // Cluster nodes and their attributes.
+ // 3 AZs with three nodes each: 2 that only accept NRT replicas, one that accepts TLOG or PULL
+ // In each AZ, one of the two NRT nodes has fewer cores than the other
+ // The TLOG/PULL node in AZ1 has less free disk than PRIORITIZED_FREE_DISK_GB
+ Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(9);
+ List<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+ for (int i = 0; i < 9; i++) {
+ final String az;
+ final int numcores;
+ final double freedisk;
+ final String acceptedReplicaType;
+
+ if (i == AZ1_NRT_LOWCORES || i == AZ1_NRT_HIGHCORES || i == AZ1_TLOGPULL_LOWFREEDISK) {
+ az = AZ1;
+ } else if (i == AZ2_NRT_HIGHCORES || i == AZ2_NRT_MEDCORES || i == AZ2_TLOGPULL) {
+ az = AZ2;
+ } else {
+ az = AZ3;
+ }
+
+ if (i == AZ1_NRT_LOWCORES || i == AZ3_NRT_LOWCORES) {
+ numcores = LOW_CORES;
+ } else if (i == AZ2_NRT_MEDCORES) {
+ numcores = MED_CORES;
+ } else {
+ numcores = HIGH_CORES;
+ }
+
+ if (i == AZ1_TLOGPULL_LOWFREEDISK) {
+ freedisk = PRIORITIZED_FREE_DISK_GB - 10;
+ } else {
+ freedisk = PRIORITIZED_FREE_DISK_GB + 10;
+ }
+
+ if (i == AZ1_TLOGPULL_LOWFREEDISK || i == AZ2_TLOGPULL || i == AZ3_TLOGPULL) {
+ acceptedReplicaType = TLOG_PULL_REPLICA_TYPE;
+ } else {
+ acceptedReplicaType = NRT_REPLICA_TYPE;
+ }
+
+ nodeBuilders
+ .get(i)
+ .setSysprop(AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP, az)
+ .setSysprop(AffinityPlacementConfig.REPLICA_TYPE_SYSPROP, acceptedReplicaType)
+ .setCoreCount(numcores)
+ .setFreeDiskGB(freedisk);
+ }
+
+ // The collection already exists with shards and replicas.
+ Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
+ List<List<String>> shardsReplicas =
+ List.of(
+ List.of(
+ "NRT " + AZ1_NRT_HIGHCORES,
+ "TLOG " + AZ3_TLOGPULL,
+ "NRT " + AZ3_NRT_HIGHCORES,
+ "NRT " + AZ3_NRT_LOWCORES,
+ "TLOG " + AZ2_TLOGPULL), // shard 1
+ List.of(
+ "TLOG " + AZ2_TLOGPULL,
+ "NRT " + AZ2_NRT_HIGHCORES,
+ "NRT " + AZ3_NRT_LOWCORES,
+ "TLOG " + AZ3_TLOGPULL)); // shard 2
+ collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+ clusterBuilder.addCollection(collectionBuilder);
+
+ List<Node> liveNodes = clusterBuilder.buildLiveNodes();
+
+ BalanceRequestImpl balanceRequest = new BalanceRequestImpl(new HashSet<>(liveNodes));
+ BalancePlan bp =
+ plugin.computeBalancing(balanceRequest, clusterBuilder.buildPlacementContext());
+ Set<String> expectedMovements =
+ Set.of(
+ "a 2 NRT " + AZ2_NRT_HIGHCORES + " -> " + AZ1_NRT_LOWCORES,
+ "a 1 NRT " + AZ3_NRT_HIGHCORES + " -> " + AZ1_NRT_LOWCORES,
+ "a 1 NRT " + AZ1_NRT_HIGHCORES + " -> " + AZ2_NRT_MEDCORES);
+ verifyBalancing(expectedMovements, bp, collectionBuilder.getShardBuilders(), liveNodes);
+ }
+
+ @Test
+ public void testWithCollectionBalancing() throws Exception {
+ AffinityPlacementConfig config =
+ new AffinityPlacementConfig(
+ MINIMAL_FREE_DISK_GB,
+ PRIORITIZED_FREE_DISK_GB,
+ Map.of(primaryCollectionName, secondaryCollectionName),
+ Map.of());
+ configurePlugin(config);
+ // Cluster nodes and their attributes
+ Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(6);
+ List<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+
+ // The collection already exists with shards and replicas
+ Builders.CollectionBuilder collectionBuilder =
+ Builders.newCollectionBuilder(primaryCollectionName);
+ // The config above maps the primary collection to the secondary one, so (as the assertion
+ // below expects) primary replicas need a co-located secondary replica. Node 0 hosts both
+ // collections; nodes 1 and 3 host only primary replicas.
+ List<List<String>> shardsReplicas = List.of(List.of("NRT 0", "NRT 1", "NRT 3")); // shard 1
+ collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+ clusterBuilder.addCollection(collectionBuilder);
+
+ // Add the secondary collection, with replicas on nodes 0, 2 and 4
+ collectionBuilder = Builders.newCollectionBuilder(secondaryCollectionName);
+ shardsReplicas = List.of(List.of("NRT 0", "NRT 2", "NRT 4")); // shard 1
+ collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+ clusterBuilder.addCollection(collectionBuilder);
+
+ PlacementContext placementContext = clusterBuilder.buildPlacementContext();
+ Cluster cluster = placementContext.getCluster();
+
+ BalanceRequestImpl balanceRequest = new BalanceRequestImpl(cluster.getLiveNodes());
+
+ BalancePlan bp = plugin.computeBalancing(balanceRequest, placementContext);
+ assertEquals(
+ "No movements expected, single movable replica requires co-placement",
+ 0,
+ bp.getReplicaMovements().size());
+ }
}
diff --git a/solr/core/src/test/org/apache/solr/cluster/placement/plugins/MinimizeCoresPlacementFactoryTest.java b/solr/core/src/test/org/apache/solr/cluster/placement/plugins/MinimizeCoresPlacementFactoryTest.java
new file mode 100644
index 00000000000..6afd24fafd1
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cluster/placement/plugins/MinimizeCoresPlacementFactoryTest.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement.plugins;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import org.apache.solr.cluster.Node;
+import org.apache.solr.cluster.Shard;
+import org.apache.solr.cluster.SolrCollection;
+import org.apache.solr.cluster.placement.BalancePlan;
+import org.apache.solr.cluster.placement.Builders;
+import org.apache.solr.cluster.placement.PlacementContext;
+import org.apache.solr.cluster.placement.PlacementPlan;
+import org.apache.solr.cluster.placement.PlacementPlugin;
+import org.apache.solr.cluster.placement.ReplicaPlacement;
+import org.apache.solr.cluster.placement.impl.BalanceRequestImpl;
+import org.apache.solr.cluster.placement.impl.PlacementRequestImpl;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Unit test for {@link MinimizeCoresPlacementFactory} */
+public class MinimizeCoresPlacementFactoryTest extends AbstractPlacementFactoryTest {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private PlacementPlugin plugin;
+
+ @Before
+ public void setupPlugin() {
+ configurePlugin();
+ }
+
+ private void configurePlugin() {
+ MinimizeCoresPlacementFactory factory = new MinimizeCoresPlacementFactory();
+ plugin = factory.createPluginInstance();
+ }
+
+ @Test
+ public void testBasicPlacementNewCollection() throws Exception {
+ testBasicPlacementInternal(false);
+ }
+
+ @Test
+ public void testBasicPlacementExistingCollection() throws Exception {
+ testBasicPlacementInternal(true);
+ }
+
+ /**
+ * When this test places a replica for a new collection, it should pick the node with fewer cores.
+ *
+ * <p>The test cluster has two live nodes.
+ *
+ * <p>When it places a replica for an existing collection, it should pick the node with fewer
+ * cores that doesn't already have a replica for the shard.
+ */
+ private void testBasicPlacementInternal(boolean hasExistingCollection) throws Exception {
+ String collectionName = "basicCollection";
+
+ Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(2);
+ List<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+
+ Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
+
+ if (hasExistingCollection) {
+ // Existing collection has replicas for its shards and is visible in the cluster state
+ collectionBuilder.initializeShardsReplicas(1, 1, 0, 0, nodeBuilders);
+ clusterBuilder.addCollection(collectionBuilder);
+ } else {
+ // New collection to create has the shards defined but no replicas and is not present in
+ // cluster state
+ collectionBuilder.initializeShardsReplicas(1, 0, 0, 0, List.of());
+ }
+
+ PlacementContext placementContext = clusterBuilder.buildPlacementContext();
+
+ SolrCollection solrCollection = collectionBuilder.build();
+ List<Node> liveNodes = clusterBuilder.buildLiveNodes();
+
+ // Place a new replica for the (only) existing shard of the collection
+ PlacementRequestImpl placementRequest =
+ new PlacementRequestImpl(
+ solrCollection,
+ Set.of(solrCollection.shards().iterator().next().getShardName()),
+ new HashSet<>(liveNodes),
+ 1,
+ 0,
+ 0);
+
+ PlacementPlan pp = plugin.computePlacement(placementRequest, placementContext);
+
+ assertEquals(1, pp.getReplicaPlacements().size());
+ ReplicaPlacement rp = pp.getReplicaPlacements().iterator().next();
+ assertEquals(hasExistingCollection ? liveNodes.get(1) : liveNodes.get(0), rp.getNode());
+ }
+
+ /**
+ * Tests that existing collection replicas are taken into account when preventing more than one
+ * replica per shard to be placed on any node.
+ */
+ @Test
+ public void testPlacementWithExistingReplicas() throws Exception {
+ String collectionName = "existingCollection";
+
+ // Cluster nodes and their attributes
+ Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(5);
+ List<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+ int coresOnNode = 10;
+ for (Builders.NodeBuilder nodeBuilder : nodeBuilders) {
+ nodeBuilder.setCoreCount(coresOnNode);
+ coresOnNode += 10;
+ }
+
+ // The collection already exists with shards and replicas
+ Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
+ // Note that the collection as defined below is in a state that would NOT be returned by the
+ // placement plugin: shard 1 has two replicas on node 0. The plugin should still be able to
+ // place additional replicas as long as they don't break the rules.
+ List<List<String>> shardsReplicas =
+ List.of(
+ List.of("NRT 0", "TLOG 0", "NRT 3"), // shard 1
+ List.of("NRT 1", "NRT 3", "TLOG 2")); // shard 2
+ collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+ clusterBuilder.addCollection(collectionBuilder);
+ SolrCollection solrCollection = collectionBuilder.build();
+
+ List<Node> liveNodes = clusterBuilder.buildLiveNodes();
+
+ // Place an additional NRT and an additional TLOG replica for each shard
+ PlacementRequestImpl placementRequest =
+ new PlacementRequestImpl(
+ solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes), 1, 1, 0);
+
+ // The replicas must be placed on the most appropriate nodes, i.e. those that do not already
+ // have a replica for the shard and then on the node with the lowest number of cores. NRT are
+ // placed first and given the cluster state here the placement is deterministic (easier to test,
+ // only one good placement).
+ PlacementPlan pp =
+ plugin.computePlacement(placementRequest, clusterBuilder.buildPlacementContext());
+
+ // Each expected placement is represented as a string "shard replica-type node"
+ Set<String> expectedPlacements = Set.of("1 NRT 1", "1 TLOG 2", "2 NRT 0", "2 TLOG 4");
+ verifyPlacements(expectedPlacements, pp, collectionBuilder.getShardBuilders(), liveNodes);
+ }
+
+ /**
+ * Tests that if a collection has replicas on nodes not currently live, placement for new replicas
+ * works ok.
+ */
+ @Test
+ public void testCollectionOnDeadNodes() throws Exception {
+ String collectionName = "walkingDead";
+
+ // Cluster nodes and their attributes
+ Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(3);
+ List<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+ int coreCount = 0;
+ for (Builders.NodeBuilder nodeBuilder : nodeBuilders) {
+ nodeBuilder.setCoreCount(coreCount++);
+ }
+
+ // The collection already exists with shards and replicas
+ Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
+ // The collection below has shard 1 having replicas only on dead nodes and shard 2 no replicas
+ // at all... (which is likely a challenging condition to recover from, but the placement
+ // computations should still execute happily).
+ List<List<String>> shardsReplicas =
+ List.of(
+ List.of("NRT 10", "TLOG 11"), // shard 1
+ List.of()); // shard 2
+ collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+ clusterBuilder.addCollection(collectionBuilder);
+ SolrCollection solrCollection = collectionBuilder.build();
+
+ List<Node> liveNodes = clusterBuilder.buildLiveNodes();
+
+ // Place an additional PULL replica for shard 1
+ PlacementRequestImpl placementRequest =
+ new PlacementRequestImpl(
+ solrCollection,
+ Set.of(solrCollection.iterator().next().getShardName()),
+ new HashSet<>(liveNodes),
+ 0,
+ 0,
+ 1);
+
+ PlacementPlan pp =
+ plugin.computePlacement(placementRequest, clusterBuilder.buildPlacementContext());
+
+ // Each expected placement is represented as a string "shard replica-type node"
+ // Node 0 has fewer cores than node 1 (0 vs 1) so the placement should go there.
+ Set<String> expectedPlacements = Set.of("1 PULL 0");
+ verifyPlacements(expectedPlacements, pp, collectionBuilder.getShardBuilders(), liveNodes);
+
+ // If we instead placed a replica for shard 2 (starting from the same initial cluster state, not
+ // including the first placement above), it should also go to node 0 since it has fewer cores...
+ Iterator<Shard> it = solrCollection.iterator();
+ it.next(); // skip first shard to do placement for the second one...
+ placementRequest =
+ new PlacementRequestImpl(
+ solrCollection, Set.of(it.next().getShardName()), new HashSet<>(liveNodes), 0, 0, 1);
+ pp = plugin.computePlacement(placementRequest, clusterBuilder.buildPlacementContext());
+ expectedPlacements = Set.of("2 PULL 0");
+ verifyPlacements(expectedPlacements, pp, collectionBuilder.getShardBuilders(), liveNodes);
+ }
+
+ /** Tests replica balancing across all nodes, with no core counts explicitly set on the nodes */
+ @Test
+ public void testBalancingBareMetrics() throws Exception {
+ // Cluster nodes and their attributes
+ Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(5);
+ List<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+
+ // The collection already exists with shards and replicas
+ Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder("a");
+ // Note that the collection as defined below is in a state that would NOT be returned by the
+ // placement plugin: shard 1 has two replicas on node 0. The plugin should still be able to
+ // balance the replicas as long as the resulting placement doesn't break the rules.
+ List<List<String>> shardsReplicas =
+ List.of(
+ List.of("NRT 0", "TLOG 0", "NRT 2"), // shard 1
+ List.of("NRT 1", "NRT 4", "TLOG 3")); // shard 2
+ collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+ clusterBuilder.addCollection(collectionBuilder);
+
+ // Add another collection; collectionBuilder now refers to "b", whose shards are verified below
+ collectionBuilder = Builders.newCollectionBuilder("b");
+ shardsReplicas =
+ List.of(
+ List.of("NRT 1", "TLOG 0", "NRT 3"), // shard 1
+ List.of("NRT 1", "NRT 3", "TLOG 0")); // shard 2
+ collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+ clusterBuilder.addCollection(collectionBuilder);
+
+ BalanceRequestImpl balanceRequest =
+ new BalanceRequestImpl(new HashSet<>(clusterBuilder.buildLiveNodes()));
+ BalancePlan balancePlan =
+ plugin.computeBalancing(balanceRequest, clusterBuilder.buildPlacementContext());
+
+ // Each expected movement is represented as a string
+ // "col shard replica-type fromNode -> toNode"
+ Set<String> expectedPlacements = Set.of("b 1 TLOG 0 -> 2", "b 1 NRT 3 -> 4");
+ verifyBalancing(
+ expectedPlacements,
+ balancePlan,
+ collectionBuilder.getShardBuilders(),
+ clusterBuilder.buildLiveNodes());
+ }
+
+ /** Tests replica balancing across all nodes, with core counts of 10 to 50 preset on the nodes */
+ @Test
+ public void testBalancingExistingMetrics() throws Exception {
+ // Cluster nodes and their attributes
+ Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(5);
+ List<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+ int coresOnNode = 10;
+ for (Builders.NodeBuilder nodeBuilder : nodeBuilders) {
+ nodeBuilder.setCoreCount(coresOnNode);
+ coresOnNode += 10;
+ }
+
+ // The collection already exists with shards and replicas
+ Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder("a");
+ // Note that the collection as defined below is in a state that would NOT be returned by the
+ // placement plugin: shard 1 has two replicas on node 0. The plugin should still be able to
+ // balance the replicas as long as the resulting placement doesn't break the rules.
+ List<List<String>> shardsReplicas =
+ List.of(
+ List.of("NRT 0", "TLOG 0", "NRT 3"), // shard 1
+ List.of("NRT 1", "NRT 3", "TLOG 2")); // shard 2
+ collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+ clusterBuilder.addCollection(collectionBuilder);
+
+ BalanceRequestImpl balanceRequest =
+ new BalanceRequestImpl(new HashSet<>(clusterBuilder.buildLiveNodes()));
+ BalancePlan balancePlan =
+ plugin.computeBalancing(balanceRequest, clusterBuilder.buildPlacementContext());
+
+ // Each expected movement is represented as a string
+ // "col shard replica-type fromNode -> toNode"
+ Set<String> expectedPlacements = Set.of("a 1 NRT 3 -> 1", "a 2 NRT 3 -> 0");
+ verifyBalancing(
+ expectedPlacements,
+ balancePlan,
+ collectionBuilder.getShardBuilders(),
+ clusterBuilder.buildLiveNodes());
+ }
+
+ /** Tests that balancing works across a subset of nodes */
+ @Test
+ public void testBalancingWithNodeSubset() throws Exception {
+ // Cluster nodes and their attributes
+ Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(5);
+ List<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+ int coresOnNode = 10;
+ for (Builders.NodeBuilder nodeBuilder : nodeBuilders) {
+ nodeBuilder.setCoreCount(coresOnNode);
+ coresOnNode += 10;
+ }
+
+ // The collection already exists with shards and replicas
+ Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder("a");
+ // Note that the collection as defined below is in a state that would NOT be returned by the
+ // placement plugin: shard 1 has two replicas on node 0. The plugin should still be able to
+ // balance the replicas as long as the resulting placement doesn't break the rules.
+ List<List<String>> shardsReplicas =
+ List.of(
+ List.of("NRT 0", "TLOG 0", "NRT 3"), // shard 1
+ List.of("NRT 1", "NRT 3", "TLOG 2")); // shard 2
+ collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+ clusterBuilder.addCollection(collectionBuilder);
+
+ // Only balance over nodes 1 to 4 (node 0 is excluded from the balancing)
+ List<Node> overNodes = clusterBuilder.buildLiveNodes();
+ overNodes.remove(0);
+
+ BalanceRequestImpl balanceRequest = new BalanceRequestImpl(new HashSet<>(overNodes));
+ BalancePlan balancePlan =
+ plugin.computeBalancing(balanceRequest, clusterBuilder.buildPlacementContext());
+
+ // Each expected movement is represented as a string
+ // "col shard replica-type fromNode -> toNode"
+ Set<String> expectedPlacements = Set.of("a 1 NRT 3 -> 1");
+ verifyBalancing(
+ expectedPlacements,
+ balancePlan,
+ collectionBuilder.getShardBuilders(),
+ clusterBuilder.buildLiveNodes());
+ }
+}
diff --git a/solr/solr-ref-guide/modules/deployment-guide/pages/cluster-node-management.adoc b/solr/solr-ref-guide/modules/deployment-guide/pages/cluster-node-management.adoc
index 0480df631fc..d1f79a56968 100644
--- a/solr/solr-ref-guide/modules/deployment-guide/pages/cluster-node-management.adoc
+++ b/solr/solr-ref-guide/modules/deployment-guide/pages/cluster-node-management.adoc
@@ -360,10 +360,75 @@ curl -X POST -H 'Content-type:application/json' --data-binary '
At this point, if you run a query on a node having e.g., `rack=rack1`, Solr will try to hit only replicas from `rack1`.
+[[balancereplicas]]
+== Balance Replicas
+
+Shuffle the replicas across the given set of Solr nodes until an equilibrium is reached.
+
+[example.tab-pane#v2balancereplicas]
+====
+[.tab-label]*V2 API*
+
+[source,bash]
+----
+curl -X POST http://localhost:8983/api/cluster/replicas/balance -H 'Content-Type: application/json' -d '
+ {
+ "nodes": ["localhost:8983_solr", "localhost:8984_solr"],
+ "async": "balance-replicas-1"
+ }
+'
+----
+====
+
+=== Parameters
+
+
+`nodes`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: none
+|===
++
+The nodes over which replicas will be balanced.
+Replicas that live outside this set of nodes will not be included in the balancing.
++
+If this parameter is not provided, all live data nodes will be used.
+
+`waitForFinalState`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: `false`
+|===
++
+If `true`, the request will complete only when all affected replicas become active.
+If `false`, the API will return when the bare minimum replicas are active, such as the affected leader replicas.
+
+`async`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: none
+|===
++
+Request ID to track this action which will be xref:configuration-guide:collections-api.adoc#asynchronous-calls[processed asynchronously].
+
+[IMPORTANT]
+====
+This operation does not hold the necessary locks on the replicas it moves.
+Avoid performing other collection operations while it is running.
+====
+
+=== BalanceReplicas Response
+
+The response will include the status of the request.
+If the status is anything other than "0", an error message will explain why the request failed.
+
[[balanceshardunique]]
== BALANCESHARDUNIQUE: Balance a Property Across Nodes
-Insures that a particular property is distributed evenly amongst the physical nodes that make up a collection.
+Ensures that a particular property is distributed evenly amongst the physical nodes that make up a collection.
If the property already exists on a replica, every effort is made to leave it there.
If the property is *not* on any replica on a shard, one is chosen and the property is added.
@@ -539,6 +604,16 @@ If this parameter is not provided, Solr will identify nodes automatically based
If this flag is set to `true`, all replicas are created in separate threads.
Keep in mind that this can lead to very high network and disk I/O if the replicas have very large indices.
+`waitForFinalState`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: `false`
+|===
++
+If `true`, the request will complete only when all affected replicas become active.
+If `false`, the API will return when the bare minimum replicas are active, such as the affected leader replicas.
+
`async`::
+
[%autowidth,frame=none]
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index b54e29f4d6e..aae2d250c1f 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -425,6 +425,7 @@ public class Replica extends ZkNodeProps implements MapWriter {
public interface ReplicaStateProps {
String COLLECTION = "collection";
String SHARD_ID = "shard";
+ String REPLICA_ID = "replica";
String LEADER = "leader";
String STATE = "state";
String CORE_NAME = "core";
@@ -438,4 +439,12 @@ public class Replica extends ZkNodeProps implements MapWriter {
Set.of(
LEADER, STATE, CORE_NAME, CORE_NODE_NAME, TYPE, NODE_NAME, BASE_URL, FORCE_SET_STATE);
}
+
+ public ZkNodeProps toFullProps() { // propMap plus the collection, shard and replica-name entries
+ return new ZkNodeProps()
+ .plus(propMap)
+ .plus(ReplicaStateProps.COLLECTION, getCollection())
+ .plus(ReplicaStateProps.SHARD_ID, getShard())
+ .plus(ReplicaStateProps.REPLICA_ID, getName());
+ }
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
index a1910d4edbb..516fdbfb919 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
@@ -38,6 +38,9 @@ public interface CollectionParams {
String SOURCE_NODE = "sourceNode";
String TARGET_NODE = "targetNode";
+ String NODES = "nodes";
+ String MAX_BALANCE_SKEW = "maxBalanceSkew";
+
enum LockLevel {
NONE(10, null),
REPLICA(3, null),
@@ -126,6 +129,8 @@ public interface CollectionParams {
MOCK_SHARD_TASK(false, LockLevel.SHARD),
// TODO when we have a node level lock use it here
REPLACENODE(true, LockLevel.NONE),
+ // TODO when we have a node level lock use it here
+ BALANCE_REPLICAS(true, LockLevel.NONE),
DELETENODE(true, LockLevel.NONE),
MOCK_REPLICA_TASK(false, LockLevel.REPLICA),
NONE(false, LockLevel.NONE),
diff --git a/solr/test-framework/src/java/org/apache/solr/cluster/placement/Builders.java b/solr/test-framework/src/java/org/apache/solr/cluster/placement/Builders.java
index 776f5ff2dd7..cffbea31b4c 100644
--- a/solr/test-framework/src/java/org/apache/solr/cluster/placement/Builders.java
+++ b/solr/test-framework/src/java/org/apache/solr/cluster/placement/Builders.java
@@ -31,6 +31,7 @@ import org.apache.solr.cluster.Shard;
import org.apache.solr.cluster.SolrCollection;
import org.apache.solr.cluster.placement.impl.AttributeFetcherImpl;
import org.apache.solr.cluster.placement.impl.AttributeValuesImpl;
+import org.apache.solr.cluster.placement.impl.BalancePlanFactoryImpl;
import org.apache.solr.cluster.placement.impl.CollectionMetricsBuilder;
import org.apache.solr.cluster.placement.impl.NodeMetricImpl;
import org.apache.solr.cluster.placement.impl.PlacementPlanFactoryImpl;
@@ -108,6 +109,8 @@ public class Builders {
private static final PlacementPlanFactory PLACEMENT_PLAN_FACTORY =
new PlacementPlanFactoryImpl();
+ private static final BalancePlanFactory BALANCE_PLAN_FACTORY = new BalancePlanFactoryImpl();
+
public PlacementContext buildPlacementContext() {
Cluster cluster = build();
AttributeFetcher attributeFetcher = buildAttributeFetcher();
@@ -126,6 +129,11 @@ public class Builders {
public PlacementPlanFactory getPlacementPlanFactory() {
return PLACEMENT_PLAN_FACTORY;
}
+
+ /**
+ * Returns the factory used to create balance plans in this test placement context.
+ *
+ * <p>Always returns the single {@code BALANCE_PLAN_FACTORY} instance (a {@code
+ * BalancePlanFactoryImpl} stored in a static final field), so every context produced by these
+ * builders shares the same factory.
+ */
+ @Override
+ public BalancePlanFactory getBalancePlanFactory() {
+ return BALANCE_PLAN_FACTORY;
+ }
};
}
@@ -178,10 +186,12 @@ public class Builders {
if (!collectionBuilders.isEmpty()) {
Map<Node, Object> nodeToCoreCount =
metrics.computeIfAbsent(NodeMetricImpl.NUM_CORES, n -> new HashMap<>());
+ Map<Node, Object> nodeToFreeDisk =
+ metrics.computeIfAbsent(NodeMetricImpl.FREE_DISK_GB, n -> new HashMap<>());
collectionBuilders.forEach(
builder -> {
- collectionMetrics.put(
- builder.collectionName, builder.collectionMetricsBuilder.build());
+ CollectionMetrics thisCollMetrics = builder.collectionMetricsBuilder.build();
+ collectionMetrics.put(builder.collectionName, thisCollMetrics);
SolrCollection collection = builder.build();
collection
.iterator()
@@ -195,6 +205,19 @@ public class Builders {
replica.getNode(),
(node, count) ->
(count == null) ? 1 : ((Number) count).intValue() + 1);
+ double leaderDiskSpace =
+ thisCollMetrics
+ .getShardMetrics(shard.getShardName())
+ .flatMap(
+ m -> m.getReplicaMetrics(replica.getReplicaName()))
+ .flatMap(
+ m ->
+ m.getReplicaMetric(
+ ReplicaMetricImpl.INDEX_SIZE_GB))
+ .orElse(0D);
+ nodeToFreeDisk.computeIfPresent(
+ replica.getNode(),
+ (node, freeDisk) -> ((Double) freeDisk) - leaderDiskSpace);
}));
});
}