You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by il...@apache.org on 2020/12/01 19:07:24 UTC
[lucene-solr] branch master updated: SOLR-15004: tests for the
replica placement API + placement plugin fixes and light refactoring
(#2110)
This is an automated email from the ASF dual-hosted git repository.
ilan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/master by this push:
new 3df7250 SOLR-15004: tests for the replica placement API + placement plugin fixes and light refactoring (#2110)
3df7250 is described below
commit 3df72502ccb72c2775db72b3347e99dc621d2d6b
Author: Ilan Ginzburg <il...@gmail.com>
AuthorDate: Tue Dec 1 20:07:08 2020 +0100
SOLR-15004: tests for the replica placement API + placement plugin fixes and light refactoring (#2110)
Co-authored-by: Andrzej Bialecki <ab...@apache.org>
---
.../src/java/org/apache/solr/cluster/Replica.java | 12 +-
.../org/apache/solr/cluster/SolrCollection.java | 33 +-
.../solr/cluster/placement/AttributeFetcher.java | 49 +-
.../solr/cluster/placement/AttributeValues.java | 60 +-
.../solr/cluster/placement/PlacementPlan.java | 2 +-
.../solr/cluster/placement/PlacementPlugin.java | 14 +-
.../cluster/placement/PlacementPluginConfig.java | 10 +
.../solr/cluster/placement/PlacementRequest.java | 54 +-
.../placement/impl/AttributeFetcherImpl.java | 376 +++++------
.../placement/impl/AttributeValuesImpl.java | 146 ++---
.../placement/impl/PlacementPlanFactoryImpl.java | 18 +-
.../cluster/placement/impl/PlacementPlanImpl.java | 10 +
.../impl/PlacementPluginAssignStrategy.java | 4 +-
.../placement/impl/PlacementPluginConfigImpl.java | 34 +-
.../placement/impl/PlacementRequestImpl.java | 18 +-
.../placement/impl/ReplicaPlacementImpl.java | 5 +
.../impl/SimpleClusterAbstractionsImpl.java | 120 ++--
.../plugins/AffinityPlacementFactory.java | 577 ++++++++++++++++
.../plugins/MinimizeCoresPlacementFactory.java | 126 ++++
.../SamplePluginAffinityReplicaPlacement.java | 509 --------------
.../plugins/SamplePluginMinimizeCores.java | 138 ----
.../plugins/SamplePluginRandomPlacement.java | 88 ---
.../cluster/placement/plugins/package-info.java | 2 +-
.../java/org/apache/solr/handler/ClusterAPI.java | 8 +-
.../solr/handler/admin/ContainerPluginsApi.java | 2 +-
.../cluster/placement/AttributeFetcherForTest.java | 93 +++
.../apache/solr/cluster/placement/Builders.java | 452 +++++++++++++
.../placement/ClusterAbstractionsForTest.java | 316 +++++++++
.../impl/PlacementPluginIntegrationTest.java | 112 ++++
.../impl/SimpleClusterAbstractionsTest.java | 89 +++
.../plugins/AffinityPlacementFactoryTest.java | 730 +++++++++++++++++++++
.../solr/common/cloud/ClusterProperties.java | 8 +-
.../apache/solr/common/cloud/ZkStateReader.java | 9 +-
33 files changed, 3061 insertions(+), 1163 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cluster/Replica.java b/solr/core/src/java/org/apache/solr/cluster/Replica.java
index 2c9230f..ff83f3f 100644
--- a/solr/core/src/java/org/apache/solr/cluster/Replica.java
+++ b/solr/core/src/java/org/apache/solr/cluster/Replica.java
@@ -42,7 +42,17 @@ public interface Replica {
* The order of this enum is important from the most to least "important" replica type.
*/
enum ReplicaType {
- NRT, TLOG, PULL
+ NRT('n'), TLOG('t'), PULL('p');
+
+ private char suffixChar;
+
+ ReplicaType(char suffixChar) {
+ this.suffixChar = suffixChar;
+ }
+
+ public char getSuffixChar() {
+ return suffixChar;
+ }
}
enum ReplicaState {
diff --git a/solr/core/src/java/org/apache/solr/cluster/SolrCollection.java b/solr/core/src/java/org/apache/solr/cluster/SolrCollection.java
index 23e79a4..d22560a 100644
--- a/solr/core/src/java/org/apache/solr/cluster/SolrCollection.java
+++ b/solr/core/src/java/org/apache/solr/cluster/SolrCollection.java
@@ -23,6 +23,7 @@ import org.apache.solr.cluster.placement.PlacementPlugin;
import org.apache.solr.cluster.placement.PlacementRequest;
import java.util.Iterator;
+import java.util.Set;
/**
* Represents a Collection in SolrCloud (unrelated to {@link java.util.Collection} that uses the nicer name).
@@ -55,20 +56,26 @@ public interface SolrCollection {
Iterable<Shard> shards();
/**
- * <p>Returns the value of a custom property name set on the {@link SolrCollection} or {@code null} when no such
- * property was set. Properties are set through the Collection API. See for example {@code COLLECTIONPROP} in the Solr reference guide.
- *
- * <p><b>{@link PlacementPlugin} related note:</b></p>
- * <p>Using custom properties in conjunction with ad hoc {@link PlacementPlugin} code allows customizing placement
- * decisions per collection.
- *
- * <p>For example if a collection is to be placed only on nodes using SSD storage and not rotating disks, it can be
- * identified as such using some custom property (collection property could for example be called "driveType" and have
- * value "ssd" in that case), and the placement plugin (implementing {@link PlacementPlugin}) would then
- * {@link AttributeFetcher#requestNodeSystemProperty(String)} for that property from all nodes and only place replicas
- * of this collection on {@link Node}'s for which
- * {@link AttributeValues#getDiskType(Node)} is non empty and equal to {@link org.apache.solr.cluster.placement.AttributeFetcher.DiskHardwareType#SSD}.
+ * @return a set of the names of the shards defined for this collection. This set is backed by an internal map so should
+ * not be modified.
*/
+ Set<String> getShardNames();
+
+ /**
+ * <p>Returns the value of a custom property name set on the {@link SolrCollection} or {@code null} when no such
+ * property was set. Properties are set through the Collection API. See for example {@code COLLECTIONPROP} in the Solr reference guide.
+ *
+ * <p><b>{@link PlacementPlugin} related note:</b></p>
+ * <p>Using custom properties in conjunction with ad hoc {@link PlacementPlugin} code allows customizing placement
+ * decisions per collection.
+ *
+ * <p>For example if a collection is to be placed only on nodes using SSD storage and not rotating disks, it can be
+ * identified as such using some custom property (collection property could for example be called "driveType" and have
+ * value "ssd" in that case), and the placement plugin (implementing {@link PlacementPlugin}) would then
+ * {@link AttributeFetcher#requestNodeSystemProperty(String)} for that property from all nodes and only place replicas
+ * of this collection on {@link Node}'s for which
+ * {@link AttributeValues#getDiskType(Node)} is non empty and equal to {@link org.apache.solr.cluster.placement.AttributeFetcher.DiskHardwareType#SSD}.
+ */
String getCustomProperty(String customPropertyName);
/*
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/AttributeFetcher.java b/solr/core/src/java/org/apache/solr/cluster/placement/AttributeFetcher.java
index cb368d7..9578326 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/AttributeFetcher.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/AttributeFetcher.java
@@ -25,31 +25,49 @@ import java.util.Set;
* <p>Instances of this interface are used to fetch various attributes from nodes (and other sources) in the cluster.</p>
*/
public interface AttributeFetcher {
- /** Request the number of cores on each node. To get the value use {@link AttributeValues#getCoresCount(Node)} */
+ /**
+ * Request the number of cores on each node. To get the value use {@link AttributeValues#getCoresCount(Node)}
+ */
AttributeFetcher requestNodeCoreCount();
- /** Request the disk hardware type on each node. To get the value use {@link AttributeValues#getDiskType(Node)} */
+ /**
+ * Request the disk hardware type on each node. To get the value use {@link AttributeValues#getDiskType(Node)}
+ */
AttributeFetcher requestNodeDiskType();
- /** Request the free disk size on each node. To get the value use {@link AttributeValues#getFreeDisk(Node)} */
+ /**
+ * Request the free disk size on each node. To get the value use {@link AttributeValues#getFreeDisk(Node)}
+ */
AttributeFetcher requestNodeFreeDisk();
- /** Request the total disk size on each node. To get the value use {@link AttributeValues#getTotalDisk(Node)} */
+ /**
+ * Request the total disk size on each node. To get the value use {@link AttributeValues#getTotalDisk(Node)}
+ */
AttributeFetcher requestNodeTotalDisk();
- /** Request the heap usage on each node. To get the value use {@link AttributeValues#getHeapUsage(Node)} */
+ /**
+ * Request the heap usage on each node. To get the value use {@link AttributeValues#getHeapUsage(Node)}
+ */
AttributeFetcher requestNodeHeapUsage();
- /** Request the system load average on each node. To get the value use {@link AttributeValues#getSystemLoadAverage(Node)} */
+ /**
+ * Request the system load average on each node. To get the value use {@link AttributeValues#getSystemLoadAverage(Node)}
+ */
AttributeFetcher requestNodeSystemLoadAverage();
- /** Request a given system property on each node. To get the value use {@link AttributeValues#getSystemProperty(Node, String)} */
+ /**
+ * Request a given system property on each node. To get the value use {@link AttributeValues#getSystemProperty(Node, String)}
+ */
AttributeFetcher requestNodeSystemProperty(String name);
- /** Request an environment variable on each node. To get the value use {@link AttributeValues#getEnvironmentVariable(Node, String)} */
+ /**
+ * Request an environment variable on each node. To get the value use {@link AttributeValues#getEnvironmentVariable(Node, String)}
+ */
AttributeFetcher requestNodeEnvironmentVariable(String name);
- /** Request a node metric from each node. To get the value use {@link AttributeValues#getMetric(Node, String, NodeMetricRegistry)} */
+ /**
+ * Request a node metric from each node. To get the value use {@link AttributeValues#getMetric(Node, String, NodeMetricRegistry)}
+ */
AttributeFetcher requestNodeMetric(String metricName, NodeMetricRegistry registry);
@@ -59,12 +77,15 @@ public interface AttributeFetcher {
*/
AttributeFetcher fetchFrom(Set<Node> nodes);
- /** Requests a (non node) metric of a given scope and name. To get the value use {@link AttributeValues#getMetric(String, String)} */
+ /**
+ * Requests a (non node) metric of a given scope and name. To get the value use {@link AttributeValues#getMetric(String, String)}
+ */
AttributeFetcher requestMetric(String scope, String metricName);
/**
* Fetches all requested node attributes from all nodes passed to {@link #fetchFrom(Set)} as well as non node attributes
* (those requested for example using {@link #requestMetric(String, String)}.
+ *
* @return An instance allowing retrieval of all attributed that could be fetched.
*/
AttributeValues fetchAttributes();
@@ -73,9 +94,13 @@ public interface AttributeFetcher {
* Registry options for {@link Node} metrics.
*/
enum NodeMetricRegistry {
- /** corresponds to solr.node */
+ /**
+ * corresponds to solr.node
+ */
SOLR_NODE,
- /** corresponds to solr.jvm */
+ /**
+ * corresponds to solr.jvm
+ */
SOLR_JVM
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/AttributeValues.java b/solr/core/src/java/org/apache/solr/cluster/placement/AttributeValues.java
index 4519c8a..24fcb6f 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/AttributeValues.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/AttributeValues.java
@@ -22,34 +22,54 @@ import org.apache.solr.cluster.Node;
import java.util.Optional;
public interface AttributeValues {
- /** For the given node: number of cores */
- Optional<Integer> getCoresCount(Node node);
+ /**
+ * For the given node: number of cores
+ */
+ Optional<Integer> getCoresCount(Node node);
- /** For the given node: Hardware type of the disk partition where cores are stored */
- Optional<AttributeFetcher.DiskHardwareType> getDiskType(Node node);
+ /**
+ * For the given node: Hardware type of the disk partition where cores are stored
+ */
+ Optional<AttributeFetcher.DiskHardwareType> getDiskType(Node node);
- /** For the given node: Free disk size in Gigabytes of the partition on which cores are stored */
- Optional<Long> getFreeDisk(Node node);
+ /**
+ * For the given node: Free disk size in Gigabytes of the partition on which cores are stored
+ */
+ Optional<Long> getFreeDisk(Node node);
- /** For the given node: Total disk size in Gigabytes of the partition on which cores are stored */
- Optional<Long> getTotalDisk(Node node);
+ /**
+ * For the given node: Total disk size in Gigabytes of the partition on which cores are stored
+ */
+ Optional<Long> getTotalDisk(Node node);
- /** For the given node: Percentage between 0 and 100 of used heap over max heap */
- Optional<Double> getHeapUsage(Node node);
+ /**
+ * For the given node: Percentage between 0 and 100 of used heap over max heap
+ */
+ Optional<Double> getHeapUsage(Node node);
- /** For the given node: matches {@link java.lang.management.OperatingSystemMXBean#getSystemLoadAverage()} */
- Optional<Double> getSystemLoadAverage(Node node);
+ /**
+ * For the given node: matches {@link java.lang.management.OperatingSystemMXBean#getSystemLoadAverage()}
+ */
+ Optional<Double> getSystemLoadAverage(Node node);
- /** For the given node: system property value (system properties are passed to Java using {@code -Dname=value} */
- Optional<String> getSystemProperty(Node node, String name);
+ /**
+ * For the given node: system property value (system properties are passed to Java using {@code -Dname=value}
+ */
+ Optional<String> getSystemProperty(Node node, String name);
- /** For the given node: environment variable value */
- Optional<String> getEnvironmentVariable(Node node, String name);
+ /**
+ * For the given node: environment variable value
+ */
+ Optional<String> getEnvironmentVariable(Node node, String name);
- /** For the given node: metric of specific name and registry */
- Optional<Double> getMetric(Node node, String metricName, AttributeFetcher.NodeMetricRegistry registry);
+ /**
+ * For the given node: metric of specific name and registry
+ */
+ Optional<Double> getMetric(Node node, String metricName, AttributeFetcher.NodeMetricRegistry registry);
- /** Get a non node related metric of specific scope and name */
- Optional<Double> getMetric(String scope, String metricName);
+ /**
+ * Get a non node related metric of specific scope and name
+ */
+ Optional<Double> getMetric(String scope, String metricName);
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlan.java b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlan.java
index c4738a5..331578b 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlan.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlan.java
@@ -24,7 +24,7 @@ import java.util.Set;
/**
* A fully specified plan or instructions for placement, deletion or move to be applied to the cluster.<p>
* Fully specified means the actual {@link Node}'s on which to place replicas have been decided.
- *
+ * <p>
* Instances are created by plugin code using {@link PlacementPlanFactory}. This interface obviously doesn't expose much but
* the underlying Solr side implementation has all that is needed (and will do at least one cast in order to execute the
* plan, likely then using some type of visitor pattern).
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlugin.java b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlugin.java
index 28b6476..bbb52cb 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlugin.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlugin.java
@@ -36,13 +36,13 @@ public interface PlacementPlugin {
*
* <p>Configuration is passed upon creation of a new instance of this class by {@link PlacementPluginFactory#createPluginInstance}.
*
- * @param cluster initial state of the cluster. Note there are {@link java.util.Set}'s and {@link java.util.Map}'s
- * accessible from the {@link Cluster} and other reachable instances. These collection will not change
- * while the plugin is executing and will be thrown away once the plugin is done. The plugin code can
- * therefore modify them if needed.
- * @param placementRequest request for placing new replicas or moving existing replicas on the cluster.
- * @param attributeFetcher Factory used by the plugin to fetch additional attributes from the cluster nodes, such as
- * count of coresm ssytem properties etc..
+ * @param cluster initial state of the cluster. Note there are {@link java.util.Set}'s and {@link java.util.Map}'s
+ * accessible from the {@link Cluster} and other reachable instances. These collection will not change
+ * while the plugin is executing and will be thrown away once the plugin is done. The plugin code can
+ * therefore modify them if needed.
+ * @param placementRequest request for placing new replicas or moving existing replicas on the cluster.
+ * @param attributeFetcher Factory used by the plugin to fetch additional attributes from the cluster nodes, such as
+ * count of coresm ssytem properties etc..
* @param placementPlanFactory Factory used to create instances of {@link PlacementPlan} to return computed decision.
* @return plan satisfying the placement request.
*/
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPluginConfig.java b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPluginConfig.java
index a39390f..d223dcc 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPluginConfig.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPluginConfig.java
@@ -68,6 +68,16 @@ package org.apache.solr.cluster.placement;
* </pre>
*/
public interface PlacementPluginConfig {
+
+ /**
+ * The key in {@code clusterprops.json} under which the plugin factory and the plugin configuration are defined.
+ */
+ String PLACEMENT_PLUGIN_CONFIG_KEY = "placement-plugin";
+ /**
+ * Name of the property containing the factory class
+ */
+ String FACTORY_CLASS = "class";
+
/**
* @return the configured {@link String} value corresponding to {@code configName} if one exists (could be the empty
* string) and {@code null} otherwise.
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementRequest.java b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementRequest.java
index 61b49dd..44222a2 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementRequest.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementRequest.java
@@ -31,34 +31,34 @@ import java.util.Set;
* is specified (defaults to being equal to the set returned by {@link Cluster#getLiveNodes()}).
*/
public interface PlacementRequest {
- /**
- * The {@link SolrCollection} to add {@link Replica}(s) to.
- */
- SolrCollection getCollection();
+ /**
+ * The {@link SolrCollection} to add {@link Replica}(s) to.
+ */
+ SolrCollection getCollection();
- /**
- * <p>Shard name(s) for which new replicas placement should be computed. The shard(s) might exist or not (that's why this
- * method returns a {@link Set} of {@link String}'s and not directly a set of {@link Shard} instances).
- *
- * <p>Note the Collection API allows specifying the shard name or a {@code _route_} parameter. The Solr implementation will
- * convert either specification into the relevant shard name so the plugin code doesn't have to worry about this.
- */
- Set<String> getShardNames();
+ /**
+ * <p>Shard name(s) for which new replicas placement should be computed. The shard(s) might exist or not (that's why this
+ * method returns a {@link Set} of {@link String}'s and not directly a set of {@link Shard} instances).
+ *
+ * <p>Note the Collection API allows specifying the shard name or a {@code _route_} parameter. The Solr implementation will
+ * convert either specification into the relevant shard name so the plugin code doesn't have to worry about this.
+ */
+ Set<String> getShardNames();
- /**
- * <p>Replicas should only be placed on nodes in the set returned by this method.
- *
- * <p>When Collection API calls do not specify a specific set of target nodes, replicas can be placed on any live node of
- * the cluster. In such cases, this set will be equal to the set of all live nodes. The plugin placement code does not
- * need to worry (or care) if a set of nodes was explicitly specified or not.
- *
- * @return never {@code null} and never empty set (if that set was to be empty for any reason, no placement would be
- * possible and the Solr infrastructure driving the plugin code would detect the error itself rather than calling the plugin).
- */
- Set<Node> getTargetNodes();
+ /**
+ * <p>Replicas should only be placed on nodes in the set returned by this method.
+ *
+ * <p>When Collection API calls do not specify a specific set of target nodes, replicas can be placed on any live node of
+ * the cluster. In such cases, this set will be equal to the set of all live nodes. The plugin placement code does not
+ * need to worry (or care) if a set of nodes was explicitly specified or not.
+ *
+ * @return never {@code null} and never empty set (if that set was to be empty for any reason, no placement would be
+ * possible and the Solr infrastructure driving the plugin code would detect the error itself rather than calling the plugin).
+ */
+ Set<Node> getTargetNodes();
- /**
- * Returns the number of replica to create for the given replica type.
- */
- int getCountReplicasToCreate(Replica.ReplicaType replicaType);
+ /**
+ * Returns the number of replica to create for the given replica type.
+ */
+ int getCountReplicasToCreate(Replica.ReplicaType replicaType);
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/AttributeFetcherImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/AttributeFetcherImpl.java
index 98367d3..3c3bf49 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/impl/AttributeFetcherImpl.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/AttributeFetcherImpl.java
@@ -35,194 +35,194 @@ import java.util.*;
import java.util.function.BiConsumer;
public class AttributeFetcherImpl implements AttributeFetcher {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- boolean requestedNodeCoreCount;
- boolean requestedNodeDiskType;
- boolean requestedNodeFreeDisk;
- boolean requestedNodeTotalDisk;
- boolean requestedNodeHeapUsage;
- boolean requestedNodeSystemLoadAverage;
- Set<String> requestedNodeSystemPropertiesSnitchTags = new HashSet<>();
- Set<String> requestedNodeMetricSnitchTags = new HashSet<>();
-
- Set<Node> nodes = Collections.emptySet();
-
- private final SolrCloudManager cloudManager;
-
- AttributeFetcherImpl(SolrCloudManager cloudManager) {
- this.cloudManager = cloudManager;
- }
-
- @Override
- public AttributeFetcher requestNodeCoreCount() {
- requestedNodeCoreCount = true;
- return this;
- }
-
- @Override
- public AttributeFetcher requestNodeDiskType() {
- requestedNodeDiskType = true;
- return this;
- }
-
- @Override
- public AttributeFetcher requestNodeFreeDisk() {
- requestedNodeFreeDisk = true;
- return this;
- }
-
- @Override
- public AttributeFetcher requestNodeTotalDisk() {
- requestedNodeTotalDisk = true;
- return this;
- }
-
- @Override
- public AttributeFetcher requestNodeHeapUsage() {
- requestedNodeHeapUsage = true;
- return this;
- }
-
- @Override
- public AttributeFetcher requestNodeSystemLoadAverage() {
- requestedNodeSystemLoadAverage = true;
- return this;
- }
-
- @Override
- public AttributeFetcher requestNodeSystemProperty(String name) {
- requestedNodeSystemPropertiesSnitchTags.add(getSystemPropertySnitchTag(name));
- return this;
- }
-
- @Override
- public AttributeFetcher requestNodeEnvironmentVariable(String name) {
- throw new UnsupportedOperationException("Not yet implemented...");
- }
-
- @Override
- public AttributeFetcher requestNodeMetric(String metricName, NodeMetricRegistry registry) {
- requestedNodeMetricSnitchTags.add(getMetricSnitchTag(metricName, registry));
- return this;
- }
-
- @Override
- public AttributeFetcher fetchFrom(Set<Node> nodes) {
- this.nodes = nodes;
- return this;
- }
-
- @Override
- public AttributeFetcher requestMetric(String scope, String metricName) {
- throw new UnsupportedOperationException("Not yet implemented...");
- }
-
- @Override
- public AttributeValues fetchAttributes() {
- // TODO Code here only supports node related attributes for now
-
- // Maps in which attribute values will be added
- Map<Node, Integer> nodeToCoreCount = Maps.newHashMap();
- Map<Node, DiskHardwareType> nodeToDiskType = Maps.newHashMap();
- Map<Node, Long> nodeToFreeDisk = Maps.newHashMap();
- Map<Node, Long> nodeToTotalDisk = Maps.newHashMap();
- Map<Node, Double> nodeToHeapUsage = Maps.newHashMap();
- Map<Node, Double> nodeToSystemLoadAverage = Maps.newHashMap();
- Map<String, Map<Node, String>> syspropSnitchToNodeToValue = Maps.newHashMap();
- Map<String, Map<Node, Double>> metricSnitchToNodeToValue = Maps.newHashMap();
-
- // In order to match the returned values for the various snitches, we need to keep track of where each
- // received value goes. Given the target maps are of different types (the maps from Node to whatever defined
- // above) we instead pass a function taking two arguments, the node and the (non null) returned value,
- // that will cast the value into the appropriate type for the snitch tag and insert it into the appropriate map
- // with the node as the key.
- Map<String, BiConsumer<Node, Object>> allSnitchTagsToInsertion = Maps.newHashMap();
- if (requestedNodeCoreCount) {
- allSnitchTagsToInsertion.put(ImplicitSnitch.CORES, (node, value) -> nodeToCoreCount.put(node, ((Number) value).intValue()));
- }
- if (requestedNodeDiskType) {
- allSnitchTagsToInsertion.put(ImplicitSnitch.DISKTYPE, (node, value) -> {
- if ("rotational".equals(value)) {
- nodeToDiskType.put(node, DiskHardwareType.ROTATIONAL);
- } else if ("ssd".equals(value)) {
- nodeToDiskType.put(node, DiskHardwareType.SSD);
- }
- // unknown disk type: insert no value, returned optional will be empty
- });
- }
- if (requestedNodeFreeDisk) {
- allSnitchTagsToInsertion.put(SolrClientNodeStateProvider.Variable.FREEDISK.tagName,
- // Convert from bytes to GB
- (node, value) -> nodeToFreeDisk.put(node, ((Number) value).longValue() / 1024 / 1024 / 1024));
- }
- if (requestedNodeTotalDisk) {
- allSnitchTagsToInsertion.put(SolrClientNodeStateProvider.Variable.TOTALDISK.tagName,
- // Convert from bytes to GB
- (node, value) -> nodeToTotalDisk.put(node, ((Number) value).longValue() / 1024 / 1024 / 1024));
- }
- if (requestedNodeHeapUsage) {
- allSnitchTagsToInsertion.put(ImplicitSnitch.HEAPUSAGE,
- (node, value) -> nodeToHeapUsage.put(node, ((Number) value).doubleValue()));
- }
- if (requestedNodeSystemLoadAverage) {
- allSnitchTagsToInsertion.put(ImplicitSnitch.SYSLOADAVG,
- (node, value) -> nodeToSystemLoadAverage.put(node, ((Number) value).doubleValue()));
- }
- for (String sysPropSnitch : requestedNodeSystemPropertiesSnitchTags) {
- final Map<Node, String> sysPropMap = Maps.newHashMap();
- syspropSnitchToNodeToValue.put(sysPropSnitch, sysPropMap);
- allSnitchTagsToInsertion.put(sysPropSnitch, (node, value) -> sysPropMap.put(node, (String) value));
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ boolean requestedNodeCoreCount;
+ boolean requestedNodeDiskType;
+ boolean requestedNodeFreeDisk;
+ boolean requestedNodeTotalDisk;
+ boolean requestedNodeHeapUsage;
+ boolean requestedNodeSystemLoadAverage;
+ Set<String> requestedNodeSystemPropertiesSnitchTags = new HashSet<>();
+ Set<String> requestedNodeMetricSnitchTags = new HashSet<>();
+
+ Set<Node> nodes = Collections.emptySet();
+
+ private final SolrCloudManager cloudManager;
+
+ AttributeFetcherImpl(SolrCloudManager cloudManager) {
+ this.cloudManager = cloudManager;
+ }
+
+ @Override
+ public AttributeFetcher requestNodeCoreCount() {
+ requestedNodeCoreCount = true;
+ return this;
+ }
+
+ @Override
+ public AttributeFetcher requestNodeDiskType() {
+ requestedNodeDiskType = true;
+ return this;
+ }
+
+ @Override
+ public AttributeFetcher requestNodeFreeDisk() {
+ requestedNodeFreeDisk = true;
+ return this;
+ }
+
+ @Override
+ public AttributeFetcher requestNodeTotalDisk() {
+ requestedNodeTotalDisk = true;
+ return this;
+ }
+
+ @Override
+ public AttributeFetcher requestNodeHeapUsage() {
+ requestedNodeHeapUsage = true;
+ return this;
+ }
+
+ @Override
+ public AttributeFetcher requestNodeSystemLoadAverage() {
+ requestedNodeSystemLoadAverage = true;
+ return this;
+ }
+
+ @Override
+ public AttributeFetcher requestNodeSystemProperty(String name) {
+ requestedNodeSystemPropertiesSnitchTags.add(getSystemPropertySnitchTag(name));
+ return this;
+ }
+
+ @Override
+ public AttributeFetcher requestNodeEnvironmentVariable(String name) {
+ throw new UnsupportedOperationException("Not yet implemented...");
+ }
+
+ @Override
+ public AttributeFetcher requestNodeMetric(String metricName, NodeMetricRegistry registry) {
+ requestedNodeMetricSnitchTags.add(getMetricSnitchTag(metricName, registry));
+ return this;
+ }
+
+ @Override
+ public AttributeFetcher fetchFrom(Set<Node> nodes) {
+ this.nodes = nodes;
+ return this;
+ }
+
+ @Override
+ public AttributeFetcher requestMetric(String scope, String metricName) {
+ throw new UnsupportedOperationException("Not yet implemented...");
+ }
+
+ @Override
+ public AttributeValues fetchAttributes() {
+ // TODO Code here only supports node related attributes for now
+
+ // Maps in which attribute values will be added
+ Map<Node, Integer> nodeToCoreCount = Maps.newHashMap();
+ Map<Node, DiskHardwareType> nodeToDiskType = Maps.newHashMap();
+ Map<Node, Long> nodeToFreeDisk = Maps.newHashMap();
+ Map<Node, Long> nodeToTotalDisk = Maps.newHashMap();
+ Map<Node, Double> nodeToHeapUsage = Maps.newHashMap();
+ Map<Node, Double> nodeToSystemLoadAverage = Maps.newHashMap();
+ Map<String, Map<Node, String>> syspropSnitchToNodeToValue = Maps.newHashMap();
+ Map<String, Map<Node, Double>> metricSnitchToNodeToValue = Maps.newHashMap();
+
+ // In order to match the returned values for the various snitches, we need to keep track of where each
+ // received value goes. Given the target maps are of different types (the maps from Node to whatever defined
+ // above) we instead pass a function taking two arguments, the node and the (non null) returned value,
+ // that will cast the value into the appropriate type for the snitch tag and insert it into the appropriate map
+ // with the node as the key.
+ Map<String, BiConsumer<Node, Object>> allSnitchTagsToInsertion = Maps.newHashMap();
+ if (requestedNodeCoreCount) {
+ allSnitchTagsToInsertion.put(ImplicitSnitch.CORES, (node, value) -> nodeToCoreCount.put(node, ((Number) value).intValue()));
+ }
+ if (requestedNodeDiskType) {
+ allSnitchTagsToInsertion.put(ImplicitSnitch.DISKTYPE, (node, value) -> {
+ if ("rotational".equals(value)) {
+ nodeToDiskType.put(node, DiskHardwareType.ROTATIONAL);
+ } else if ("ssd".equals(value)) {
+ nodeToDiskType.put(node, DiskHardwareType.SSD);
}
- for (String metricSnitch : requestedNodeMetricSnitchTags) {
- final Map<Node, Double> metricMap = Maps.newHashMap();
- metricSnitchToNodeToValue.put(metricSnitch, metricMap);
- allSnitchTagsToInsertion.put(metricSnitch, (node, value) -> metricMap.put(node, (Double) value));
+ // unknown disk type: insert no value, returned optional will be empty
+ });
+ }
+ if (requestedNodeFreeDisk) {
+ allSnitchTagsToInsertion.put(SolrClientNodeStateProvider.Variable.FREEDISK.tagName,
+ // Convert from bytes to GB
+ (node, value) -> nodeToFreeDisk.put(node, ((Number) value).longValue() / 1024 / 1024 / 1024));
+ }
+ if (requestedNodeTotalDisk) {
+ allSnitchTagsToInsertion.put(SolrClientNodeStateProvider.Variable.TOTALDISK.tagName,
+ // Convert from bytes to GB
+ (node, value) -> nodeToTotalDisk.put(node, ((Number) value).longValue() / 1024 / 1024 / 1024));
+ }
+ if (requestedNodeHeapUsage) {
+ allSnitchTagsToInsertion.put(ImplicitSnitch.HEAPUSAGE,
+ (node, value) -> nodeToHeapUsage.put(node, ((Number) value).doubleValue()));
+ }
+ if (requestedNodeSystemLoadAverage) {
+ allSnitchTagsToInsertion.put(ImplicitSnitch.SYSLOADAVG,
+ (node, value) -> nodeToSystemLoadAverage.put(node, ((Number) value).doubleValue()));
+ }
+ for (String sysPropSnitch : requestedNodeSystemPropertiesSnitchTags) {
+ final Map<Node, String> sysPropMap = Maps.newHashMap();
+ syspropSnitchToNodeToValue.put(sysPropSnitch, sysPropMap);
+ allSnitchTagsToInsertion.put(sysPropSnitch, (node, value) -> sysPropMap.put(node, (String) value));
+ }
+ for (String metricSnitch : requestedNodeMetricSnitchTags) {
+ final Map<Node, Double> metricMap = Maps.newHashMap();
+ metricSnitchToNodeToValue.put(metricSnitch, metricMap);
+ allSnitchTagsToInsertion.put(metricSnitch, (node, value) -> metricMap.put(node, (Double) value));
+ }
+
+ // Now that we know everything we need to fetch (and where to put it), just do it.
+ for (Node node : nodes) {
+ Map<String, Object> tagValues = cloudManager.getNodeStateProvider().getNodeValues(node.getName(), allSnitchTagsToInsertion.keySet());
+ for (Map.Entry<String, Object> e : tagValues.entrySet()) {
+ String tag = e.getKey();
+ Object value = e.getValue(); // returned value from the node
+
+ BiConsumer<Node, Object> inserter = allSnitchTagsToInsertion.get(tag);
+ // If inserter is null it's a return of a tag that we didn't request
+ if (inserter != null) {
+ inserter.accept(node, value);
+ } else {
+ log.error("Received unsolicited snitch tag {} from node {}", tag, node);
}
-
- // Now that we know everything we need to fetch (and where to put it), just do it.
- for (Node node : nodes) {
- Map<String, Object> tagValues = cloudManager.getNodeStateProvider().getNodeValues(node.getName(), allSnitchTagsToInsertion.keySet());
- for (Map.Entry<String, Object> e : tagValues.entrySet()) {
- String tag = e.getKey();
- Object value = e.getValue(); // returned value from the node
-
- BiConsumer<Node, Object> inserter = allSnitchTagsToInsertion.get(tag);
- // If inserter is null it's a return of a tag that we didn't request
- if (inserter != null) {
- inserter.accept(node, value);
- } else {
- log.error("Received unsolicited snitch tag {} from node {}", tag, node);
- }
- }
- }
-
- return new AttributeValuesImpl(nodeToCoreCount,
- nodeToDiskType,
- nodeToFreeDisk,
- nodeToTotalDisk,
- nodeToHeapUsage,
- nodeToSystemLoadAverage,
- syspropSnitchToNodeToValue,
- metricSnitchToNodeToValue);
- }
-
- private static SolrInfoBean.Group getGroupFromMetricRegistry(NodeMetricRegistry registry) {
- switch (registry) {
- case SOLR_JVM:
- return SolrInfoBean.Group.jvm;
- case SOLR_NODE:
- return SolrInfoBean.Group.node;
- default:
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unsupported registry value " + registry);
- }
- }
-
- static String getMetricSnitchTag(String metricName, NodeMetricRegistry registry) {
- return SolrClientNodeStateProvider.METRICS_PREFIX + SolrMetricManager.getRegistryName(getGroupFromMetricRegistry(registry), metricName);
- }
-
- static String getSystemPropertySnitchTag(String name) {
- return ImplicitSnitch.SYSPROP + name;
- }
+ }
+ }
+
+ return new AttributeValuesImpl(nodeToCoreCount,
+ nodeToDiskType,
+ nodeToFreeDisk,
+ nodeToTotalDisk,
+ nodeToHeapUsage,
+ nodeToSystemLoadAverage,
+ syspropSnitchToNodeToValue,
+ metricSnitchToNodeToValue);
+ }
+
+ private static SolrInfoBean.Group getGroupFromMetricRegistry(NodeMetricRegistry registry) {
+ switch (registry) {
+ case SOLR_JVM:
+ return SolrInfoBean.Group.jvm;
+ case SOLR_NODE:
+ return SolrInfoBean.Group.node;
+ default:
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unsupported registry value " + registry);
+ }
+ }
+
+ public static String getMetricSnitchTag(String metricName, NodeMetricRegistry registry) {
+ return SolrClientNodeStateProvider.METRICS_PREFIX + SolrMetricManager.getRegistryName(getGroupFromMetricRegistry(registry), metricName);
+ }
+
+ public static String getSystemPropertySnitchTag(String name) {
+ return ImplicitSnitch.SYSPROP + name;
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/AttributeValuesImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/AttributeValuesImpl.java
index 78c2143..ce68094 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/impl/AttributeValuesImpl.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/AttributeValuesImpl.java
@@ -25,90 +25,90 @@ import java.util.Map;
import java.util.Optional;
public class AttributeValuesImpl implements AttributeValues {
- final Map<Node, Integer> nodeToCoreCount;
- final Map<Node, AttributeFetcher.DiskHardwareType> nodeToDiskType;
- final Map<Node, Long> nodeToFreeDisk;
- final Map<Node, Long> nodeToTotalDisk;
- final Map<Node, Double> nodeToHeapUsage;
- final Map<Node, Double> nodeToSystemLoadAverage;
- final Map<String, Map<Node, String>> syspropSnitchToNodeToValue;
- final Map<String, Map<Node, Double>> metricSnitchToNodeToValue;
+ final Map<Node, Integer> nodeToCoreCount;
+ final Map<Node, AttributeFetcher.DiskHardwareType> nodeToDiskType;
+ final Map<Node, Long> nodeToFreeDisk;
+ final Map<Node, Long> nodeToTotalDisk;
+ final Map<Node, Double> nodeToHeapUsage;
+ final Map<Node, Double> nodeToSystemLoadAverage;
+ final Map<String, Map<Node, String>> syspropSnitchToNodeToValue;
+ final Map<String, Map<Node, Double>> metricSnitchToNodeToValue;
- AttributeValuesImpl(Map<Node, Integer> nodeToCoreCount,
- Map<Node, AttributeFetcher.DiskHardwareType> nodeToDiskType,
- Map<Node, Long> nodeToFreeDisk,
- Map<Node, Long> nodeToTotalDisk,
- Map<Node, Double> nodeToHeapUsage,
- Map<Node, Double> nodeToSystemLoadAverage,
- Map<String, Map<Node, String>> syspropSnitchToNodeToValue,
- Map<String, Map<Node, Double>> metricSnitchToNodeToValue) {
- this.nodeToCoreCount = nodeToCoreCount;
- this.nodeToDiskType = nodeToDiskType;
- this.nodeToFreeDisk = nodeToFreeDisk;
- this.nodeToTotalDisk = nodeToTotalDisk;
- this.nodeToHeapUsage = nodeToHeapUsage;
- this.nodeToSystemLoadAverage = nodeToSystemLoadAverage;
- this.syspropSnitchToNodeToValue = syspropSnitchToNodeToValue;
- this.metricSnitchToNodeToValue = metricSnitchToNodeToValue;
- }
+ public AttributeValuesImpl(Map<Node, Integer> nodeToCoreCount,
+ Map<Node, AttributeFetcher.DiskHardwareType> nodeToDiskType,
+ Map<Node, Long> nodeToFreeDisk,
+ Map<Node, Long> nodeToTotalDisk,
+ Map<Node, Double> nodeToHeapUsage,
+ Map<Node, Double> nodeToSystemLoadAverage,
+ Map<String, Map<Node, String>> syspropSnitchToNodeToValue,
+ Map<String, Map<Node, Double>> metricSnitchToNodeToValue) {
+ this.nodeToCoreCount = nodeToCoreCount;
+ this.nodeToDiskType = nodeToDiskType;
+ this.nodeToFreeDisk = nodeToFreeDisk;
+ this.nodeToTotalDisk = nodeToTotalDisk;
+ this.nodeToHeapUsage = nodeToHeapUsage;
+ this.nodeToSystemLoadAverage = nodeToSystemLoadAverage;
+ this.syspropSnitchToNodeToValue = syspropSnitchToNodeToValue;
+ this.metricSnitchToNodeToValue = metricSnitchToNodeToValue;
+ }
- @Override
- public Optional<Integer> getCoresCount(Node node) {
- return Optional.ofNullable(nodeToCoreCount.get(node));
- }
+ @Override
+ public Optional<Integer> getCoresCount(Node node) {
+ return Optional.ofNullable(nodeToCoreCount.get(node));
+ }
- @Override
- public Optional<AttributeFetcher.DiskHardwareType> getDiskType(Node node) {
- return Optional.ofNullable(nodeToDiskType.get(node));
- }
+ @Override
+ public Optional<AttributeFetcher.DiskHardwareType> getDiskType(Node node) {
+ return Optional.ofNullable(nodeToDiskType.get(node));
+ }
- @Override
- public Optional<Long> getFreeDisk(Node node) {
- return Optional.ofNullable(nodeToFreeDisk.get(node));
- }
+ @Override
+ public Optional<Long> getFreeDisk(Node node) {
+ return Optional.ofNullable(nodeToFreeDisk.get(node));
+ }
- @Override
- public Optional<Long> getTotalDisk(Node node) {
- return Optional.ofNullable(nodeToTotalDisk.get(node));
- }
+ @Override
+ public Optional<Long> getTotalDisk(Node node) {
+ return Optional.ofNullable(nodeToTotalDisk.get(node));
+ }
- @Override
- public Optional<Double> getHeapUsage(Node node) {
- return Optional.ofNullable(nodeToHeapUsage.get(node));
- }
+ @Override
+ public Optional<Double> getHeapUsage(Node node) {
+ return Optional.ofNullable(nodeToHeapUsage.get(node));
+ }
- @Override
- public Optional<Double> getSystemLoadAverage(Node node) {
- return Optional.ofNullable(nodeToSystemLoadAverage.get(node));
- }
+ @Override
+ public Optional<Double> getSystemLoadAverage(Node node) {
+ return Optional.ofNullable(nodeToSystemLoadAverage.get(node));
+ }
- @Override
- public Optional<String> getSystemProperty(Node node, String name) {
- Map<Node, String> nodeToValue = syspropSnitchToNodeToValue.get(AttributeFetcherImpl.getSystemPropertySnitchTag(name));
- if (nodeToValue == null) {
- return Optional.empty();
- }
- return Optional.ofNullable(nodeToValue.get(node));
+ @Override
+ public Optional<String> getSystemProperty(Node node, String name) {
+ Map<Node, String> nodeToValue = syspropSnitchToNodeToValue.get(AttributeFetcherImpl.getSystemPropertySnitchTag(name));
+ if (nodeToValue == null) {
+ return Optional.empty();
}
+ return Optional.ofNullable(nodeToValue.get(node));
+ }
- @Override
- public Optional<String> getEnvironmentVariable(Node node, String name) {
- // TODO implement
- return Optional.empty();
- }
+ @Override
+ public Optional<String> getEnvironmentVariable(Node node, String name) {
+ // TODO implement
+ return Optional.empty();
+ }
- @Override
- public Optional<Double> getMetric(Node node, String metricName, AttributeFetcher.NodeMetricRegistry registry) {
- Map<Node, Double> nodeToValue = metricSnitchToNodeToValue.get(AttributeFetcherImpl.getMetricSnitchTag(metricName, registry));
- if (nodeToValue == null) {
- return Optional.empty();
- }
- return Optional.ofNullable(nodeToValue.get(node));
+ @Override
+ public Optional<Double> getMetric(Node node, String metricName, AttributeFetcher.NodeMetricRegistry registry) {
+ Map<Node, Double> nodeToValue = metricSnitchToNodeToValue.get(AttributeFetcherImpl.getMetricSnitchTag(metricName, registry));
+ if (nodeToValue == null) {
+ return Optional.empty();
}
+ return Optional.ofNullable(nodeToValue.get(node));
+ }
- @Override
- public Optional<Double> getMetric(String scope, String metricName) {
- // TODO implement
- return Optional.empty();
- }
+ @Override
+ public Optional<Double> getMetric(String scope, String metricName) {
+ // TODO implement
+ return Optional.empty();
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPlanFactoryImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPlanFactoryImpl.java
index 3829372..7f7f89f 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPlanFactoryImpl.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPlanFactoryImpl.java
@@ -24,14 +24,14 @@ import org.apache.solr.cluster.placement.*;
import java.util.Set;
-class PlacementPlanFactoryImpl implements PlacementPlanFactory {
- @Override
- public PlacementPlan createPlacementPlan(PlacementRequest request, Set<ReplicaPlacement> replicaPlacements) {
- return new PlacementPlanImpl(request, replicaPlacements);
- }
+public class PlacementPlanFactoryImpl implements PlacementPlanFactory {
+ @Override
+ public PlacementPlan createPlacementPlan(PlacementRequest request, Set<ReplicaPlacement> replicaPlacements) {
+ return new PlacementPlanImpl(request, replicaPlacements);
+ }
- @Override
- public ReplicaPlacement createReplicaPlacement(SolrCollection solrCollection, String shardName, Node node, Replica.ReplicaType replicaType) {
- return new ReplicaPlacementImpl(solrCollection, shardName, node, replicaType);
- }
+ @Override
+ public ReplicaPlacement createReplicaPlacement(SolrCollection solrCollection, String shardName, Node node, Replica.ReplicaType replicaType) {
+ return new ReplicaPlacementImpl(solrCollection, shardName, node, replicaType);
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPlanImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPlanImpl.java
index 2dde07b..a8a65e0 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPlanImpl.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPlanImpl.java
@@ -42,4 +42,14 @@ class PlacementPlanImpl implements PlacementPlan {
public Set<ReplicaPlacement> getReplicaPlacements() {
return replicaPlacements;
}
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("PlacementPlan{");
+ for (ReplicaPlacement placement : replicaPlacements) {
+ sb.append("\n").append(placement.toString());
+ }
+ sb.append("\n}");
+ return sb.toString();
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginAssignStrategy.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginAssignStrategy.java
index 0bbf4e0..c4c5667 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginAssignStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginAssignStrategy.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.cloud.api.collections.Assign;
import org.apache.solr.cluster.Cluster;
+import org.apache.solr.cluster.SolrCollection;
import org.apache.solr.cluster.placement.PlacementException;
import org.apache.solr.cluster.placement.PlacementPlugin;
import org.apache.solr.cluster.placement.PlacementPlan;
@@ -53,8 +54,9 @@ public class PlacementPluginAssignStrategy implements Assign.AssignStrategy {
throws Assign.AssignmentException, IOException, InterruptedException {
Cluster cluster = new SimpleClusterAbstractionsImpl.ClusterImpl(solrCloudManager);
+ SolrCollection solrCollection = new SimpleClusterAbstractionsImpl.SolrCollectionImpl(collection);
- PlacementRequestImpl placementRequest = PlacementRequestImpl.toPlacementRequest(cluster, collection, assignRequest);
+ PlacementRequestImpl placementRequest = PlacementRequestImpl.toPlacementRequest(cluster, solrCollection, assignRequest);
final PlacementPlan placementPlan;
try {
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginConfigImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginConfigImpl.java
index e6130a3..30cb6ef 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginConfigImpl.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginConfigImpl.java
@@ -24,7 +24,7 @@ import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.cluster.placement.PlacementPlugin;
import org.apache.solr.cluster.placement.PlacementPluginConfig;
import org.apache.solr.cluster.placement.PlacementPluginFactory;
-import org.apache.solr.cluster.placement.plugins.SamplePluginAffinityReplicaPlacement;
+import org.apache.solr.cluster.placement.plugins.AffinityPlacementFactory;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.Utils;
@@ -38,12 +38,6 @@ import org.apache.solr.common.util.Utils;
* {@link org.apache.solr.cloud.api.collections.Assign} class.</p>
*/
public class PlacementPluginConfigImpl implements PlacementPluginConfig {
- /**
- * The key in {@code clusterprops.json} under which the plugin factory and the plugin configuration are defined.
- */
- final public static String PLACEMENT_PLUGIN_CONFIG_KEY = "placement-plugin";
- /** Name of the property containing the factory class */
- final public static String CONFIG_CLASS = "class";
// Separating configs into typed maps based on the element names in solr.xml
private final Map<String, String> stringConfigs;
@@ -91,7 +85,7 @@ public class PlacementPluginConfigImpl implements PlacementPluginConfig {
@Override
public Long getLongConfig(String configName, long defaultValue) {
- Long retval = longConfigs.get(configName);
+ Long retval = longConfigs.get(configName);
return retval != null ? retval : defaultValue;
}
@@ -116,9 +110,9 @@ public class PlacementPluginConfigImpl implements PlacementPluginConfig {
* <p>Configuration properties {@code class} and {@code name} are reserved: for defining the plugin factory class and
* a human readable plugin name. All other properties are plugin specific.</p>
*
- * <p>See configuration example and how-to in {@link SamplePluginAffinityReplicaPlacement}.</p>
+ * <p>See configuration example and how-to in {@link AffinityPlacementFactory}.</p>
*/
- static PlacementPluginConfig createConfigFromProperties(Map<String, Object> pluginConfig) {
+ public static PlacementPluginConfig createConfigFromProperties(Map<String, Object> pluginConfig) {
final Map<String, String> stringConfigs = new HashMap<>();
final Map<String, Long> longConfigs = new HashMap<>();
final Map<String, Boolean> boolConfigs = new HashMap<>();
@@ -126,18 +120,18 @@ public class PlacementPluginConfigImpl implements PlacementPluginConfig {
for (Map.Entry<String, Object> e : pluginConfig.entrySet()) {
String key = e.getKey();
- if (CONFIG_CLASS.equals(key)) {
+ if (PlacementPluginConfig.FACTORY_CLASS.equals(key)) {
continue;
}
if (key == null) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Missing config name attribute in parameter of " + PLACEMENT_PLUGIN_CONFIG_KEY);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Missing config name attribute in parameter of " + PlacementPluginConfig.PLACEMENT_PLUGIN_CONFIG_KEY);
}
Object value = e.getValue();
if (value == null) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Missing config value for parameter " + key + " of " + PLACEMENT_PLUGIN_CONFIG_KEY);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Missing config value for parameter " + key + " of " + PlacementPluginConfig.PLACEMENT_PLUGIN_CONFIG_KEY);
}
if (value instanceof String) {
@@ -150,7 +144,7 @@ public class PlacementPluginConfigImpl implements PlacementPluginConfig {
doubleConfigs.put(key, (Double) value);
} else {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unsupported config type " + value.getClass().getName() +
- " for parameter " + key + " of " + PLACEMENT_PLUGIN_CONFIG_KEY);
+ " for parameter " + key + " of " + PlacementPluginConfig.PLACEMENT_PLUGIN_CONFIG_KEY);
}
}
@@ -172,13 +166,13 @@ public class PlacementPluginConfigImpl implements PlacementPluginConfig {
@SuppressWarnings({"unchecked"})
public static PlacementPlugin getPlacementPlugin(SolrCloudManager solrCloudManager) {
Map<String, Object> props = solrCloudManager.getClusterStateProvider().getClusterProperties();
- Map<String, Object> pluginConfigMap = (Map<String, Object>) props.get(PLACEMENT_PLUGIN_CONFIG_KEY);
+ Map<String, Object> pluginConfigMap = (Map<String, Object>) props.get(PlacementPluginConfig.PLACEMENT_PLUGIN_CONFIG_KEY);
if (pluginConfigMap == null) {
return null;
}
- String pluginFactoryClassName = (String) pluginConfigMap.get(CONFIG_CLASS);
+ String pluginFactoryClassName = (String) pluginConfigMap.get(PlacementPluginConfig.FACTORY_CLASS);
// Get the configured plugin factory class. Is there a way to load a resource in Solr without being in the context of
// CoreContainer? Here the placement code is unrelated to the presence of cores (and one can imagine it running on
@@ -187,13 +181,13 @@ public class PlacementPluginConfigImpl implements PlacementPluginConfig {
PlacementPluginFactory placementPluginFactory;
try {
Class<? extends PlacementPluginFactory> factoryClazz =
- Class.forName(pluginFactoryClassName, true, PlacementPluginConfigImpl.class.getClassLoader())
- .asSubclass(PlacementPluginFactory.class);
+ Class.forName(pluginFactoryClassName, true, PlacementPluginConfigImpl.class.getClassLoader())
+ .asSubclass(PlacementPluginFactory.class);
placementPluginFactory = factoryClazz.getConstructor().newInstance(); // no args constructor - that's why we introduced a factory...
} catch (Exception e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to instantiate placement-plugin factory: " +
- Utils.toJSONString(pluginConfigMap) + " please review /clusterprops.json config for " + PLACEMENT_PLUGIN_CONFIG_KEY, e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to instantiate placement-plugin factory: " +
+ Utils.toJSONString(pluginConfigMap) + " please review /clusterprops.json config for " + PlacementPluginConfig.PLACEMENT_PLUGIN_CONFIG_KEY, e);
}
// Translate the config from the properties where they are defined into the abstraction seen by the plugin
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementRequestImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementRequestImpl.java
index 80cf6c5..ff3f090 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementRequestImpl.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementRequestImpl.java
@@ -26,8 +26,7 @@ import org.apache.solr.cluster.Cluster;
import org.apache.solr.cluster.Node;
import org.apache.solr.cluster.Replica;
import org.apache.solr.cluster.SolrCollection;
-import org.apache.solr.cluster.placement.*;
-import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.cluster.placement.PlacementRequest;
public class PlacementRequestImpl implements PlacementRequest {
private final SolrCollection solrCollection;
@@ -35,9 +34,9 @@ public class PlacementRequestImpl implements PlacementRequest {
private final Set<Node> targetNodes;
private final EnumMap<Replica.ReplicaType, Integer> countReplicas = new EnumMap<>(Replica.ReplicaType.class);
- private PlacementRequestImpl(SolrCollection solrCollection,
- Set<String> shardNames, Set<Node> targetNodes,
- int countNrtReplicas, int countTlogReplicas, int countPullReplicas) {
+ public PlacementRequestImpl(SolrCollection solrCollection,
+ Set<String> shardNames, Set<Node> targetNodes,
+ int countNrtReplicas, int countTlogReplicas, int countPullReplicas) {
this.solrCollection = solrCollection;
this.shardNames = shardNames;
this.targetNodes = targetNodes;
@@ -72,12 +71,11 @@ public class PlacementRequestImpl implements PlacementRequest {
* Returns a {@link PlacementRequest} that can be consumed by a plugin based on an internal Assign.AssignRequest
* for adding replicas + additional info (upon creation of a new collection or adding replicas to an existing one).
*/
- static PlacementRequestImpl toPlacementRequest(Cluster cluster, DocCollection docCollection,
+ static PlacementRequestImpl toPlacementRequest(Cluster cluster, SolrCollection solrCollection,
Assign.AssignRequest assignRequest) throws Assign.AssignmentException {
- SolrCollection solrCollection = new SimpleClusterAbstractionsImpl.SolrCollectionImpl(docCollection);
Set<String> shardNames = new HashSet<>(assignRequest.shardNames);
if (shardNames.size() < 1) {
- throw new Assign.AssignmentException("Bad assign request: no shards specified for collection " + docCollection.getName());
+ throw new Assign.AssignmentException("Bad assign request: no shards specified for collection " + solrCollection.getName());
}
final Set<Node> nodes;
@@ -85,12 +83,12 @@ public class PlacementRequestImpl implements PlacementRequest {
if (assignRequest.nodes != null) {
nodes = SimpleClusterAbstractionsImpl.NodeImpl.getNodes(assignRequest.nodes);
if (nodes.isEmpty()) {
- throw new Assign.AssignmentException("Bad assign request: empty list of nodes for collection " + docCollection.getName());
+ throw new Assign.AssignmentException("Bad assign request: empty list of nodes for collection " + solrCollection.getName());
}
} else {
nodes = cluster.getLiveNodes();
if (nodes.isEmpty()) {
- throw new Assign.AssignmentException("Impossible assign request: no live nodes for collection " + docCollection.getName());
+ throw new Assign.AssignmentException("Impossible assign request: no live nodes for collection " + solrCollection.getName());
}
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/ReplicaPlacementImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/ReplicaPlacementImpl.java
index 0bf7564..69d9718 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/impl/ReplicaPlacementImpl.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/ReplicaPlacementImpl.java
@@ -60,6 +60,11 @@ class ReplicaPlacementImpl implements ReplicaPlacement {
return replicaType;
}
+ @Override
+ public String toString() {
+ return solrCollection.getName() + "/" + shardName + "/" + replicaType + "->" + node.getName();
+ }
+
/**
* Translates a set of {@link ReplicaPlacement} returned by a plugin into a list of {@link ReplicaPosition} expected
* by {@link org.apache.solr.cloud.api.collections.Assign.AssignStrategy}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/SimpleClusterAbstractionsImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/SimpleClusterAbstractionsImpl.java
index 6ea2d24..e26a374 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/impl/SimpleClusterAbstractionsImpl.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/SimpleClusterAbstractionsImpl.java
@@ -112,9 +112,15 @@ class SimpleClusterAbstractionsImpl {
* with names equal to existing instances (See {@link ReplicaImpl} constructor).
*/
public boolean equals(Object obj) {
- if (obj == null) { return false; }
- if (obj == this) { return true; }
- if (obj.getClass() != getClass()) { return false; }
+ if (obj == null) {
+ return false;
+ }
+ if (obj == this) {
+ return true;
+ }
+ if (obj.getClass() != getClass()) {
+ return false;
+ }
NodeImpl other = (NodeImpl) obj;
return Objects.equals(this.nodeName, other.nodeName);
}
@@ -127,7 +133,9 @@ class SimpleClusterAbstractionsImpl {
static class SolrCollectionImpl implements SolrCollection {
private final String collectionName;
- /** Map from {@link Shard#getShardName()} to {@link Shard} */
+ /**
+ * Map from {@link Shard#getShardName()} to {@link Shard}
+ */
private final Map<String, Shard> shards;
private final DocCollection docCollection;
@@ -167,6 +175,11 @@ class SimpleClusterAbstractionsImpl {
}
@Override
+ public Set<String> getShardNames() {
+ return shards.keySet();
+ }
+
+ @Override
public String getCustomProperty(String customPropertyName) {
return docCollection.getStr(customPropertyName);
}
@@ -207,12 +220,18 @@ class SimpleClusterAbstractionsImpl {
private ShardState translateState(Slice.State state) {
switch (state) {
- case ACTIVE: return ShardState.ACTIVE;
- case INACTIVE: return ShardState.INACTIVE;
- case CONSTRUCTION: return ShardState.CONSTRUCTION;
- case RECOVERY: return ShardState.RECOVERY;
- case RECOVERY_FAILED: return ShardState.RECOVERY_FAILED;
- default: throw new RuntimeException("Unexpected " + state);
+ case ACTIVE:
+ return ShardState.ACTIVE;
+ case INACTIVE:
+ return ShardState.INACTIVE;
+ case CONSTRUCTION:
+ return ShardState.CONSTRUCTION;
+ case RECOVERY:
+ return ShardState.RECOVERY;
+ case RECOVERY_FAILED:
+ return ShardState.RECOVERY_FAILED;
+ default:
+ throw new RuntimeException("Unexpected " + state);
}
}
@@ -253,15 +272,21 @@ class SimpleClusterAbstractionsImpl {
}
public boolean equals(Object obj) {
- if (obj == null) { return false; }
- if (obj == this) { return true; }
- if (obj.getClass() != getClass()) { return false; }
+ if (obj == null) {
+ return false;
+ }
+ if (obj == this) {
+ return true;
+ }
+ if (obj.getClass() != getClass()) {
+ return false;
+ }
ShardImpl other = (ShardImpl) obj;
return Objects.equals(this.shardName, other.shardName)
- && Objects.equals(this.collection, other.collection)
- && Objects.equals(this.shardState, other.shardState)
- && Objects.equals(this.replicas, other.replicas)
- && Objects.equals(this.leader, other.leader);
+ && Objects.equals(this.collection, other.collection)
+ && Objects.equals(this.shardState, other.shardState)
+ && Objects.equals(this.replicas, other.replicas)
+ && Objects.equals(this.leader, other.leader);
}
public int hashCode() {
@@ -311,20 +336,29 @@ class SimpleClusterAbstractionsImpl {
private Replica.ReplicaType translateType(org.apache.solr.common.cloud.Replica.Type type) {
switch (type) {
- case NRT: return Replica.ReplicaType.NRT;
- case TLOG: return Replica.ReplicaType.TLOG;
- case PULL: return Replica.ReplicaType.PULL;
- default: throw new RuntimeException("Unexpected " + type);
+ case NRT:
+ return Replica.ReplicaType.NRT;
+ case TLOG:
+ return Replica.ReplicaType.TLOG;
+ case PULL:
+ return Replica.ReplicaType.PULL;
+ default:
+ throw new RuntimeException("Unexpected " + type);
}
}
private Replica.ReplicaState translateState(org.apache.solr.common.cloud.Replica.State state) {
switch (state) {
- case ACTIVE: return Replica.ReplicaState.ACTIVE;
- case DOWN: return Replica.ReplicaState.DOWN;
- case RECOVERING: return Replica.ReplicaState.RECOVERING;
- case RECOVERY_FAILED: return Replica.ReplicaState.RECOVERY_FAILED;
- default: throw new RuntimeException("Unexpected " + state);
+ case ACTIVE:
+ return Replica.ReplicaState.ACTIVE;
+ case DOWN:
+ return Replica.ReplicaState.DOWN;
+ case RECOVERING:
+ return Replica.ReplicaState.RECOVERING;
+ case RECOVERY_FAILED:
+ return Replica.ReplicaState.RECOVERY_FAILED;
+ default:
+ throw new RuntimeException("Unexpected " + state);
}
}
@@ -365,24 +399,34 @@ class SimpleClusterAbstractionsImpl {
*/
static org.apache.solr.common.cloud.Replica.Type toCloudReplicaType(ReplicaType type) {
switch (type) {
- case NRT: return org.apache.solr.common.cloud.Replica.Type.NRT;
- case TLOG: return org.apache.solr.common.cloud.Replica.Type.TLOG;
- case PULL: return org.apache.solr.common.cloud.Replica.Type.PULL;
- default: throw new IllegalArgumentException("Unknown " + type);
+ case NRT:
+ return org.apache.solr.common.cloud.Replica.Type.NRT;
+ case TLOG:
+ return org.apache.solr.common.cloud.Replica.Type.TLOG;
+ case PULL:
+ return org.apache.solr.common.cloud.Replica.Type.PULL;
+ default:
+ throw new IllegalArgumentException("Unknown " + type);
}
}
public boolean equals(Object obj) {
- if (obj == null) { return false; }
- if (obj == this) { return true; }
- if (obj.getClass() != getClass()) { return false; }
+ if (obj == null) {
+ return false;
+ }
+ if (obj == this) {
+ return true;
+ }
+ if (obj.getClass() != getClass()) {
+ return false;
+ }
ReplicaImpl other = (ReplicaImpl) obj;
return Objects.equals(this.replicaName, other.replicaName)
- && Objects.equals(this.coreName, other.coreName)
- && Objects.equals(this.shard, other.shard)
- && Objects.equals(this.replicaType, other.replicaType)
- && Objects.equals(this.replicaState, other.replicaState)
- && Objects.equals(this.node, other.node);
+ && Objects.equals(this.coreName, other.coreName)
+ && Objects.equals(this.shard, other.shard)
+ && Objects.equals(this.replicaType, other.replicaType)
+ && Objects.equals(this.replicaState, other.replicaState)
+ && Objects.equals(this.node, other.node);
}
public int hashCode() {
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java
new file mode 100644
index 0000000..06bdda7
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement.plugins;
+
+import com.google.common.collect.Ordering;
+import com.google.common.collect.TreeMultimap;
+import org.apache.solr.cluster.*;
+import org.apache.solr.cluster.placement.*;
+import org.apache.solr.common.util.Pair;
+import org.apache.solr.common.util.SuppressForbidden;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * <p>This factory is instantiated by config from its class name. Using it is the only way to create instances of
+ * {@link AffinityPlacementPlugin}.</p>
+ *
+ * <p>In order to configure this plugin to be used for placement decisions, the following {@code curl} command (or something
+ * equivalent) has to be executed once the cluster is already running in order to set
+ * the appropriate Zookeeper stored configuration. Replace {@code localhost:8983} by one of your servers' IP address and port.</p>
+ *
+ * <pre>
+ *
+ * curl -X POST -H 'Content-type:application/json' -d '{
+ * "set-placement-plugin": {
+ * "class": "org.apache.solr.cluster.placement.plugins.AffinityPlacementFactory",
+ * "minimalFreeDiskGB": 10,
+ * "prioritizedFreeDiskGB": 50
+ * }
+ * }' http://localhost:8983/api/cluster
+ * </pre>
+ *
+ * <p>The consequence will be the creation of an element in the Zookeeper file {@code /clusterprops.json} as follows:</p>
+ *
+ * <pre>
+ *
+ * "placement-plugin":{
+ * "class":"org.apache.solr.cluster.placement.plugins.AffinityPlacementFactory",
+ * "minimalFreeDiskGB":10,
+ * "prioritizedFreeDiskGB":50}
+ * </pre>
+ *
+ * <p>In order to delete the placement-plugin section from {@code /clusterprops.json} (and to fallback to either Legacy
+ * or rule based placement if configured for a collection), execute:</p>
+ *
+ * <pre>
+ *
+ * curl -X POST -H 'Content-type:application/json' -d '{
+ * "set-placement-plugin" : null
+ * }' http://localhost:8983/api/cluster
+ * </pre>
+ *
+ *
+ * <p>{@link AffinityPlacementPlugin} implements placing replicas in a way that replicate past Autoscaling config defined
+ * <a href="https://github.com/lucidworks/fusion-cloud-native/blob/master/policy.json#L16">here</a>.</p>
+ *
+ * <p>This specification is doing the following:
+ * <p><i>Spread replicas per shard as evenly as possible across multiple availability zones (given by a sys prop),
+ * assign replicas based on replica type to specific kinds of nodes (another sys prop), and avoid having more than
+ * one replica per shard on the same node.<br>
 + * Only after these constraints are satisfied do we minimize cores per node or disk usage.</i></p>
+ *
+ * <p>Overall strategy of this plugin:</p>
+ * <ul><li>
+ * The set of nodes in the cluster is obtained and transformed into 3 independent sets (that can overlap) of nodes
+ * accepting each of the three replica types.
+ * </li><li>
+ * For each shard on which placing replicas is required and then for each replica type to place (starting with NRT,
+ * then TLOG then PULL): <ul>
 + * <li>The set of candidate nodes corresponding to the replica type is used and from that set are removed nodes
+ * that already have a replica (of any type) for that shard</li>
+ * <li>If there are not enough nodes, an error is thrown (this is checked further down during processing).</li>
+ * <li>The number of (already existing) replicas of the current type on each Availability Zone is collected.</li>
+ * <li>Separate the set of available nodes to as many subsets (possibly some are empty) as there are Availability Zones
+ * defined for the candidate nodes</li>
+ * <li>In each AZ nodes subset, sort the nodes by increasing total number of cores count, with possibly a condition
+ * that pushes nodes with low disk space to the end of the list? Or a weighted combination of the relative
+ * importance of these two factors? Some randomization? Marking as non available nodes with not enough disk space?
+ * These and other are likely aspects to be played with once the plugin is tested or observed to be running in prod,
+ * don't expect the initial code drop(s) to do all of that.</li>
+ * <li>Iterate over the number of replicas to place (for the current replica type for the current shard):
+ * <ul>
+ * <li>Based on the number of replicas per AZ collected previously, pick the non empty set of nodes having the
 + * lowest number of replicas. Then pick the first node in that set. That's the node the replica is placed on.
+ * Remove the node from the set of available nodes for the given AZ and increase the number of replicas placed
+ * on that AZ.</li>
+ * </ul></li>
+ * <li>During this process, the number of cores on the nodes in general is tracked to take into account placement
+ * decisions so that not all shards decide to put their replicas on the same nodes (they might though if these are
+ * the less loaded nodes).</li>
+ * </ul>
+ * </li>
+ * </ul>
+ *
+ * <p>This code is a realistic placement computation, based on a few assumptions. The code is written in such a way to
+ * make it relatively easy to adapt it to (somewhat) different assumptions. Configuration options could be introduced
+ * to allow configuration base option selection as well...</p>
+ */
+public class AffinityPlacementFactory implements PlacementPluginFactory {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ /**
+ * <p>Name of the system property on a node indicating which (public cloud) Availability Zone that node is in. The value
+ * is any string, different strings denote different availability zones.
+ *
+ * <p>Nodes on which this system property is not defined are considered being in the same Availability Zone
+ * {@link #UNDEFINED_AVAILABILITY_ZONE} (hopefully the value of this constant is not the name of a real Availability Zone :).
+ */
+ public static final String AVAILABILITY_ZONE_SYSPROP = "availability_zone";
+
+ /**
+ * <p>Name of the system property on a node indicating the type of replicas allowed on that node.
+ * The value of that system property is a comma separated list or a single string of value names of
+ * {@link org.apache.solr.cluster.Replica.ReplicaType} (case insensitive). If that property is not defined, that node is
+ * considered accepting all replica types (i.e. undefined is equivalent to {@code "NRT,Pull,tlog"}).
+ */
+ public static final String REPLICA_TYPE_SYSPROP = "replica_type";
+
+ /**
+ * This is the "AZ" name for nodes that do not define an AZ. Should not match a real AZ name (I think we're safe)
+ */
+ public static final String UNDEFINED_AVAILABILITY_ZONE = "uNd3f1NeD";
+
+ /**
+ * If a node has strictly less GB of free disk than this value, the node is excluded from assignment decisions.
+ * Set to 0 or less to disable.
+ */
+ public static final String MINIMAL_FREE_DISK_GB = "minimalFreeDiskGB";
+
+ /**
+ * Replica allocation will assign replicas to nodes with at least this number of GB of free disk space regardless
+ * of the number of cores on these nodes rather than assigning replicas to nodes with less than this amount of free
+ * disk space if that's an option (if that's not an option, replicas can still be assigned to nodes with less than this
+ * amount of free space).
+ */
+ public static final String PRIORITIZED_FREE_DISK_GB = "prioritizedFreeDiskGB";
+
+ /**
+ * Empty public constructor is used to instantiate this factory. Using a factory pattern to allow the factory to do one
+ * time costly operations if needed, and to only have to instantiate a default constructor class by name, rather than
+ * having to call a constructor with more parameters (if we were to instantiate the plugin class directly without going
+ * through a factory).
+ */
+ public AffinityPlacementFactory() {
+ }
+
+ @Override
+ public PlacementPlugin createPluginInstance(PlacementPluginConfig config) {
+ final long minimalFreeDiskGB = config.getLongConfig(MINIMAL_FREE_DISK_GB, 20L);
+ final long prioritizedFreeDiskGB = config.getLongConfig(PRIORITIZED_FREE_DISK_GB, 100L);
+ return new AffinityPlacementPlugin(minimalFreeDiskGB, prioritizedFreeDiskGB);
+ }
+
+ /**
+ * See {@link AffinityPlacementFactory} for instructions on how to configure a cluster to use this plugin and details
+ * on what the plugin does.
+ */
+ static class AffinityPlacementPlugin implements PlacementPlugin {
+
+ private final long minimalFreeDiskGB;
+
+ private final long prioritizedFreeDiskGB;
+
+ private final Random replicaPlacementRandom = new Random(); // ok even if random sequence is predictable.
+
+ /**
+ * The factory has decoded the configuration for the plugin instance and passes it the parameters it needs.
+ */
+ private AffinityPlacementPlugin(long minimalFreeDiskGB, long prioritizedFreeDiskGB) {
+ this.minimalFreeDiskGB = minimalFreeDiskGB;
+ this.prioritizedFreeDiskGB = prioritizedFreeDiskGB;
+
+ // We make things reproducible in tests by using test seed if any
+ String seed = System.getProperty("tests.seed");
+ if (seed != null) {
+ replicaPlacementRandom.setSeed(seed.hashCode());
+ }
+ }
+
+ @SuppressForbidden(reason = "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.")
+ public PlacementPlan computePlacement(Cluster cluster, PlacementRequest request, AttributeFetcher attributeFetcher,
+ PlacementPlanFactory placementPlanFactory) throws PlacementException {
+ Set<Node> nodes = request.getTargetNodes();
+ SolrCollection solrCollection = request.getCollection();
+
+ // Request all needed attributes
+ attributeFetcher.requestNodeSystemProperty(AVAILABILITY_ZONE_SYSPROP).requestNodeSystemProperty(REPLICA_TYPE_SYSPROP);
+ attributeFetcher.requestNodeCoreCount().requestNodeFreeDisk();
+ attributeFetcher.fetchFrom(nodes);
+ final AttributeValues attrValues = attributeFetcher.fetchAttributes();
+
+ // Split the set of nodes into 3 sets of nodes accepting each replica type (sets can overlap if nodes accept multiple replica types)
 + // These subsets are actually maps, because we capture the number of cores (of any replica type) present on each node.
+ // Also get the number of currently existing cores per node, so we can keep update as we place new cores to not end up
+ // always selecting the same node(s).
+ Pair<EnumMap<Replica.ReplicaType, Set<Node>>, Map<Node, Integer>> p = getNodesPerReplicaType(nodes, attrValues);
+
+ EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes = p.first();
+ Map<Node, Integer> coresOnNodes = p.second();
+
+ // All available zones of live nodes. Due to some nodes not being candidates for placement, and some existing replicas
 + // being on availability zones that might be offline (i.e. their nodes are not live), this set might contain zones
+ // on which it is impossible to place replicas. That's ok.
+ Set<String> availabilityZones = getZonesFromNodes(nodes, attrValues);
+
+ // Build the replica placement decisions here
+ Set<ReplicaPlacement> replicaPlacements = new HashSet<>();
+
+ // Let's now iterate on all shards to create replicas for and start finding home sweet homes for the replicas
+ for (String shardName : request.getShardNames()) {
+ // Inventory nodes (if any) that already have a replica of any type for the shard, because we can't be placing
+ // additional replicas on these. This data structure is updated after each replica to node assign and is used to
+ // make sure different replica types are not allocated to the same nodes (protecting same node assignments within
+ // a given replica type is done "by construction" in makePlacementDecisions()).
+ Set<Node> nodesWithReplicas = new HashSet<>();
+ Shard shard = solrCollection.getShard(shardName);
+ if (shard != null) {
+ for (Replica r : shard.replicas()) {
+ nodesWithReplicas.add(r.getNode());
+ }
+ }
+
+ // Iterate on the replica types in the enum order. We place more strategic replicas first
+ // (NRT is more strategic than TLOG more strategic than PULL). This is in case we eventually decide that less
+ // strategic replica placement impossibility is not a problem that should lead to replica placement computation
+ // failure. Current code does fail if placement is impossible (constraint is at most one replica of a shard on any node).
+ for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) {
+ makePlacementDecisions(solrCollection, shardName, availabilityZones, replicaType, request.getCountReplicasToCreate(replicaType),
+ attrValues, replicaTypeToNodes, nodesWithReplicas, coresOnNodes, placementPlanFactory, replicaPlacements);
+ }
+ }
+
+ return placementPlanFactory.createPlacementPlan(request, replicaPlacements);
+ }
+
+ private Set<String> getZonesFromNodes(Set<Node> nodes, final AttributeValues attrValues) {
+ Set<String> azs = new HashSet<>();
+
+ for (Node n : nodes) {
+ azs.add(getNodeAZ(n, attrValues));
+ }
+
+ return Collections.unmodifiableSet(azs);
+ }
+
+ /**
+ * Resolves the AZ of a node and takes care of nodes that have no defined AZ in system property {@link #AVAILABILITY_ZONE_SYSPROP}
+ * to then return {@link #UNDEFINED_AVAILABILITY_ZONE} as the AZ name.
+ */
+ private String getNodeAZ(Node n, final AttributeValues attrValues) {
+ Optional<String> nodeAz = attrValues.getSystemProperty(n, AVAILABILITY_ZONE_SYSPROP);
+ // All nodes with undefined AZ will be considered part of the same AZ. This also works for deployments that do not care about AZ's
+ return nodeAz.orElse(UNDEFINED_AVAILABILITY_ZONE);
+ }
+
+ /**
+ * This class captures an availability zone and the nodes that are legitimate targets for replica placement in that
+ * Availability Zone. Instances are used as values in a {@link java.util.TreeMap} in which the total number of already
+ * existing replicas in the AZ is the key. This allows easily picking the set of nodes from which to select a node for
+ * placement in order to balance the number of replicas per AZ. Picking one of the nodes from the set is done using
+ * different criteria unrelated to the Availability Zone (picking the node is based on the {@link CoresAndDiskComparator}
+ * ordering).
+ */
+ private static class AzWithNodes {
+ final String azName;
+ List<Node> availableNodesForPlacement;
+ boolean hasBeenSorted;
+
+ AzWithNodes(String azName, List<Node> availableNodesForPlacement) {
+ this.azName = azName;
+ this.availableNodesForPlacement = availableNodesForPlacement;
+ // Once the list is sorted to an order we're happy with, this flag is set to true to avoid sorting multiple times
+ // unnecessarily.
+ this.hasBeenSorted = false;
+ }
+ }
+
+ /**
+ * Given the set of all nodes on which to do placement and fetched attributes, builds the sets representing
+ * candidate nodes for placement of replicas of each replica type.
+ * These sets are packaged and returned in an EnumMap keyed by replica type (1st member of the Pair).
+ * Also builds the number of existing cores on each node present in the returned EnumMap (2nd member of the returned Pair).
+ * Nodes for which the number of cores is not available for whatever reason are excluded from acceptable candidate nodes
+ * as it would not be possible to make any meaningful placement decisions.
+ *
+ * @param nodes all nodes on which this plugin should compute placement
+ * @param attrValues attributes fetched for the nodes. This method uses system property {@link #REPLICA_TYPE_SYSPROP} as
+ * well as the number of cores on each node.
+ */
+ private Pair<EnumMap<Replica.ReplicaType, Set<Node>>, Map<Node, Integer>> getNodesPerReplicaType(Set<Node> nodes, final AttributeValues attrValues) {
+ EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes = new EnumMap<>(Replica.ReplicaType.class);
+ Map<Node, Integer> coresOnNodes = new HashMap<>();
+
+ for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) {
+ replicaTypeToNodes.put(replicaType, new HashSet<>());
+ }
+
+ for (Node node : nodes) {
+ // Exclude nodes with unknown or too small disk free space
+ if (attrValues.getFreeDisk(node).isEmpty()) {
+ if (log.isWarnEnabled()) {
+ log.warn("Unknown free disk on node {}, excluding it from placement decisions.", node.getName());
+ }
 + // We rely later on the fact that the free disk optional is present (see CoresAndDiskComparator), be careful if you change anything here.
+ continue;
+ }
+ if (attrValues.getFreeDisk(node).get() < minimalFreeDiskGB) {
+ if (log.isWarnEnabled()) {
+ log.warn("Node {} free disk ({}GB) lower than configured minimum {}GB, excluding it from placement decisions.", node.getName(), attrValues.getFreeDisk(node).get(), minimalFreeDiskGB);
+ }
+ continue;
+ }
+
+ if (attrValues.getCoresCount(node).isEmpty()) {
+ if (log.isWarnEnabled()) {
+ log.warn("Unknown number of cores on node {}, excluding it from placement decisions.", node.getName());
+ }
 + // We rely later on the fact that the number of cores optional is present (see CoresAndDiskComparator), be careful if you change anything here.
+ continue;
+ }
+
+ Integer coresCount = attrValues.getCoresCount(node).get();
+ coresOnNodes.put(node, coresCount);
+
+ String supportedReplicaTypes = attrValues.getSystemProperty(node, REPLICA_TYPE_SYSPROP).isPresent() ? attrValues.getSystemProperty(node, REPLICA_TYPE_SYSPROP).get() : null;
+ // If property not defined or is only whitespace on a node, assuming node can take any replica type
+ if (supportedReplicaTypes == null || supportedReplicaTypes.isBlank()) {
+ for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
+ replicaTypeToNodes.get(rt).add(node);
+ }
+ } else {
+ Set<String> acceptedTypes = Arrays.stream(supportedReplicaTypes.split(",")).map(String::trim).map(s -> s.toLowerCase(Locale.ROOT)).collect(Collectors.toSet());
+ for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
+ if (acceptedTypes.contains(rt.name().toLowerCase(Locale.ROOT))) {
+ replicaTypeToNodes.get(rt).add(node);
+ }
+ }
+ }
+ }
+ return new Pair<>(replicaTypeToNodes, coresOnNodes);
+ }
+
+ /**
+ * <p>Picks nodes from {@code targetNodes} for placing {@code numReplicas} replicas.
+ *
+ * <p>The criteria used in this method are, in this order:
+ * <ol>
+ * <li>No more than one replica of a given shard on a given node (strictly enforced)</li>
+ * <li>Balance as much as possible replicas of a given {@link org.apache.solr.cluster.Replica.ReplicaType} over available AZ's.
+ * This balancing takes into account existing replicas <b>of the corresponding replica type</b>, if any.</li>
+ * <li>Place replicas if possible on nodes having more than a certain amount of free disk space (note that nodes with a too small
+ * amount of free disk space were eliminated as placement targets earlier, in {@link #getNodesPerReplicaType}). There's
+ * a threshold here rather than sorting on the amount of free disk space, because sorting on that value would in
+ * practice lead to never considering the number of cores on a node.</li>
+ * <li>Place replicas on nodes having a smaller number of cores (the number of cores considered
+ * for this decision includes previous placement decisions made during the processing of the placement request)</li>
+ * </ol>
+ */
+ @SuppressForbidden(reason = "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.")
+ private void makePlacementDecisions(SolrCollection solrCollection, String shardName, Set<String> availabilityZones,
+ Replica.ReplicaType replicaType, int numReplicas, final AttributeValues attrValues,
+ EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes, Set<Node> nodesWithReplicas,
+ Map<Node, Integer> coresOnNodes, PlacementPlanFactory placementPlanFactory,
+ Set<ReplicaPlacement> replicaPlacements) throws PlacementException {
+ // Count existing replicas per AZ. We count only instances of the type of replica for which we need to do placement.
+ // If we ever want to balance replicas of any type across AZ's (and not each replica type balanced independently),
+ // we'd have to move this data structure to the caller of this method so it can be reused across different replica
+ // type placements for a given shard. Note then that this change would be risky. For example all NRT's and PULL
 + // replicas for a shard may be correctly balanced over three AZ's, but then all NRT can end up in the same AZ...
+ Map<String, Integer> azToNumReplicas = new HashMap<>();
+ for (String az : availabilityZones) {
+ azToNumReplicas.put(az, 0);
+ }
+
+ // Build the set of candidate nodes for the placement, i.e. nodes that can accept the replica type
+ Set<Node> candidateNodes = new HashSet<>(replicaTypeToNodes.get(replicaType));
+ // Remove nodes that already have a replica for the shard (no two replicas of same shard can be put on same node)
+ candidateNodes.removeAll(nodesWithReplicas);
+
+ Shard shard = solrCollection.getShard(shardName);
+ if (shard != null) {
+ // shard is non null if we're adding replicas to an already existing collection.
+ // If we're creating the collection, the shards do not exist yet.
+ for (Replica replica : shard.replicas()) {
+ // The node's AZ is counted as having a replica if it has a replica of the same type as the one we need
+ // to place here.
+ if (replica.getType() == replicaType) {
+ final String az = getNodeAZ(replica.getNode(), attrValues);
+ if (azToNumReplicas.containsKey(az)) {
+ // We do not count replicas on AZ's for which we don't have any node to place on because it would not help
+ // the placement decision. If we did want to do that, note the dereferencing below can't be assumed as the
+ // entry will not exist in the map.
+ azToNumReplicas.put(az, azToNumReplicas.get(az) + 1);
+ }
+ }
+ }
+ }
+
+ // We now have the set of real candidate nodes, we've enforced "No more than one replica of a given shard on a given node".
+ // We also counted for the shard and replica type under consideration how many replicas were per AZ, so we can place
+ // (or try to place) replicas on AZ's that have fewer replicas
+
+ // Get the candidate nodes per AZ in order to build (further down) a mapping of AZ to placement candidates.
+ Map<String, List<Node>> nodesPerAz = new HashMap<>();
+ for (Node node : candidateNodes) {
+ String nodeAz = getNodeAZ(node, attrValues);
+ List<Node> nodesForAz = nodesPerAz.computeIfAbsent(nodeAz, k -> new ArrayList<>());
+ nodesForAz.add(node);
+ }
+
 + // Build a treeMap sorted by the number of replicas per AZ and including candidate nodes suitable for placement on the
+ // AZ, so we can easily select the next AZ to get a replica assignment and quickly (constant time) decide if placement
+ // on this AZ is possible or not.
+ TreeMultimap<Integer, AzWithNodes> azByExistingReplicas = TreeMultimap.create(Comparator.naturalOrder(), Ordering.arbitrary());
+ for (Map.Entry<String, List<Node>> e : nodesPerAz.entrySet()) {
+ azByExistingReplicas.put(azToNumReplicas.get(e.getKey()), new AzWithNodes(e.getKey(), e.getValue()));
+ }
+
+ CoresAndDiskComparator coresAndDiskComparator = new CoresAndDiskComparator(attrValues, coresOnNodes, prioritizedFreeDiskGB);
+
+ for (int i = 0; i < numReplicas; i++) {
+ // We have for each AZ on which we might have a chance of placing a replica, the list of candidate nodes for replicas
+ // (candidate: does not already have a replica of this shard and is in the corresponding AZ).
+ // Among the AZ's with the minimal number of replicas of the given replica type for the shard, we must pick the AZ that
+ // offers the best placement (based on number of cores and free disk space). In order to do so, for these "minimal" AZ's
+ // we sort the nodes from best to worst placement candidate (based on the number of cores and free disk space) then pick
+ // the AZ that has the best best node. We don't sort all AZ's because that will not necessarily be needed.
+ int minNumberOfReplicasPerAz = 0; // This value never observed but compiler can't tell
+ Set<Map.Entry<Integer, AzWithNodes>> candidateAzEntries = null;
+ // Iterate over AZ's (in the order of increasing number of replicas on that AZ) and do two things: 1. remove those AZ's that
+ // have no nodes, no use iterating over these again and again (as we compute placement for more replicas), and 2. collect
+ // all those AZ with a minimal number of replicas.
+ for (Iterator<Map.Entry<Integer, AzWithNodes>> it = azByExistingReplicas.entries().iterator(); it.hasNext(); ) {
+ Map.Entry<Integer, AzWithNodes> entry = it.next();
+ int numberOfNodes = entry.getValue().availableNodesForPlacement.size();
+ if (numberOfNodes == 0) {
+ it.remove();
+ } else { // AZ does have node(s) for placement
+ if (candidateAzEntries == null) {
 + // First AZ with nodes that can take the replica. Initialize tracking structures. NOTE(review): the next line assigns the node count (numberOfNodes) to minNumberOfReplicasPerAz, and the later comparison uses it too; the variable name, the map's sort key, and the comments above all suggest entry.getKey() (the replica count) was intended — confirm.
+ minNumberOfReplicasPerAz = numberOfNodes;
+ candidateAzEntries = new HashSet<>();
+ }
+ if (minNumberOfReplicasPerAz != numberOfNodes) {
+ // AZ's with more replicas than the minimum number seen are not placement candidates
+ break;
+ }
+ candidateAzEntries.add(entry);
+ // We remove all entries that are candidates: the "winner" will be modified, all entries might also be sorted,
+ // so we'll insert back the updated versions later.
+ it.remove();
+ }
+ }
+
+ if (candidateAzEntries == null) {
+ // This can happen because not enough nodes for the placement request or already too many nodes with replicas of
+ // the shard that can't accept new replicas or not enough nodes with enough free disk space.
+ throw new PlacementException("Not enough nodes to place " + numReplicas + " replica(s) of type " + replicaType +
+ " for shard " + shardName + " of collection " + solrCollection.getName());
+ }
+
+ // Iterate over all candidate AZ's, sort them if needed and find the best one to use for this placement
+ Map.Entry<Integer, AzWithNodes> selectedAz = null;
+ Node selectedAzBestNode = null;
+ for (Map.Entry<Integer, AzWithNodes> candidateAzEntry : candidateAzEntries) {
+ AzWithNodes azWithNodes = candidateAzEntry.getValue();
+ List<Node> nodes = azWithNodes.availableNodesForPlacement;
+
+ if (!azWithNodes.hasBeenSorted) {
+ // Make sure we do not tend to use always the same nodes (within an AZ) if all conditions are identical (well, this
+ // likely is not the case since after having added a replica to a node its number of cores increases for the next
+ // placement decision, but let's be defensive here, given that multiple concurrent placement decisions might see
+ // the same initial cluster state, and we want placement to be reasonable even in that case without creating an
+ // unnecessary imbalance).
+ // For example, if all nodes have 0 cores and same amount of free disk space, ideally we want to pick a random node
+ // for placement, not always the same one due to some internal ordering.
+ Collections.shuffle(nodes, replicaPlacementRandom);
+
+ // Sort by increasing number of cores but pushing nodes with low free disk space to the end of the list
+ nodes.sort(coresAndDiskComparator);
+
+ azWithNodes.hasBeenSorted = true;
+ }
+
+ // Which one is better, the new one or the previous best?
+ if (selectedAz == null || coresAndDiskComparator.compare(nodes.get(0), selectedAzBestNode) < 0) {
+ selectedAz = candidateAzEntry;
+ selectedAzBestNode = nodes.get(0);
+ }
+ }
+
+ // Now actually remove the selected node from the winning AZ
+ AzWithNodes azWithNodes = selectedAz.getValue();
+ List<Node> nodes = selectedAz.getValue().availableNodesForPlacement;
+ Node assignTarget = nodes.remove(0);
+
+ // Insert back all the qualifying but non winning AZ's removed while searching for the one
+ for (Map.Entry<Integer, AzWithNodes> removedAzs : candidateAzEntries) {
+ if (removedAzs != selectedAz) {
+ azByExistingReplicas.put(removedAzs.getKey(), removedAzs.getValue());
+ }
+ }
+
+ // Insert back a corrected entry for the winning AZ: one more replica living there and one less node that can accept new replicas
+ // (the remaining candidate node list might be empty, in which case it will be cleaned up on the next iteration).
+ azByExistingReplicas.put(selectedAz.getKey() + 1, azWithNodes);
+
+ // Do not assign that node again for replicas of other replica type for this shard
+ // (this update of the set is not useful in the current execution of this method but for following ones only)
+ nodesWithReplicas.add(assignTarget);
+
+ // Track that the node has one more core. These values are only used during the current run of the plugin.
+ coresOnNodes.merge(assignTarget, 1, Integer::sum);
+
+ // Register the replica assignment just decided
+ replicaPlacements.add(placementPlanFactory.createReplicaPlacement(solrCollection, shardName, assignTarget, replicaType));
+ }
+ }
+
+ /**
+ * Comparator implementing the placement strategy based on free space and number of cores: we want to place new replicas
+ * on nodes with the less number of cores, but only if they do have enough disk space (expressed as a threshold value).
+ */
+ static class CoresAndDiskComparator implements Comparator<Node> {
+ private final AttributeValues attrValues;
+ private final Map<Node, Integer> coresOnNodes;
+ private final long prioritizedFreeDiskGB;
+
+
+ /**
+ * The data we sort on is not part of the {@link Node} instances but has to be retrieved from the attributes and configuration.
+ * The number of cores per node is passed in a map whereas the free disk is fetched from the attributes due to the
+ * fact that we update the number of cores per node as we do allocations, but we do not update the free disk. The
+ * attrValues corresponding to the number of cores per node are the initial values, but we want to compare the actual
+ * value taking into account placement decisions already made during the current execution of the placement plugin.
+ */
+ CoresAndDiskComparator(AttributeValues attrValues, Map<Node, Integer> coresOnNodes, long prioritizedFreeDiskGB) {
+ this.attrValues = attrValues;
+ this.coresOnNodes = coresOnNodes;
+ this.prioritizedFreeDiskGB = prioritizedFreeDiskGB;
+ }
+
+ @Override
+ public int compare(Node a, Node b) {
+ // Note all nodes do have free disk defined. This has been verified earlier.
+ boolean aHasLowFreeSpace = attrValues.getFreeDisk(a).get() < prioritizedFreeDiskGB;
+ boolean bHasLowFreeSpace = attrValues.getFreeDisk(b).get() < prioritizedFreeDiskGB;
+ if (aHasLowFreeSpace != bHasLowFreeSpace) {
+ // A node with low free space should be considered > node with high free space since it needs to come later in sort order
+ return Boolean.compare(aHasLowFreeSpace, bHasLowFreeSpace);
+ }
+ // The ordering on the number of cores is the natural order.
+ return Integer.compare(coresOnNodes.get(a), coresOnNodes.get(b));
+ }
+ }
+ }
+}
+
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/MinimizeCoresPlacementFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/MinimizeCoresPlacementFactory.java
new file mode 100644
index 0000000..b73b692
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/MinimizeCoresPlacementFactory.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement.plugins;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Map;
+
+import com.google.common.collect.Ordering;
+import com.google.common.collect.TreeMultimap;
+import org.apache.solr.cluster.Cluster;
+import org.apache.solr.cluster.Node;
+import org.apache.solr.cluster.Replica;
+import org.apache.solr.cluster.SolrCollection;
+import org.apache.solr.cluster.placement.*;
+import org.apache.solr.common.util.SuppressForbidden;
+
+/**
+ * <p>Factory for creating {@link MinimizeCoresPlacementPlugin}, a Placement plugin implementing placing replicas
+ * to minimize the number of cores per {@link Node}, while not placing two replicas of the same shard on the same node.
+ * This code is meant as an educational example of a placement plugin.</p>
+ *
+ * <p>See {@link AffinityPlacementFactory} for a more realistic example and documentation.</p>
+ */
+public class MinimizeCoresPlacementFactory implements PlacementPluginFactory {
+
+ @Override
+ public PlacementPlugin createPluginInstance(PlacementPluginConfig config) {
+ return new MinimizeCoresPlacementPlugin();
+ }
+
+ static private class MinimizeCoresPlacementPlugin implements PlacementPlugin {
+
+ @SuppressForbidden(reason = "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.")
+ public PlacementPlan computePlacement(Cluster cluster, PlacementRequest request, AttributeFetcher attributeFetcher,
+ PlacementPlanFactory placementPlanFactory) throws PlacementException {
+ int totalReplicasPerShard = 0;
+ for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
+ totalReplicasPerShard += request.getCountReplicasToCreate(rt);
+ }
+
+ if (cluster.getLiveNodes().size() < totalReplicasPerShard) {
+ throw new PlacementException("Cluster size too small for number of replicas per shard");
+ }
+
+ // Get number of cores on each Node
+ TreeMultimap<Integer, Node> nodesByCores = TreeMultimap.create(Comparator.naturalOrder(), Ordering.arbitrary());
+
+ Set<Node> nodes = request.getTargetNodes();
+
+ attributeFetcher.requestNodeCoreCount();
+ attributeFetcher.fetchFrom(nodes);
+ AttributeValues attrValues = attributeFetcher.fetchAttributes();
+
+
+ // Get the number of cores on each node and sort the nodes by increasing number of cores
+ for (Node node : nodes) {
+ if (attrValues.getCoresCount(node).isEmpty()) {
+ throw new PlacementException("Can't get number of cores in " + node);
+ }
+ nodesByCores.put(attrValues.getCoresCount(node).get(), node);
+ }
+
+ Set<ReplicaPlacement> replicaPlacements = new HashSet<>(totalReplicasPerShard * request.getShardNames().size());
+
+ // Now place all replicas of all shards on nodes, by placing on nodes with the smallest number of cores and taking
+ // into account replicas placed during this computation. Note that for each shard we must place replicas on different
+ // nodes, when moving to the next shard we use the nodes sorted by their updated number of cores (due to replica
+ // placements for previous shards).
+ for (String shardName : request.getShardNames()) {
+ // Assign replicas based on the sort order of the nodesByCores tree multimap to put replicas on the nodes with the
+ // fewest cores first. We only need totalReplicasPerShard nodes given that's the number of replicas to place.
+ // We assign based on the passed nodeEntriesToAssign list so the right nodes get replicas.
+ ArrayList<Map.Entry<Integer, Node>> nodeEntriesToAssign = new ArrayList<>(totalReplicasPerShard);
+ Iterator<Map.Entry<Integer, Node>> treeIterator = nodesByCores.entries().iterator();
+ for (int i = 0; i < totalReplicasPerShard; i++) {
+ nodeEntriesToAssign.add(treeIterator.next());
+ }
+
+ // Update the number of cores each node will have once the assignments below are executed, so that the next shard
+ // picks the least loaded nodes for its replicas.
+ for (Map.Entry<Integer, Node> e : nodeEntriesToAssign) {
+ int coreCount = e.getKey();
+ Node node = e.getValue();
+ nodesByCores.remove(coreCount, node);
+ nodesByCores.put(coreCount + 1, node);
+ }
+
+ for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) {
+ placeReplicas(request.getCollection(), nodeEntriesToAssign, placementPlanFactory, replicaPlacements, shardName, request, replicaType);
+ }
+ }
+
+ return placementPlanFactory.createPlacementPlan(request, replicaPlacements);
+ }
+
+ private void placeReplicas(SolrCollection solrCollection, ArrayList<Map.Entry<Integer, Node>> nodeEntriesToAssign,
+ PlacementPlanFactory placementPlanFactory, Set<ReplicaPlacement> replicaPlacements,
+ String shardName, PlacementRequest request, Replica.ReplicaType replicaType) {
+ for (int replica = 0; replica < request.getCountReplicasToCreate(replicaType); replica++) {
+ final Map.Entry<Integer, Node> entry = nodeEntriesToAssign.remove(0);
+ final Node node = entry.getValue();
+
+ replicaPlacements.add(placementPlanFactory.createReplicaPlacement(solrCollection, shardName, node, replicaType));
+ }
+ }
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SamplePluginAffinityReplicaPlacement.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SamplePluginAffinityReplicaPlacement.java
deleted file mode 100644
index d738fb8..0000000
--- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SamplePluginAffinityReplicaPlacement.java
+++ /dev/null
@@ -1,509 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cluster.placement.plugins;
-
-import com.google.common.collect.*;
-import org.apache.solr.cluster.*;
-import org.apache.solr.cluster.placement.*;
-import org.apache.solr.common.util.Pair;
-import org.apache.solr.common.util.SuppressForbidden;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.invoke.MethodHandles;
-import java.util.*;
-import java.util.stream.Collectors;
-
-/**
- * <p>Implements placing replicas in a way that replicate past Autoscaling config defined
- * <a href="https://github.com/lucidworks/fusion-cloud-native/blob/master/policy.json#L16">here</a>.</p>
- *
- * <p>This specification is doing the following:
- * <p><i>Spread replicas per shard as evenly as possible across multiple availability zones (given by a sys prop),
- * assign replicas based on replica type to specific kinds of nodes (another sys prop), and avoid having more than
- * one replica per shard on the same node.<br>
- * Only after these constraints are satisfied do minimize cores per node or disk usage.</i></p>
- *
- * <p>Overall strategy of this plugin:</p>
- * <ul><li>
- * The set of nodes in the cluster is obtained and transformed into 3 independent sets (that can overlap) of nodes
- * accepting each of the three replica types.
- * </li><li>
- * For each shard on which placing replicas is required and then for each replica type to place (starting with NRT, then TLOG then PULL): <ul>
- * <li>The set of candidates nodes corresponding to the replica type is used and from that set are removed nodes
- * that already have a replica (of any type) for that shard</li>
- * <li>If there are not enough nodes, an error is thrown (this is checked further down during processing).</li>
- * <li>The number of (already existing) replicas of the current type on each Availability Zone is collected.</li>
- * <li>Separate the set of available nodes to as many subsets (possibly some are empty) as there are Availability Zones
- * defined for the candidate nodes</li>
- * <li>In each AZ nodes subset, sort the nodes by increasing total number of cores count, with possibly a condition
- * that pushes nodes with low disk space to the end of the list? Or a weighted combination of the relative
- * importance of these two factors? Some randomization? Marking as non available nodes with not enough disk space?
- * These and other are likely aspects to be played with once the plugin is tested or observed to be running in prod,
- * don't expect the initial code drop(s) to do all of that.</li>
- * <li>Iterate over the number of replicas to place (for the current replica type for the current shard):
- * <ul>
- * <li>Based on the number of replicas per AZ collected previously, pick the non empty set of nodes having the
- * lowest number of replicas. Then pick the first node in that set. That's the node the replica is placed one.
- * Remove the node from the set of available nodes for the given AZ and increase the number of replicas placed
- * on that AZ.</li>
- * </ul></li>
- * <li>During this process, the number of cores on the nodes in general is tracked to take into account placement
- * decisions so that not all shards decide to put their replicas on the same nodes (they might though if these are
- * the less loaded nodes).</li>
- * </ul>
- * </li>
- * </ul>
- *
- * <p>This code is a realistic placement computation, based on a few assumptions. The code is written in such a way to
- * make it relatively easy to adapt it to (somewhat) different assumptions. Configuration options could be introduced
- * to allow configuration base option selection as well...</p>
- *
- * <p>In order to configure this plugin to be used for placement decisions, the following {@code curl} command (or something
- * equivalent) has to be executed once the cluster is already running in order to set
- * the appropriate Zookeeper stored configuration. Replace {@code localhost:8983} by one of your servers' IP address and port.</p>
- *
- * <pre>
- *
- curl -X POST -H 'Content-type:application/json' -d '{
- "set-placement-plugin": {
- "class": "org.apache.solr.cluster.placement.plugins.SamplePluginAffinityReplicaPlacement$Factory",
- "minimalFreeDiskGB": 10,
- "deprioritizedFreeDiskGB": 50
- }
- }' http://localhost:8983/api/cluster
- * </pre>
- *
- * <p>The consequence will be the creation of an element in the Zookeeper file {@code /clusterprops.json} as follows:</p>
- *
- * <pre>
- *
- * "placement-plugin":{
- * "class":"org.apache.solr.cluster.placement.plugins.SamplePluginAffinityReplicaPlacement$Factory",
- * "minimalFreeDiskGB":10,
- * "deprioritizedFreeDiskGB":50}
- * </pre>
- *
- * <p>In order to delete the placement-plugin section from {@code /clusterprops.json} (and to fallback to either Legacy
- * or rule based placement if configured for a collection), execute:</p>
- *
- * <pre>
- *
- curl -X POST -H 'Content-type:application/json' -d '{
- "set-placement-plugin" : null
- }' http://localhost:8983/api/cluster
- * </pre>
- */
-public class SamplePluginAffinityReplicaPlacement implements PlacementPlugin {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- /**
- * This factory is instantiated by config from its class name. Using it is the only way to create instances of
- * {@link SamplePluginAffinityReplicaPlacement}.
- */
- static public class Factory implements PlacementPluginFactory {
-
- /**
- * Empty public constructor is used to instantiate this factory. Using a factory pattern to allow the factory to do one
- * time costly operations if needed, and to only have to instantiate a default constructor class by name, rather than
- * having to call a constructor with more parameters (if we were to instantiate the plugin class directly without going
- * through a factory).
- */
- public Factory() {
- }
-
- @Override
- public PlacementPlugin createPluginInstance(PlacementPluginConfig config) {
- final long minimalFreeDiskGB = config.getLongConfig("minimalFreeDiskGB", 20L);
- final long deprioritizedFreeDiskGB = config.getLongConfig("deprioritizedFreeDiskGB", 100L);
- return new SamplePluginAffinityReplicaPlacement(minimalFreeDiskGB, deprioritizedFreeDiskGB);
- }
- }
-
-
- /**
- * <p>Name of the system property on a node indicating which (public cloud) Availability Zone that node is in. The value
- * is any string, different strings denote different availability zones.
- *
- * <p>Nodes on which this system property is not defined are considered being in the same Availability Zone
- * {@link #UNDEFINED_AVAILABILITY_ZONE} (hopefully the value of this constant is not the name of a real Availability Zone :).
- */
- public static final String AVAILABILITY_ZONE_SYSPROP = "availability_zone";
- /** This is the "AZ" name for nodes that do not define an AZ. Should not match a real AZ name (I think we're safe) */
- public static final String UNDEFINED_AVAILABILITY_ZONE = "uNd3f1NeD";
-
- /**
- * <p>Name of the system property on a node indicating the type of replicas allowed on that node.
- * The value of that system property is a comma separated list or a single string of value names of
- * {@link org.apache.solr.cluster.Replica.ReplicaType} (case insensitive). If that property is not defined, that node is
- * considered accepting all replica types (i.e. undefined is equivalent to {@code "NRT,Pull,tlog"}).
- *
- * <p>See {@link #getNodesPerReplicaType}.
- */
- public static final String REPLICA_TYPE_SYSPROP = "replica_type";
-
- /**
- * If a node has strictly less GB of free disk than this value, the node is excluded from assignment decisions.
- * Set to 0 or less to disable.
- */
- private final long minimalFreeDiskGB;
-
- /**
- * Replica allocation will assign replicas to nodes with at least this number of GB of free disk space regardless
- * of the number of cores on these nodes rather than assigning replicas to nodes with less than this amount of free
- * disk space if that's an option (if that's not an option, replicas can still be assigned to nodes with less than this
- * amount of free space).
- */
- private final long deprioritizedFreeDiskGB;
-
- /**
- * The factory has decoded the configuration for the plugin instance and passes it the parameters it needs.
- */
- private SamplePluginAffinityReplicaPlacement(long minimalFreeDiskGB, long deprioritizedFreeDiskGB) {
- this.minimalFreeDiskGB = minimalFreeDiskGB;
- this.deprioritizedFreeDiskGB = deprioritizedFreeDiskGB;
- }
-
- @SuppressForbidden(reason = "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.")
- public PlacementPlan computePlacement(Cluster cluster, PlacementRequest request, AttributeFetcher attributeFetcher,
- PlacementPlanFactory placementPlanFactory) throws PlacementException {
- Set<Node> nodes = request.getTargetNodes();
- SolrCollection solrCollection = request.getCollection();
-
- // Request all needed attributes
- attributeFetcher.requestNodeSystemProperty(AVAILABILITY_ZONE_SYSPROP).requestNodeSystemProperty(REPLICA_TYPE_SYSPROP);
- attributeFetcher.requestNodeCoreCount().requestNodeFreeDisk();
- attributeFetcher.fetchFrom(nodes);
- final AttributeValues attrValues = attributeFetcher.fetchAttributes();
-
- // Split the set of nodes into 3 sets of nodes accepting each replica type (sets can overlap if nodes accept multiple replica types)
- // These subsets sets are actually maps, because we capture the number of cores (of any replica type) present on each node.
- // Also get the number of currently existing cores per node, so we can keep update as we place new cores to not end up
- // always selecting the same node(s).
- Pair<EnumMap<Replica.ReplicaType, Set<Node>>, Map<Node, Integer>> p = getNodesPerReplicaType(nodes, attrValues);
-
- EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes = p.first();
- Map<Node, Integer> coresOnNodes = p.second();
-
- // All available zones of live nodes. Due to some nodes not being candidates for placement, and some existing replicas
- // being one availability zones that might be offline (i.e. their nodes are not live), this set might contain zones
- // on which it is impossible to place replicas. That's ok.
- ImmutableSet<String> availabilityZones = getZonesFromNodes(nodes, attrValues);
-
- // Build the replica placement decisions here
- Set<ReplicaPlacement> replicaPlacements = new HashSet<>();
-
- // Let's now iterate on all shards to create replicas for and start finding home sweet homes for the replicas
- for (String shardName : request.getShardNames()) {
- // Iterate on the replica types in the enum order. We place more strategic replicas first
- // (NRT is more strategic than TLOG more strategic than PULL). This is in case we eventually decide that less
- // strategic replica placement impossibility is not a problem that should lead to replica placement computation
- // failure. Current code does fail if placement is impossible (constraint is at most one replica of a shard on any node).
- for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) {
- makePlacementDecisions(solrCollection, shardName, availabilityZones, replicaType, request.getCountReplicasToCreate(replicaType),
- attrValues, replicaTypeToNodes, coresOnNodes, placementPlanFactory, replicaPlacements);
- }
- }
-
- return placementPlanFactory.createPlacementPlan(request, replicaPlacements);
- }
-
- private ImmutableSet<String> getZonesFromNodes(Set<Node> nodes, final AttributeValues attrValues) {
- Set<String> azs = new HashSet<>();
-
- for (Node n : nodes) {
- azs.add(getNodeAZ(n, attrValues));
- }
-
- return ImmutableSet.copyOf(azs);
- }
-
- /**
- * Resolves the AZ of a node and takes care of nodes that have no defined AZ in system property {@link #AVAILABILITY_ZONE_SYSPROP}
- * to then return {@link #UNDEFINED_AVAILABILITY_ZONE} as the AZ name.
- */
- private String getNodeAZ(Node n, final AttributeValues attrValues) {
- Optional<String> nodeAz = attrValues.getSystemProperty(n, AVAILABILITY_ZONE_SYSPROP);
- // All nodes with undefined AZ will be considered part of the same AZ. This also works for deployments that do not care about AZ's
- return nodeAz.orElse(UNDEFINED_AVAILABILITY_ZONE);
- }
-
- /**
- * This class captures an availability zone and the nodes that are legitimate targets for replica placement in that
- * Availability Zone. Instances are used as values in a {@link TreeMap} in which the total number of already
- * existing replicas in the AZ is the key. This allows easily picking the set of nodes from which to select a node for
- * placement in order to balance the number of replicas per AZ. Picking one of the nodes from the set is done using
- * different criteria unrelated to the Availability Zone (picking the node is based on the {@link CoresAndDiskComparator}
- * ordering).
- */
- private static class AzWithNodes {
- final String azName;
- List<Node> availableNodesForPlacement;
- boolean hasBeenSorted;
-
- AzWithNodes(String azName, List<Node> availableNodesForPlacement) {
- this.azName = azName;
- this.availableNodesForPlacement = availableNodesForPlacement;
- // Once the list is sorted to an order we're happy with, this flag is set to true to avoid sorting multiple times
- // unnecessarily.
- this.hasBeenSorted = false;
- }
- }
-
- /**
- * Given the set of all nodes on which to do placement and fetched attributes, builds the sets representing
- * candidate nodes for placement of replicas of each replica type.
- * These sets are packaged and returned in an EnumMap keyed by replica type (1st member of the Pair).
- * Also builds the number of existing cores on each node present in the returned EnumMap (2nd member of the returned Pair).
- * Nodes for which the number of cores is not available for whatever reason are excluded from acceptable candidate nodes
- * as it would not be possible to make any meaningful placement decisions.
- * @param nodes all nodes on which this plugin should compute placement
- * @param attrValues attributes fetched for the nodes. This method uses system property {@link #REPLICA_TYPE_SYSPROP} as
- * well as the number of cores on each node.
- */
- private Pair<EnumMap<Replica.ReplicaType, Set<Node>>, Map<Node, Integer>> getNodesPerReplicaType(Set<Node> nodes, final AttributeValues attrValues) {
- EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes = new EnumMap<>(Replica.ReplicaType.class);
- Map<Node, Integer> coresOnNodes = Maps.newHashMap();
-
- for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) {
- replicaTypeToNodes.put(replicaType, new HashSet<>());
- }
-
- for (Node node : nodes) {
- // Exclude nodes with unknown or too small disk free space
- if (attrValues.getFreeDisk(node).isEmpty()) {
- if (log.isWarnEnabled()) {
- log.warn("Unknown free disk on node {}, excluding it from placement decisions.", node.getName());
- }
- // We rely later on the fact that the free disk optional is present (see CoresAndDiskComparator), be careful it you change anything here.
- continue;
- } if (attrValues.getFreeDisk(node).get() < minimalFreeDiskGB) {
- if (log.isWarnEnabled()) {
- log.warn("Node {} free disk ({}GB) lower than configured minimum {}GB, excluding it from placement decisions.", node.getName(), attrValues.getFreeDisk(node).get(), minimalFreeDiskGB);
- }
- continue;
- }
-
- if (attrValues.getCoresCount(node).isEmpty()) {
- if (log.isWarnEnabled()) {
- log.warn("Unknown number of cores on node {}, excluding it from placement decisions.", node.getName());
- }
- // We rely later on the fact that the number of cores optional is present (see CoresAndDiskComparator), be careful it you change anything here.
- continue;
- }
-
- Integer coresCount = attrValues.getCoresCount(node).get();
- coresOnNodes.put(node, coresCount);
-
- String supportedReplicaTypes = attrValues.getSystemProperty(node, REPLICA_TYPE_SYSPROP).isPresent() ? attrValues.getSystemProperty(node, REPLICA_TYPE_SYSPROP).get() : null;
- // If property not defined or is only whitespace on a node, assuming node can take any replica type
- if (supportedReplicaTypes == null || supportedReplicaTypes.isBlank()) {
- for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
- replicaTypeToNodes.get(rt).add(node);
- }
- } else {
- Set<String> acceptedTypes = Arrays.stream(supportedReplicaTypes.split(",")).map(String::trim).map(s -> s.toLowerCase(Locale.ROOT)).collect(Collectors.toSet());
- for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
- if (acceptedTypes.contains(rt.name().toLowerCase(Locale.ROOT))) {
- replicaTypeToNodes.get(rt).add(node);
- }
- }
- }
- }
- return new Pair<>(replicaTypeToNodes, coresOnNodes);
- }
-
- /**
- * <p>Picks nodes from {@code targetNodes} for placing {@code numReplicas} replicas.
- *
- * <p>The criteria used in this method are, in this order:
- * <ol>
- * <li>No more than one replica of a given shard on a given node (strictly enforced)</li>
- * <li>Balance as much as possible the number of replicas of the given {@link org.apache.solr.cluster.Replica.ReplicaType} over available AZ's.
- * This balancing takes into account existing replicas <b>of the corresponding replica type</b>, if any.</li>
- * <li>Place replicas is possible on nodes having more than a certain amount of free disk space (note that nodes with a too small
- * amount of free disk space were eliminated as placement targets earlier, in {@link #getNodesPerReplicaType}). There's
- * a threshold here rather than sorting on the amount of free disk space, because sorting on that value would in
- * practice lead to never considering the number of cores on a node.</li>
- * <li>Place replicas on nodes having a smaller number of cores (the number of cores considered
- * for this decision includes decisions made during the processing of the placement request)</li>
- * </ol>
- */
- @SuppressForbidden(reason = "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.")
- private void makePlacementDecisions(SolrCollection solrCollection, String shardName, ImmutableSet<String> availabilityZones,
- Replica.ReplicaType replicaType, int numReplicas, final AttributeValues attrValues,
- EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes, Map<Node, Integer> coresOnNodes,
- PlacementPlanFactory placementPlanFactory, Set<ReplicaPlacement> replicaPlacements) throws PlacementException {
- // Build the set of candidate nodes, i.e. nodes not having (yet) a replica of the given shard
- Set<Node> candidateNodes = new HashSet<>(replicaTypeToNodes.get(replicaType));
-
- // Count existing replicas per AZ. We count only instances the type of replica for which we need to do placement. This
- // can be changed in the loop below if we want to count all replicas for the shard.
- Map<String, Integer> azToNumReplicas = Maps.newHashMap();
- // Add all "interesting" AZ's, i.e. AZ's for which there's a chance we can do placement.
- for (String az : availabilityZones) {
- azToNumReplicas.put(az, 0);
- }
-
- Shard shard = solrCollection.getShard(shardName);
- if (shard != null) {
- // shard is non null if we're adding replicas to an already existing collection.
- // If we're creating the collection, the shards do not exist yet.
- for (Replica replica : shard.replicas()) {
- // Nodes already having any type of replica for the shard can't get another replica.
- candidateNodes.remove(replica.getNode());
- // The node's AZ has to be counted as having a replica if it has a replica of the same type as the one we need
- // to place here (remove the "if" below to balance the number of replicas per AZ across all replica types rather
- // than within each replica type, but then there's a risk that all NRT replicas for example end up on the same AZ).
- // Note that if in the cluster nodes are configured to accept a single replica type and not multiple ones, the
- // two options are equivalent (governed by system property AVAILABILITY_ZONE_SYSPROP on each node)
- if (replica.getType() == replicaType) {
- final String az = getNodeAZ(replica.getNode(), attrValues);
- if (azToNumReplicas.containsKey(az)) {
- // We do not count replicas on AZ's for which we don't have any node to place on because it would not help
- // the placement decision. If we did want to do that, note the dereferencing below can't be assumed as the
- // entry will not exist in the map.
- azToNumReplicas.put(az, azToNumReplicas.get(az) + 1);
- }
- }
- }
- }
-
- // We now have the set of real candidate nodes, we've enforced "No more than one replica of a given shard on a given node".
- // We also counted for the shard and replica type under consideration how many replicas were per AZ, so we can place
- // (or try to place) replicas on AZ's that have fewer replicas
-
- // Get the candidate nodes per AZ in order to build (further down) a mapping of AZ to placement candidates.
- Map<String, List<Node>> nodesPerAz = Maps.newHashMap();
- for (Node node : candidateNodes) {
- String nodeAz = getNodeAZ(node, attrValues);
- List<Node> nodesForAz = nodesPerAz.computeIfAbsent(nodeAz, k -> new ArrayList<>());
- nodesForAz.add(node);
- }
-
- // Build a treeMap sorted by the number of replicas per AZ and including candidates nodes suitable for placement on the
- // AZ, so we can easily select the next AZ to get a replica assignment and quickly (constant time) decide if placement
- // on this AZ is possible or not.
- TreeMultimap<Integer, AzWithNodes> azByExistingReplicas = TreeMultimap.create(Comparator.naturalOrder(), Ordering.arbitrary());
- for (Map.Entry<String, List<Node>> e : nodesPerAz.entrySet()) {
- azByExistingReplicas.put(azToNumReplicas.get(e.getKey()), new AzWithNodes(e.getKey(), e.getValue()));
- }
-
- CoresAndDiskComparator coresAndDiskComparator = new CoresAndDiskComparator(attrValues, coresOnNodes, deprioritizedFreeDiskGB);
-
- // Now we have for each AZ on which we might have a chance of placing a replica, the list of candidate nodes for replicas
- // (candidate: does not already have a replica of this shard and is in the corresponding AZ).
- // We must now select those of the nodes on which we actually place the replicas, and will do that based on the total
- // number of cores already present on these nodes as well as the free disk space.
- // We sort once by the order related to number of cores and disk space each list of nodes on an AZ. We do not sort all
- // of them ahead of time because we might be placing a small number of replicas and it might be wasted work.
- for (int i = 0; i < numReplicas; i++) {
- // Pick the AZ having the lowest number of replicas for this shard, and if that AZ has available nodes, pick the
- // most appropriate one (based on number of cores and disk space constraints). In the process, remove entries (AZ's)
- // that do not have nodes to place replicas on because these are useless to us.
- Map.Entry<Integer, AzWithNodes> azWithNodesEntry = null;
- for (Iterator<Map.Entry<Integer, AzWithNodes>> it = azByExistingReplicas.entries().iterator(); it.hasNext(); ) {
- Map.Entry<Integer, AzWithNodes> entry = it.next();
- if (!entry.getValue().availableNodesForPlacement.isEmpty()) {
- azWithNodesEntry = entry;
- // Remove this entry. Will add it back after a node has been removed from the list of available nodes and the number
- // of replicas on the AZ has been increased by one (search for "azByExistingReplicas.put" below).
- it.remove();
- break;
- } else {
- it.remove();
- }
- }
-
- if (azWithNodesEntry == null) {
- // This can happen because not enough nodes for the placement request or already too many nodes with replicas of
- // the shard that can't accept new replicas or not enough nodes with enough free disk space.
- throw new PlacementException("Not enough nodes to place " + numReplicas + " replica(s) of type " + replicaType +
- " for shard " + shardName + " of collection " + solrCollection.getName());
- }
-
- AzWithNodes azWithNodes = azWithNodesEntry.getValue();
- List<Node> nodes = azWithNodes.availableNodesForPlacement;
-
- if (!azWithNodes.hasBeenSorted) {
- // Make sure we do not tend to use always the same nodes (within an AZ) if all conditions are identical (well, this
- // likely is not the case since after having added a replica to a node its number of cores increases for the next
- // placement decision, but let's be defensive here, given that multiple concurrent placement decisions might see
- // the same initial cluster state, and we want placement to be reasonable even in that case without creating an
- // unnecessary imbalance).
- // For example, if all nodes have 0 cores and same amount of free disk space, ideally we want to pick a random node
- // for placement, not always the same one due to some internal ordering.
- Collections.shuffle(nodes, new Random());
-
- // Sort by increasing number of cores but pushing nodes with low free disk space to the end of the list
- nodes.sort(coresAndDiskComparator);
-
- azWithNodes.hasBeenSorted = true;
- }
-
- Node assignTarget = nodes.remove(0);
-
- // Insert back a corrected entry for the AZ: one more replica living there and one less node that can accept new replicas
- // (the remaining candidate node list might be empty, in which case it will be cleaned up on the next iteration).
- azByExistingReplicas.put(azWithNodesEntry.getKey() + 1, azWithNodes);
-
- // Track that the node has one more core. These values are only used during the current run of the plugin.
- coresOnNodes.merge(assignTarget, 1, Integer::sum);
-
- // Register the replica assignment just decided
- replicaPlacements.add(placementPlanFactory.createReplicaPlacement(solrCollection, shardName, assignTarget, replicaType));
- }
- }
-
- /**
- * Comparator implementing the placement strategy based on free space and number of cores: we want to place new replicas
- * on nodes with the less number of cores, but only if they do have enough disk space (expressed as a threshold value).
- */
- static class CoresAndDiskComparator implements Comparator<Node> {
- private final AttributeValues attrValues;
- private final Map<Node, Integer> coresOnNodes;
- private final long deprioritizedFreeDiskGB;
-
-
- /**
- * The data we sort on is not part of the {@link Node} instances but has to be retrieved from the attributes and configuration.
- * The number of cores per node is passed in a map whereas the free disk is fetched from the attributes due to the
- * fact that we update the number of cores per node as we do allocations, but we do not update the free disk. The
- * attrValues correpsonding to the number of cores per node are the initial values, but we want to comapre the actual
- * value taking into account placement decisions already made during the current execution of the placement plugin.
- */
- CoresAndDiskComparator(AttributeValues attrValues, Map<Node, Integer> coresOnNodes, long deprioritizedFreeDiskGB) {
- this.attrValues = attrValues;
- this.coresOnNodes = coresOnNodes;
- this.deprioritizedFreeDiskGB = deprioritizedFreeDiskGB;
- }
-
- @Override
- public int compare(Node a, Node b) {
- // Note all nodes do have free disk defined. This has been verified earlier.
- boolean aHasLowFreeSpace = attrValues.getFreeDisk(a).get() < deprioritizedFreeDiskGB;
- boolean bHasLowFreeSpace = attrValues.getFreeDisk(b).get() < deprioritizedFreeDiskGB;
- if (aHasLowFreeSpace != bHasLowFreeSpace) {
- // A node with low free space should be considered > node with high free space since it needs to come later in sort order
- return Boolean.compare(aHasLowFreeSpace, bHasLowFreeSpace);
- }
- // The ordering on the number of cores is the natural order.
- return Integer.compare(coresOnNodes.get(a), coresOnNodes.get(b));
- }
- }
-}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SamplePluginMinimizeCores.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SamplePluginMinimizeCores.java
deleted file mode 100644
index 54520fc..0000000
--- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SamplePluginMinimizeCores.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cluster.placement.plugins;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.Map;
-
-import com.google.common.collect.Ordering;
-import com.google.common.collect.TreeMultimap;
-import org.apache.solr.cluster.Cluster;
-import org.apache.solr.cluster.Node;
-import org.apache.solr.cluster.Replica;
-import org.apache.solr.cluster.SolrCollection;
-import org.apache.solr.cluster.placement.*;
-import org.apache.solr.common.util.SuppressForbidden;
-
-/**
- * <p>Implements placing replicas to minimize number of cores per {@link Node}, while not placing two replicas of the same
- * shard on the same node.</p>
- *
- * <p>Warning: not really tested. See {@link SamplePluginAffinityReplicaPlacement} for a more realistic example.</p>
- */
-public class SamplePluginMinimizeCores implements PlacementPlugin {
-
- private final PlacementPluginConfig config;
-
- private SamplePluginMinimizeCores(PlacementPluginConfig config) {
- this.config = config;
- }
-
- static public class Factory implements PlacementPluginFactory {
-
- /**
- * Empty public constructor is used to instantiate this factory based on configuration in solr.xml, element
- * {@code <placementPluginFactory>} in element {@code <solrcloud>}.
- */
- public Factory() {
- }
-
- @Override
- public PlacementPlugin createPluginInstance(PlacementPluginConfig config) {
- return new SamplePluginMinimizeCores(config);
- }
- }
-
- @SuppressForbidden(reason = "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.")
- public PlacementPlan computePlacement(Cluster cluster, PlacementRequest request, AttributeFetcher attributeFetcher,
- PlacementPlanFactory placementPlanFactory) throws PlacementException {
- int totalReplicasPerShard = 0;
- for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
- totalReplicasPerShard += request.getCountReplicasToCreate(rt);
- }
-
- if (cluster.getLiveNodes().size() < totalReplicasPerShard) {
- throw new PlacementException("Cluster size too small for number of replicas per shard");
- }
-
- // Get number of cores on each Node
- TreeMultimap<Integer, Node> nodesByCores = TreeMultimap.create(Comparator.naturalOrder(), Ordering.arbitrary());
-
- Set<Node> nodes = request.getTargetNodes();
-
- attributeFetcher.requestNodeCoreCount();
- attributeFetcher.fetchFrom(nodes);
- AttributeValues attrValues = attributeFetcher.fetchAttributes();
-
-
- // Get the number of cores on each node and sort the nodes by increasing number of cores
- for (Node node : nodes) {
- if (attrValues.getCoresCount(node).isEmpty()) {
- throw new PlacementException("Can't get number of cores in " + node);
- }
- nodesByCores.put(attrValues.getCoresCount(node).get(), node);
- }
-
- Set<ReplicaPlacement> replicaPlacements = new HashSet<>(totalReplicasPerShard * request.getShardNames().size());
-
- // Now place all replicas of all shards on nodes, by placing on nodes with the smallest number of cores and taking
- // into account replicas placed during this computation. Note that for each shard we must place replicas on different
- // nodes, when moving to the next shard we use the nodes sorted by their updated number of cores (due to replica
- // placements for previous shards).
- for (String shardName : request.getShardNames()) {
- // Assign replicas based on the sort order of the nodesByCores tree multimap to put replicas on nodes with less
- // cores first. We only need totalReplicasPerShard nodes given that's the number of replicas to place.
- // We assign based on the passed nodeEntriesToAssign list so the right nodes get replicas.
- ArrayList<Map.Entry<Integer, Node>> nodeEntriesToAssign = new ArrayList<>(totalReplicasPerShard);
- Iterator<Map.Entry<Integer, Node>> treeIterator = nodesByCores.entries().iterator();
- for (int i = 0; i < totalReplicasPerShard; i++) {
- nodeEntriesToAssign.add(treeIterator.next());
- }
-
- // Update the number of cores each node will have once the assignments below got executed so the next shard picks the
- // lowest loaded nodes for its replicas.
- for (Map.Entry<Integer, Node> e : nodeEntriesToAssign) {
- int coreCount = e.getKey();
- Node node = e.getValue();
- nodesByCores.remove(coreCount, node);
- nodesByCores.put(coreCount + 1, node);
- }
-
- for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) {
- placeReplicas(request.getCollection(), nodeEntriesToAssign, placementPlanFactory, replicaPlacements, shardName, request, replicaType);
- }
- }
-
- return placementPlanFactory.createPlacementPlan(request, replicaPlacements);
- }
-
- private void placeReplicas(SolrCollection solrCollection, ArrayList<Map.Entry<Integer, Node>> nodeEntriesToAssign,
- PlacementPlanFactory placementPlanFactory, Set<ReplicaPlacement> replicaPlacements,
- String shardName, PlacementRequest request, Replica.ReplicaType replicaType) {
- for (int replica = 0; replica < request.getCountReplicasToCreate(replicaType); replica++) {
- final Map.Entry<Integer, Node> entry = nodeEntriesToAssign.remove(0);
- final Node node = entry.getValue();
-
- replicaPlacements.add(placementPlanFactory.createReplicaPlacement(solrCollection, shardName, node, replicaType));
- }
- }
-}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SamplePluginRandomPlacement.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SamplePluginRandomPlacement.java
deleted file mode 100644
index eecb57f..0000000
--- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SamplePluginRandomPlacement.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cluster.placement.plugins;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Random;
-import java.util.Set;
-
-import org.apache.solr.cluster.Cluster;
-import org.apache.solr.cluster.Node;
-import org.apache.solr.cluster.Replica;
-import org.apache.solr.cluster.SolrCollection;
-import org.apache.solr.cluster.placement.*;
-
-/**
- * Implements random placement for new collection creation while preventing two replicas of same shard from being placed on same node.
- *
- * <p>Warning: not really tested. See {@link SamplePluginAffinityReplicaPlacement} for a more realistic example.</p>
- */
-public class SamplePluginRandomPlacement implements PlacementPlugin {
-
- private final PlacementPluginConfig config;
-
- private SamplePluginRandomPlacement(PlacementPluginConfig config) {
- this.config = config;
- }
-
- static public class Factory implements PlacementPluginFactory {
- @Override
- public PlacementPlugin createPluginInstance(PlacementPluginConfig config) {
- return new SamplePluginRandomPlacement(config);
- }
- }
-
- public PlacementPlan computePlacement(Cluster cluster, PlacementRequest request, AttributeFetcher attributeFetcher,
- PlacementPlanFactory placementPlanFactory) throws PlacementException {
- int totalReplicasPerShard = 0;
- for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
- totalReplicasPerShard += request.getCountReplicasToCreate(rt);
- }
-
- if (cluster.getLiveNodes().size() < totalReplicasPerShard) {
- throw new PlacementException("Cluster size too small for number of replicas per shard");
- }
-
- Set<ReplicaPlacement> replicaPlacements = new HashSet<>(totalReplicasPerShard * request.getShardNames().size());
-
- // Now place randomly all replicas of all shards on available nodes
- for (String shardName : request.getShardNames()) {
- // Shuffle the nodes for each shard so that replicas for a shard are placed on distinct yet random nodes
- ArrayList<Node> nodesToAssign = new ArrayList<>(cluster.getLiveNodes());
- Collections.shuffle(nodesToAssign, new Random());
-
- for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
- placeForReplicaType(request.getCollection(), nodesToAssign, placementPlanFactory, replicaPlacements, shardName, request, rt);
- }
- }
-
- return placementPlanFactory.createPlacementPlan(request, replicaPlacements);
- }
-
- private void placeForReplicaType(SolrCollection solrCollection, ArrayList<Node> nodesToAssign, PlacementPlanFactory placementPlanFactory,
- Set<ReplicaPlacement> replicaPlacements,
- String shardName, PlacementRequest request, Replica.ReplicaType replicaType) {
- for (int replica = 0; replica < request.getCountReplicasToCreate(replicaType); replica++) {
- Node node = nodesToAssign.remove(0);
-
- replicaPlacements.add(placementPlanFactory.createReplicaPlacement(solrCollection, shardName, node, replicaType));
- }
- }
-}
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/package-info.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/package-info.java
index 1595679..eedb41f 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/package-info.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/package-info.java
@@ -16,6 +16,6 @@
*/
/**
- * Sample plugin implementations.
+ * Sample plugin implementations. The realistic implementation to use is {@link org.apache.solr.cluster.placement.plugins.AffinityPlacementFactory}.
*/
package org.apache.solr.cluster.placement.plugins;
\ No newline at end of file
diff --git a/solr/core/src/java/org/apache/solr/handler/ClusterAPI.java b/solr/core/src/java/org/apache/solr/handler/ClusterAPI.java
index f3f2816..605dbb6 100644
--- a/solr/core/src/java/org/apache/solr/handler/ClusterAPI.java
+++ b/solr/core/src/java/org/apache/solr/handler/ClusterAPI.java
@@ -27,7 +27,7 @@ import org.apache.solr.client.solrj.request.beans.ClusterPropInfo;
import org.apache.solr.client.solrj.request.beans.CreateConfigInfo;
import org.apache.solr.client.solrj.request.beans.RateLimiterMeta;
import org.apache.solr.cloud.OverseerConfigSetMessageHandler;
-import org.apache.solr.cluster.placement.impl.PlacementPluginConfigImpl;
+import org.apache.solr.cluster.placement.PlacementPluginConfig;
import org.apache.solr.common.MapWriterMap;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.annotation.JsonProperty;
@@ -250,14 +250,14 @@ public class ClusterAPI {
ClusterProperties clusterProperties = new ClusterProperties(getCoreContainer().getZkController().getZkClient());
// When the json contains { "set-placement-plugin" : null }, the map is empty, not null.
// Very basic sanity check. Real validation will be done when the config is used...
- if (!(placementPluginConfig == null) && !placementPluginConfig.containsKey(PlacementPluginConfigImpl.CONFIG_CLASS)) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Must contain " + PlacementPluginConfigImpl.CONFIG_CLASS + " attribute (or be null)");
+ if (!(placementPluginConfig == null) && !placementPluginConfig.containsKey(PlacementPluginConfig.FACTORY_CLASS)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Must contain " + PlacementPluginConfig.FACTORY_CLASS + " attribute (or be null)");
}
try {
clusterProperties.update(placementPluginConfig == null?
null:
new MapWriterMap(placementPluginConfig),
- PlacementPluginConfigImpl.PLACEMENT_PLUGIN_CONFIG_KEY);
+ PlacementPluginConfig.PLACEMENT_PLUGIN_CONFIG_KEY);
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error in API", e);
}
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java b/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java
index 57bcbef..3b013ca 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java
@@ -50,7 +50,7 @@ import static org.apache.lucene.util.IOUtils.closeWhileHandlingException;
public class ContainerPluginsApi {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- public static final String PLUGIN = "plugin";
+ public static final String PLUGIN = ZkStateReader.CONTAINER_PLUGINS;
private final Supplier<SolrZkClient> zkClientSupplier;
private final CoreContainer coreContainer;
public final Read readAPI = new Read();
diff --git a/solr/core/src/test/org/apache/solr/cluster/placement/AttributeFetcherForTest.java b/solr/core/src/test/org/apache/solr/cluster/placement/AttributeFetcherForTest.java
new file mode 100644
index 0000000..b1cc2a0
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cluster/placement/AttributeFetcherForTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement;
+
+import org.apache.solr.cluster.Node;
+import org.apache.solr.cluster.placement.AttributeFetcher;
+import org.apache.solr.cluster.placement.AttributeValues;
+
+import java.util.Set;
+
+public class AttributeFetcherForTest implements AttributeFetcher {
+
+ private final AttributeValues attributeValues;
+
+ AttributeFetcherForTest(AttributeValues attributeValues) {
+ this.attributeValues = attributeValues;
+ }
+
+ @Override
+ public AttributeFetcher requestNodeCoreCount() {
+ return this;
+ }
+
+ @Override
+ public AttributeFetcher requestNodeDiskType() {
+ return this;
+ }
+
+ @Override
+ public AttributeFetcher requestNodeFreeDisk() {
+ return this;
+ }
+
+ @Override
+ public AttributeFetcher requestNodeTotalDisk() {
+ return this;
+ }
+
+ @Override
+ public AttributeFetcher requestNodeHeapUsage() {
+ return this;
+ }
+
+ @Override
+ public AttributeFetcher requestNodeSystemLoadAverage() {
+ return this;
+ }
+
+ @Override
+ public AttributeFetcher requestNodeSystemProperty(String name) {
+ return this;
+ }
+
+ @Override
+ public AttributeFetcher requestNodeEnvironmentVariable(String name) {
+ throw new UnsupportedOperationException("Not yet implemented...");
+ }
+
+ @Override
+ public AttributeFetcher requestNodeMetric(String metricName, NodeMetricRegistry registry) {
+ return this;
+ }
+
+ @Override
+ public AttributeFetcher fetchFrom(Set<Node> nodes) {
+ return this;
+ }
+
+ @Override
+ public AttributeFetcher requestMetric(String scope, String metricName) {
+ throw new UnsupportedOperationException("Not yet implemented...");
+ }
+
+ @Override
+ public AttributeValues fetchAttributes() {
+ return attributeValues;
+ }
+}
diff --git a/solr/core/src/test/org/apache/solr/cluster/placement/Builders.java b/solr/core/src/test/org/apache/solr/cluster/placement/Builders.java
new file mode 100644
index 0000000..398b16d
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cluster/placement/Builders.java
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement;
+
+import org.apache.solr.cluster.*;
+import org.apache.solr.cluster.placement.impl.AttributeFetcherImpl;
+import org.apache.solr.cluster.placement.impl.AttributeValuesImpl;
+import org.apache.solr.common.util.Pair;
+import org.junit.Assert;
+
+import java.util.*;
+
+/**
+ * Builder classes to make tests using different cluster and node configurations easier to write and to read.
+ */
+public class Builders {
+
+ public static ClusterBuilder newClusterBuilder() {
+ return new ClusterBuilder();
+ }
+
+ public static CollectionBuilder newCollectionBuilder(String collectionName) {
+ return new CollectionBuilder(collectionName);
+ }
+
+ public static class ClusterBuilder {
+ /**
+ * {@link NodeBuilder} for the live nodes of the cluster.
+ */
+ private LinkedList<NodeBuilder> nodeBuilders = new LinkedList<>();
+ private LinkedList<CollectionBuilder> collectionBuilders = new LinkedList<>();
+
+ public ClusterBuilder initializeLiveNodes(int countNodes) {
+ nodeBuilders = new LinkedList<>();
+ for (int n = 0; n < countNodes; n++) {
+ nodeBuilders.add(new NodeBuilder().setNodeName("node_" + n)); // Default name, can be changed
+ }
+ return this;
+ }
+
+ public LinkedList<NodeBuilder> getLiveNodeBuilders() {
+ return nodeBuilders;
+ }
+
+ public ClusterBuilder addCollection(CollectionBuilder collectionBuilder) {
+ collectionBuilders.add(collectionBuilder);
+ return this;
+ }
+
+ public Cluster build() {
+ // TODO if converting all tests to use builders change ClusterImpl ctor to use list of nodes
+ return new ClusterAbstractionsForTest.ClusterImpl(new HashSet<>(buildLiveNodes()), buildClusterCollections());
+ }
+
+ public List<Node> buildLiveNodes() {
+ List<Node> liveNodes = new LinkedList<>();
+ for (NodeBuilder nodeBuilder : nodeBuilders) {
+ liveNodes.add(nodeBuilder.build());
+ }
+
+ return liveNodes;
+ }
+
+ Map<String, SolrCollection> buildClusterCollections() {
+ Map<String, SolrCollection> clusterCollections = new LinkedHashMap<>();
+ for (CollectionBuilder collectionBuilder : collectionBuilders) {
+ SolrCollection solrCollection = collectionBuilder.build();
+ clusterCollections.put(solrCollection.getName(), solrCollection);
+ }
+
+ return clusterCollections;
+ }
+
+ public AttributeFetcher buildAttributeFetcher() {
+ Map<Node, Integer> nodeToCoreCount = new HashMap<>();
+ Map<Node, Long> nodeToFreeDisk = new HashMap<>();
+ Map<String, Map<Node, String>> sysprops = new HashMap<>();
+ Map<String, Map<Node, Double>> metrics = new HashMap<>();
+
+ // TODO And a few more missing and will be added...
+
+      // Slightly redundant work (Node instances are built both here and in build()) but let's favor readability over tricks (I could think
+ // of many) to reuse the nodes computed in build() or build the AttributeFetcher at the same time.
+ for (NodeBuilder nodeBuilder : nodeBuilders) {
+ Node node = nodeBuilder.build();
+
+ if (nodeBuilder.getCoreCount() != null) {
+ nodeToCoreCount.put(node, nodeBuilder.getCoreCount());
+ }
+ if (nodeBuilder.getFreeDiskGB() != null) {
+ nodeToFreeDisk.put(node, nodeBuilder.getFreeDiskGB());
+ }
+ if (nodeBuilder.getSysprops() != null) {
+ nodeBuilder.getSysprops().forEach((name, value) -> {
+ sysprops.computeIfAbsent(name, n -> new HashMap<>())
+ .put(node, value);
+ });
+ }
+ if (nodeBuilder.getMetrics() != null) {
+ nodeBuilder.getMetrics().forEach((name, value) -> {
+ metrics.computeIfAbsent(name, n -> new HashMap<>())
+ .put(node, value);
+ });
+ }
+ }
+
+ AttributeValues attributeValues = new AttributeValuesImpl(nodeToCoreCount, Map.of(), nodeToFreeDisk, Map.of(), Map.of(), Map.of(), sysprops, metrics);
+ return new AttributeFetcherForTest(attributeValues);
+ }
+ }
+
+ public static class CollectionBuilder {
+ private final String collectionName;
+ private LinkedList<ShardBuilder> shardBuilders = new LinkedList<>();
+ private Map<String, String> customProperties = new HashMap<>();
+ int replicaNumber = 0; // global replica numbering for the collection
+
+ public CollectionBuilder(String collectionName) {
+ this.collectionName = collectionName;
+ }
+
+ public CollectionBuilder addCustomProperty(String name, String value) {
+ customProperties.put(name, value);
+ return this;
+ }
+
+ /**
+ * @return The internal shards data structure to allow test code to modify the replica distribution to nodes.
+ */
+ public LinkedList<ShardBuilder> getShardBuilders() {
+ return shardBuilders;
+ }
+
+ /**
+ * Initializes the collection to a specific shard and replica distribution passed in {@code shardsReplicas}.
+ * @param shardsReplicas A list of shard descriptions, describing the replicas of that shard.
+   *                      Replica descriptions include the replica type and the node on which the replica should be placed.
+ * Everything is text to make it easy to design specific collections. For example the following value:
+ * <pre>{@code
+ * List.of(
+ * List.of("NRT 0", "TLOG 0", "NRT 3"), // shard 1
+ * List.of("NRT 1", "NRT 3", "TLOG 2")); // shard 2
+ * }</pre>
+ * Creates a placement that would distribute replicas to nodes (there must be at least 4 nodes)
+ * in the following way:
+ * <pre>{@code
+ * +--------------+----+----+----+----+
+ * | Node | 0 | 1 | 2 | 3 |
+ * +----------------------------------+
+ * | Shard 1: | | | | |
+ * | NRT | X | | | X |
+ * | TLOG | X | | | |
+ * +----------------------------------+
+ * | Shard 2: | | | | |
+ * | NRT | | X | | X |
+ * | TLOG | | | X | |
+ * +--------------+----+----+----+----+
+ * }</pre>
+ */
+ public CollectionBuilder customCollectionSetup(List<List<String>> shardsReplicas, List<NodeBuilder> liveNodes) {
+ shardBuilders = new LinkedList<>();
+ int shardNumber = 1; // Shard numbering starts at 1
+ for (List<String> replicasOnNodes : shardsReplicas) {
+ String shardName = buildShardName(shardNumber++);
+ LinkedList<ReplicaBuilder> replicas = new LinkedList<>();
+ ReplicaBuilder leader = null;
+
+ for (String replicaNode : replicasOnNodes) {
+ // replicaNode is like "TLOG 2" meaning a TLOG replica should be placed on node 2
+ String[] splited = replicaNode.split("\\s+");
+ Assert.assertEquals(2, splited.length);
+ Replica.ReplicaType type = Replica.ReplicaType.valueOf(splited[0]);
+ final NodeBuilder node;
+ int nodeIndex = Integer.parseInt(splited[1]);
+ if (nodeIndex < liveNodes.size()) {
+ node = liveNodes.get(nodeIndex);
+ } else {
+ // The collection can have replicas on non live nodes. Let's create such a node here (that is not known to the
+ // cluster). There could be many non live nodes in the collection configuration, they will all reference new
+ // instances such as below of a node unknown to cluster, but all will have the same name (so will be equal if
+ // tested).
+ node = new NodeBuilder().setNodeName("NonLiveNode");
+ }
+ String replicaName = buildReplicaName(shardName, type);
+
+ ReplicaBuilder replicaBuilder = new ReplicaBuilder();
+ replicaBuilder.setReplicaName(replicaName).setCoreName(buildCoreName(replicaName)).setReplicaType(type)
+ .setReplicaState(Replica.ReplicaState.ACTIVE).setReplicaNode(node);
+ replicas.add(replicaBuilder);
+
+ // No way to specify which replica is the leader. Could be done by adding a "*" to the replica definition for example
+ // in the passed shardsReplicas but not implementing this until it is needed :)
+ if (leader == null && type != Replica.ReplicaType.PULL) {
+ leader = replicaBuilder;
+ }
+ }
+
+ ShardBuilder shardBuilder = new ShardBuilder();
+ shardBuilder.setShardName(shardName).setReplicaBuilders(replicas).setLeader(leader);
+ shardBuilders.add(shardBuilder);
+ }
+
+ return this;
+ }
+
+ /**
+ * Initializes shard and replica builders for the collection based on passed parameters. Replicas are assigned round
+     * robin to the nodes. The shard leader is the first NRT replica of each shard (or first TLOG if no NRT).
+ * Shard and replica configuration can be modified afterwards, the returned builder hierarchy is a convenient starting point.
+ */
+ public CollectionBuilder initializeShardsReplicas(int countShards, int countNrtReplicas, int countTlogReplicas,
+ int countPullReplicas, List<NodeBuilder> nodes) {
+ Iterator<NodeBuilder> nodeIterator = nodes.iterator();
+
+ shardBuilders = new LinkedList<>();
+
+ for (int shardNumber = 1; shardNumber <= countShards; shardNumber++) {
+ String shardName = buildShardName(shardNumber);
+
+ LinkedList<ReplicaBuilder> replicas = new LinkedList<>();
+ ReplicaBuilder leader = null;
+
+ // Iterate on requested counts, NRT then TLOG then PULL. Leader chosen as first NRT (or first TLOG if no NRT)
+ List<Pair<Replica.ReplicaType, Integer>> replicaTypes = List.of(
+ new Pair<>(Replica.ReplicaType.NRT, countNrtReplicas),
+ new Pair<>(Replica.ReplicaType.TLOG, countTlogReplicas),
+ new Pair<>(Replica.ReplicaType.PULL, countPullReplicas));
+
+ for (Pair<Replica.ReplicaType, Integer> tc : replicaTypes) {
+ Replica.ReplicaType type = tc.first();
+ int count = tc.second();
+ for (int r = 0; r < count; r++) {
+ if (!nodeIterator.hasNext()) {
+ nodeIterator = nodes.iterator();
+ }
+ // If the nodes set is empty, this call will fail
+ final NodeBuilder node = nodeIterator.next();
+
+ String replicaName = buildReplicaName(shardName, type);
+
+ ReplicaBuilder replicaBuilder = new ReplicaBuilder();
+ replicaBuilder.setReplicaName(replicaName).setCoreName(buildCoreName(replicaName)).setReplicaType(type)
+ .setReplicaState(Replica.ReplicaState.ACTIVE).setReplicaNode(node);
+ replicas.add(replicaBuilder);
+
+ if (leader == null && type != Replica.ReplicaType.PULL) {
+ leader = replicaBuilder;
+ }
+ }
+ }
+
+ ShardBuilder shardBuilder = new ShardBuilder();
+ shardBuilder.setShardName(shardName).setReplicaBuilders(replicas).setLeader(leader);
+ shardBuilders.add(shardBuilder);
+ }
+
+ return this;
+ }
+
+ private String buildShardName(int shardIndex) {
+ return "shard" + shardIndex;
+ }
+
+ private String buildReplicaName(String shardName, Replica.ReplicaType replicaType) {
+ return collectionName + "_" + shardName + "_replica_" + replicaType.getSuffixChar() + replicaNumber++;
+ }
+
+ private String buildCoreName(String replicaName) {
+ return replicaName + "_c";
+ }
+
+ public SolrCollection build() {
+ ClusterAbstractionsForTest.SolrCollectionImpl solrCollection = new ClusterAbstractionsForTest.SolrCollectionImpl(collectionName, customProperties);
+
+ final LinkedHashMap<String, Shard> shards = new LinkedHashMap<>();
+
+ for (ShardBuilder shardBuilder : shardBuilders) {
+ Shard shard = shardBuilder.build(solrCollection);
+ shards.put(shard.getShardName(), shard);
+ }
+
+ solrCollection.setShards(shards);
+ return solrCollection;
+ }
+ }
+
+ public static class ShardBuilder {
+ private String shardName;
+ private LinkedList<ReplicaBuilder> replicaBuilders = new LinkedList<>();
+ private ReplicaBuilder leaderReplicaBuilder;
+
+ public ShardBuilder setShardName(String shardName) {
+ this.shardName = shardName;
+ return this;
+ }
+
+ public String getShardName() {
+ return shardName;
+ }
+
+ public LinkedList<ReplicaBuilder> getReplicaBuilders() {
+ return replicaBuilders;
+ }
+
+ public ShardBuilder setReplicaBuilders(LinkedList<ReplicaBuilder> replicaBuilders) {
+ this.replicaBuilders = replicaBuilders;
+ return this;
+ }
+
+ public ShardBuilder setLeader(ReplicaBuilder leaderReplicaBuilder) {
+ this.leaderReplicaBuilder = leaderReplicaBuilder;
+ return this;
+ }
+
+ public Shard build(SolrCollection collection) {
+ ClusterAbstractionsForTest.ShardImpl shard = new ClusterAbstractionsForTest.ShardImpl(shardName, collection, Shard.ShardState.ACTIVE);
+
+ final LinkedHashMap<String, Replica> replicas = new LinkedHashMap<>();
+ Replica leader = null;
+
+ for (ReplicaBuilder replicaBuilder : replicaBuilders) {
+ Replica replica = replicaBuilder.build(shard);
+ replicas.put(replica.getReplicaName(), replica);
+
+ if (leaderReplicaBuilder == replicaBuilder) {
+ leader = replica;
+ }
+ }
+
+ shard.setReplicas(replicas, leader);
+ return shard;
+ }
+ }
+
+ public static class ReplicaBuilder {
+ private String replicaName;
+ private String coreName;
+ private Replica.ReplicaType replicaType;
+ private Replica.ReplicaState replicaState;
+ private NodeBuilder replicaNode;
+
+ public ReplicaBuilder setReplicaName(String replicaName) {
+ this.replicaName = replicaName;
+ return this;
+ }
+
+ public ReplicaBuilder setCoreName(String coreName) {
+ this.coreName = coreName;
+ return this;
+ }
+
+ public Replica.ReplicaType getReplicaType() {
+ return replicaType;
+ }
+
+ public ReplicaBuilder setReplicaType(Replica.ReplicaType replicaType) {
+ this.replicaType = replicaType;
+ return this;
+ }
+
+ public ReplicaBuilder setReplicaState(Replica.ReplicaState replicaState) {
+ this.replicaState = replicaState;
+ return this;
+ }
+
+ public ReplicaBuilder setReplicaNode(NodeBuilder replicaNode) {
+ this.replicaNode = replicaNode;
+ return this;
+ }
+
+ public Replica build(Shard shard) {
+ return new ClusterAbstractionsForTest.ReplicaImpl(replicaName, coreName, shard, replicaType, replicaState, replicaNode.build());
+ }
+ }
+
+ public static class NodeBuilder {
+ private String nodeName = null;
+ private Integer coreCount = null;
+ private Long freeDiskGB = null;
+ private Map<String, String> sysprops = null;
+ private Map<String, Double> metrics = null;
+
+ public NodeBuilder setNodeName(String nodeName) {
+ this.nodeName = nodeName;
+ return this;
+ }
+
+ public NodeBuilder setCoreCount(Integer coreCount) {
+ this.coreCount = coreCount;
+ return this;
+ }
+
+ public NodeBuilder setFreeDiskGB(Long freeDiskGB) {
+ this.freeDiskGB = freeDiskGB;
+ return this;
+ }
+
+ public NodeBuilder setSysprop(String key, String value) {
+ if (sysprops == null) {
+ sysprops = new HashMap<>();
+ }
+ String name = AttributeFetcherImpl.getSystemPropertySnitchTag(key);
+ sysprops.put(name, value);
+ return this;
+ }
+
+ public NodeBuilder setMetric(AttributeFetcher.NodeMetricRegistry registry, String key, Double value) {
+ if (metrics == null) {
+ metrics = new HashMap<>();
+ }
+ String name = AttributeFetcherImpl.getMetricSnitchTag(key, registry);
+ metrics.put(name, value);
+ return this;
+ }
+
+ public Integer getCoreCount() {
+ return coreCount;
+ }
+
+ public Long getFreeDiskGB() {
+ return freeDiskGB;
+ }
+
+ public Map<String, String> getSysprops() {
+ return sysprops;
+ }
+
+ public Map<String, Double> getMetrics() {
+ return metrics;
+ }
+
+ public Node build() {
+ // It is ok to build a new instance each time, that instance does the right thing with equals() and hashCode()
+ return new ClusterAbstractionsForTest.NodeImpl(nodeName);
+ }
+ }
+}
diff --git a/solr/core/src/test/org/apache/solr/cluster/placement/ClusterAbstractionsForTest.java b/solr/core/src/test/org/apache/solr/cluster/placement/ClusterAbstractionsForTest.java
new file mode 100644
index 0000000..771f148
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cluster/placement/ClusterAbstractionsForTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement;
+
+import org.apache.solr.cluster.*;
+
+import javax.annotation.Nonnull;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * Cluster abstractions independent of any internal SolrCloud abstractions to use in tests (of plugin code).
+ */
+class ClusterAbstractionsForTest {
+
+ /**
+ * Test-only {@link Cluster}: holds defensive copies of the live nodes and collections it is given.
+ */
+ static class ClusterImpl implements Cluster {
+ private final Set<Node> liveNodes = new HashSet<>();
+ private final Map<String, SolrCollection> collections = new HashMap<>();
+
+ // Copies the inputs so later mutation of the caller's collections does not affect this cluster.
+ ClusterImpl(Set<Node> liveNodes, Map<String, SolrCollection> collections) {
+ this.liveNodes.addAll(liveNodes);
+ this.collections.putAll(collections);
+ }
+
+ @Override
+ public Set<Node> getLiveNodes() {
+ return liveNodes;
+ }
+
+ @Override
+ public SolrCollection getCollection(String collectionName) {
+ return collections.get(collectionName);
+ }
+
+ @Override
+ @Nonnull
+ public Iterator<SolrCollection> iterator() {
+ return collections.values().iterator();
+ }
+
+ @Override
+ public Iterable<SolrCollection> collections() {
+ // Each call hands out a fresh Iterable backed by iterator() above.
+ return ClusterImpl.this::iterator;
+ }
+ }
+
+
+ /**
+ * Test-only {@link Node}. Identity is defined by node name alone (see equals/hashCode below).
+ */
+ static class NodeImpl implements Node {
+ public final String nodeName;
+
+ /**
+ * Transforms a collection of node names into a set of {@link Node} instances.
+ */
+ static Set<Node> getNodes(Collection<String> nodeNames) {
+ return nodeNames.stream().map(NodeImpl::new).collect(Collectors.toSet());
+ }
+
+ NodeImpl(String nodeName) {
+ this.nodeName = nodeName;
+ }
+
+ @Override
+ public String getName() {
+ return nodeName;
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "(" + getName() + ")";
+ }
+
+ /**
+ * This class ends up as a key in Maps in {@link org.apache.solr.cluster.placement.AttributeValues}.
+ * It is important to implement this method comparing node names given that new instances of {@link Node} are created
+ * with names equal to existing instances (See {@link Builders.NodeBuilder#build()}).
+ */
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (obj == this) {
+ return true;
+ }
+ if (obj.getClass() != getClass()) {
+ return false;
+ }
+ NodeImpl other = (NodeImpl) obj;
+ return Objects.equals(this.nodeName, other.nodeName);
+ }
+
+ // Consistent with equals(): hash on the node name only.
+ public int hashCode() {
+ return Objects.hashCode(nodeName);
+ }
+ }
+
+
+ /**
+ * Test-only {@link SolrCollection}. Shards are injected after construction via setShards()
+ * because shards hold a back-reference to their owning collection.
+ */
+ static class SolrCollectionImpl implements SolrCollection {
+ private final String collectionName;
+ /**
+ * Map from {@link Shard#getShardName()} to {@link Shard}
+ */
+ private Map<String, Shard> shards;
+ private final Map<String, String> customProperties;
+
+ // NOTE: shards stays null until setShards() is called; accessors below assume initialization completed.
+ SolrCollectionImpl(String collectionName, Map<String, String> customProperties) {
+ this.collectionName = collectionName;
+ this.customProperties = customProperties;
+ }
+
+ /**
+ * Setting the shards has to happen (in tests) after creating the collection because shards reference the collection
+ */
+ void setShards(Map<String, Shard> shards) {
+ this.shards = shards;
+ }
+
+ @Override
+ public String getName() {
+ return collectionName;
+ }
+
+ @Override
+ public Shard getShard(String name) {
+ return shards.get(name);
+ }
+
+ @Override
+ @Nonnull
+ public Iterator<Shard> iterator() {
+ return shards.values().iterator();
+ }
+
+ @Override
+ public Iterable<Shard> shards() {
+ // Fresh Iterable per call, backed by iterator() above.
+ return SolrCollectionImpl.this::iterator;
+ }
+
+ @Override
+ public Set<String> getShardNames() {
+ return shards.keySet();
+ }
+
+ @Override
+ public String getCustomProperty(String customPropertyName) {
+ return customProperties.get(customPropertyName);
+ }
+ }
+
+
+ /**
+ * Test-only {@link Shard}. Replicas and the leader are injected after construction via
+ * setReplicas() because replicas hold a back-reference to their owning shard.
+ */
+ static class ShardImpl implements Shard {
+ private final String shardName;
+ private final SolrCollection collection;
+ private final ShardState shardState;
+ private Map<String, Replica> replicas;
+ private Replica leader;
+
+ ShardImpl(String shardName, SolrCollection collection, ShardState shardState) {
+ this.shardName = shardName;
+ this.collection = collection;
+ this.shardState = shardState;
+ }
+
+ /**
+ * Setting the replicas has to happen (in tests) after creating the shard because replicas reference the shard
+ */
+ void setReplicas(Map<String, Replica> replicas, Replica leader) {
+ this.replicas = replicas;
+ this.leader = leader;
+ }
+
+ @Override
+ public String getShardName() {
+ return shardName;
+ }
+
+ @Override
+ public SolrCollection getCollection() {
+ return collection;
+ }
+
+ @Override
+ public Replica getReplica(String name) {
+ return replicas.get(name);
+ }
+
+ @Override
+ @Nonnull
+ public Iterator<Replica> iterator() {
+ return replicas.values().iterator();
+ }
+
+ @Override
+ public Iterable<Replica> replicas() {
+ return ShardImpl.this::iterator;
+ }
+
+ @Override
+ public Replica getLeader() {
+ return leader;
+ }
+
+ @Override
+ public ShardState getState() {
+ return shardState;
+ }
+
+ // NOTE(review): equals() also compares the replicas map, and ReplicaImpl.equals() in turn compares
+ // its shard; on two distinct-but-equal object graphs this mutual recursion would not terminate.
+ // In these tests instances are shared so the obj == this shortcut applies — confirm before
+ // comparing independently built deep copies.
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (obj == this) {
+ return true;
+ }
+ if (obj.getClass() != getClass()) {
+ return false;
+ }
+ ShardImpl other = (ShardImpl) obj;
+ return Objects.equals(this.shardName, other.shardName)
+ && Objects.equals(this.collection, other.collection)
+ && Objects.equals(this.shardState, other.shardState)
+ && Objects.equals(this.replicas, other.replicas)
+ && Objects.equals(this.leader, other.leader);
+ }
+
+ // Hashes only on fields set at construction, keeping the hash stable across setReplicas().
+ public int hashCode() {
+ return Objects.hash(shardName, collection, shardState);
+ }
+ }
+
+
+ /**
+ * Test-only {@link Replica}: an immutable value holding the replica name, core name, type,
+ * state, owning shard and hosting node.
+ */
+ static class ReplicaImpl implements Replica {
+ private final String replicaName;
+ private final String coreName;
+ private final Shard shard;
+ private final ReplicaType replicaType;
+ private final ReplicaState replicaState;
+ private final Node node;
+
+ ReplicaImpl(String replicaName, String coreName, Shard shard, ReplicaType replicaType, ReplicaState replicaState, Node node) {
+ this.replicaName = replicaName;
+ this.coreName = coreName;
+ this.shard = shard;
+ this.replicaType = replicaType;
+ this.replicaState = replicaState;
+ this.node = node;
+ }
+
+ @Override
+ public Shard getShard() {
+ return shard;
+ }
+
+ @Override
+ public ReplicaType getType() {
+ return replicaType;
+ }
+
+ @Override
+ public ReplicaState getState() {
+ return replicaState;
+ }
+
+ @Override
+ public String getReplicaName() {
+ return replicaName;
+ }
+
+ @Override
+ public String getCoreName() {
+ return coreName;
+ }
+
+ @Override
+ public Node getNode() {
+ return node;
+ }
+
+ // NOTE(review): comparing this.shard recurses into ShardImpl.equals(), which compares its replicas
+ // map — see the note on ShardImpl; terminates only when shard instances are shared (obj == this).
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (obj == this) {
+ return true;
+ }
+ if (obj.getClass() != getClass()) {
+ return false;
+ }
+ ReplicaImpl other = (ReplicaImpl) obj;
+ return Objects.equals(this.replicaName, other.replicaName)
+ && Objects.equals(this.coreName, other.coreName)
+ && Objects.equals(this.shard, other.shard)
+ && Objects.equals(this.replicaType, other.replicaType)
+ && Objects.equals(this.replicaState, other.replicaState)
+ && Objects.equals(this.node, other.node);
+ }
+
+ public int hashCode() {
+ return Objects.hash(replicaName, coreName, shard, replicaType, replicaState, node);
+ }
+ }
+}
diff --git a/solr/core/src/test/org/apache/solr/cluster/placement/impl/PlacementPluginIntegrationTest.java b/solr/core/src/test/org/apache/solr/cluster/placement/impl/PlacementPluginIntegrationTest.java
new file mode 100644
index 0000000..ac325b1
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cluster/placement/impl/PlacementPluginIntegrationTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement.impl;
+
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.V2Request;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.cluster.placement.PlacementPluginConfig;
+import org.apache.solr.cluster.placement.plugins.MinimizeCoresPlacementFactory;
+import org.apache.solr.common.cloud.ClusterProperties;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Collections.singletonMap;
+
+/**
+ * Test for {@link MinimizeCoresPlacementFactory} using a {@link MiniSolrCloudCluster}.
+ */
+public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
+
+ private static final String COLLECTION = PlacementPluginIntegrationTest.class.getName() + "_collection";
+
+ // NOTE(review): clusterProperties is initialized in setupCluster() but never read here — confirm it can be removed.
+ private static ClusterProperties clusterProperties;
+ private static SolrCloudManager cloudManager;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ // placement plugins need metrics
+ System.setProperty("metricsEnabled", "true");
+ configureCluster(3)
+ .addConfig("conf", configset("cloud-minimal"))
+ .configure();
+ cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
+ clusterProperties = new ClusterProperties(cluster.getZkClient());
+ }
+
+ // After each test: drop all collections and reset the placement plugin by posting an empty config.
+ @After
+ public void cleanup() throws Exception {
+ cluster.deleteAllCollections();
+ V2Request req = new V2Request.Builder("/cluster")
+ .forceV2(true)
+ .POST()
+ .withPayload(singletonMap("set-placement-plugin", Map.of()))
+ .build();
+ req.process(cluster.getSolrClient());
+
+ }
+
+ @Test
+ public void testMinimizeCores() throws Exception {
+ // Install the MinimizeCoresPlacementFactory as the cluster-wide placement plugin.
+ Map<String, Object> config = Map.of(PlacementPluginConfig.FACTORY_CLASS, MinimizeCoresPlacementFactory.class.getName());
+ V2Request req = new V2Request.Builder("/cluster")
+ .forceV2(true)
+ .POST()
+ .withPayload(singletonMap("set-placement-plugin", config))
+ .build();
+ req.process(cluster.getSolrClient());
+
+ CollectionAdminResponse rsp = CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 2)
+ .process(cluster.getSolrClient());
+ assertTrue(rsp.isSuccess());
+ cluster.waitForActiveCollection(COLLECTION, 2, 4);
+ // use Solr-specific API to verify the expected placements
+ ClusterState clusterState = cloudManager.getClusterStateProvider().getClusterState();
+ DocCollection collection = clusterState.getCollectionOrNull(COLLECTION);
+ assertNotNull(collection);
+ Map<String, AtomicInteger> coresByNode = new HashMap<>();
+ collection.forEachReplica((shard, replica) -> {
+ coresByNode.computeIfAbsent(replica.getNodeName(), n -> new AtomicInteger()).incrementAndGet();
+ });
+ // 2 shards x 2 replicas = 4 cores over 3 nodes: a minimizing placement puts at most 2 and at least 1 per used node.
+ int maxCores = 0;
+ int minCores = Integer.MAX_VALUE;
+ for (Map.Entry<String, AtomicInteger> entry : coresByNode.entrySet()) {
+ assertTrue("too few cores on node " + entry.getKey() + ": " + entry.getValue(),
+ entry.getValue().get() > 0);
+ if (entry.getValue().get() > maxCores) {
+ maxCores = entry.getValue().get();
+ }
+ if (entry.getValue().get() < minCores) {
+ minCores = entry.getValue().get();
+ }
+ }
+ assertEquals("max cores too high", 2, maxCores);
+ assertEquals("min cores too low", 1, minCores);
+ }
+
+}
diff --git a/solr/core/src/test/org/apache/solr/cluster/placement/impl/SimpleClusterAbstractionsTest.java b/solr/core/src/test/org/apache/solr/cluster/placement/impl/SimpleClusterAbstractionsTest.java
new file mode 100644
index 0000000..5e58779
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cluster/placement/impl/SimpleClusterAbstractionsTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement.impl;
+
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.cluster.Cluster;
+import org.apache.solr.cluster.Node;
+import org.apache.solr.cluster.Replica;
+import org.apache.solr.cluster.Shard;
+import org.apache.solr.cluster.SolrCollection;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Slice;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Locale;
+import java.util.Set;
+
+/**
+ * Verifies that {@link SimpleClusterAbstractionsImpl} faithfully mirrors the SolrCloud
+ * {@link ClusterState}: live nodes, collections, shards, leaders and replicas.
+ */
+public class SimpleClusterAbstractionsTest extends SolrCloudTestCase {
+
+ private static final String COLLECTION = SimpleClusterAbstractionsTest.class.getName() + "_collection";
+
+ private static SolrCloudManager cloudManager;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ configureCluster(3)
+ .addConfig("conf", configset("cloud-minimal"))
+ .configure();
+ cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
+ CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 2)
+ .process(cluster.getSolrClient());
+ }
+
+ @Test
+ public void testBasic() throws Exception {
+ ClusterState clusterState = cloudManager.getClusterStateProvider().getClusterState();
+ Cluster cluster = new SimpleClusterAbstractionsImpl.ClusterImpl(cloudManager);
+ assertNotNull(cluster);
+ // Every node the abstraction reports must be live in the real cluster state.
+ Set<Node> nodes = cluster.getLiveNodes();
+ nodes.forEach(n -> assertTrue("missing node " + n, clusterState.liveNodesContain(n.getName())));
+
+ DocCollection docCollection = clusterState.getCollection(COLLECTION);
+ SolrCollection collection = cluster.getCollection(COLLECTION);
+ // XXX gah ... can't assert anything about collection properties !!!??
+ // things like router or other collection props, like eg. special placement policy
+
+ assertNotNull(collection);
+ // Cross-check each slice/shard and each replica's core, node, state and type.
+ for (String shardName : docCollection.getSlicesMap().keySet()) {
+ Slice slice = docCollection.getSlice(shardName);
+ Shard shard = collection.getShard(shardName);
+ // XXX can't assert shard range ... because it's not in the API! :(
+
+ assertNotNull("missing shard " + shardName, shard);
+ assertNotNull("no leader in shard " + shard, shard.getLeader());
+ Replica replica = shard.getLeader();
+ assertEquals(slice.getLeader().getName(), replica.getReplicaName());
+ slice.getReplicas().forEach(sreplica -> {
+ Replica r = shard.getReplica(sreplica.getName());
+ assertNotNull("missing replica " + sreplica.getName(), r);
+ assertEquals(r.getCoreName(), sreplica.getCoreName());
+ assertEquals(r.getNode().getName(), sreplica.getNodeName());
+ assertEquals(r.getState().toString().toLowerCase(Locale.ROOT), sreplica.getState().toString());
+ assertEquals(r.getType().toString(), sreplica.getType().toString());
+ });
+ }
+ }
+}
diff --git a/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java b/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java
new file mode 100644
index 0000000..7e240b6
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java
@@ -0,0 +1,730 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement.plugins;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.cluster.Cluster;
+import org.apache.solr.cluster.Node;
+import org.apache.solr.cluster.Replica;
+import org.apache.solr.cluster.Shard;
+import org.apache.solr.cluster.SolrCollection;
+import org.apache.solr.cluster.placement.*;
+import org.apache.solr.cluster.placement.Builders;
+import org.apache.solr.cluster.placement.impl.PlacementPlanFactoryImpl;
+import org.apache.solr.cluster.placement.impl.PlacementPluginConfigImpl;
+import org.apache.solr.cluster.placement.impl.PlacementRequestImpl;
+import org.apache.solr.common.util.Pair;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * Unit test for {@link AffinityPlacementFactory}
+ */
+public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static PlacementPlugin plugin;
+
+ private final static long MINIMAL_FREE_DISK_GB = 10L;
+ private final static long PRIORITIZED_FREE_DISK_GB = 50L;
+
+ /** Builds the plugin under test once for the whole class, configured with the free-disk thresholds above. */
+ @BeforeClass
+ public static void setupPlugin() {
+ PlacementPluginConfig config = PlacementPluginConfigImpl.createConfigFromProperties(
+ Map.of("minimalFreeDiskGB", MINIMAL_FREE_DISK_GB, "prioritizedFreeDiskGB", PRIORITIZED_FREE_DISK_GB));
+ plugin = new AffinityPlacementFactory().createPluginInstance(config);
+ }
+
+ /** New collection: placement should pick the node with the fewest cores. */
+ @Test
+ public void testBasicPlacementNewCollection() throws Exception {
+ testBasicPlacementInternal(false);
+ }
+
+ /** Existing collection: placement should avoid the node already hosting a replica of the shard. */
+ @Test
+ public void testBasicPlacementExistingCollection() throws Exception {
+ testBasicPlacementInternal(true);
+ }
+
+ /**
+ * When this test places a replica for a new collection, it should pick the node with fewer cores.
+ * <p>
+ * When it places a replica for an existing collection, it should pick the node with fewer cores that doesn't already have a replica for the shard.
+ */
+ private void testBasicPlacementInternal(boolean hasExistingCollection) throws Exception {
+ String collectionName = "basicCollection";
+
+ Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(2);
+ LinkedList<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+ // Node 0 is the less loaded node; both nodes have disk above the prioritized threshold.
+ nodeBuilders.get(0).setCoreCount(1).setFreeDiskGB(PRIORITIZED_FREE_DISK_GB + 1);
+ nodeBuilders.get(1).setCoreCount(10).setFreeDiskGB(PRIORITIZED_FREE_DISK_GB + 1);
+
+ Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
+
+ if (hasExistingCollection) {
+ // Existing collection has replicas for its shards and is visible in the cluster state
+ collectionBuilder.initializeShardsReplicas(1, 1, 0, 0, nodeBuilders);
+ clusterBuilder.addCollection(collectionBuilder);
+ } else {
+ // New collection to create has the shards defined but no replicas and is not present in cluster state
+ collectionBuilder.initializeShardsReplicas(1, 0, 0, 0, List.of());
+ }
+
+ Cluster cluster = clusterBuilder.build();
+ AttributeFetcher attributeFetcher = clusterBuilder.buildAttributeFetcher();
+
+ SolrCollection solrCollection = collectionBuilder.build();
+ List<Node> liveNodes = clusterBuilder.buildLiveNodes();
+
+ // Place a new replica for the (only) existing shard of the collection
+ PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection,
+ Set.of(solrCollection.shards().iterator().next().getShardName()), new HashSet<>(liveNodes),
+ 1, 0, 0);
+
+ PlacementPlan pp = plugin.computePlacement(cluster, placementRequest, attributeFetcher, new PlacementPlanFactoryImpl());
+
+ assertEquals(1, pp.getReplicaPlacements().size());
+ ReplicaPlacement rp = pp.getReplicaPlacements().iterator().next();
+ // New collection -> lowest-core node (0); existing collection presumably has its replica on
+ // node 0 already (first node builder) -> node 1. TODO confirm initializeShardsReplicas ordering.
+ assertEquals(hasExistingCollection ? liveNodes.get(1) : liveNodes.get(0), rp.getNode());
+ }
+
+ /**
+ * Tests that replicas are not placed on nodes with low free disk unless there is no other option.
+ */
+ @Test
+ public void testLowSpaceNode() throws Exception {
+ String collectionName = "lowSpaceCollection";
+
+ final int LOW_SPACE_NODE_INDEX = 0;
+ final int NO_SPACE_NODE_INDEX = 1;
+
+ // Cluster nodes and their attributes: 8 nodes, one low on disk, one with insufficient disk, six comfortable.
+ Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(8);
+ LinkedList<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+ for (int i = 0; i < nodeBuilders.size(); i++) {
+ if (i == LOW_SPACE_NODE_INDEX) {
+ nodeBuilders.get(i).setCoreCount(1).setFreeDiskGB(MINIMAL_FREE_DISK_GB + 1); // Low space
+ } else if (i == NO_SPACE_NODE_INDEX) {
+ nodeBuilders.get(i).setCoreCount(10).setFreeDiskGB(1L); // Really not enough space
+ } else {
+ nodeBuilders.get(i).setCoreCount(10).setFreeDiskGB(PRIORITIZED_FREE_DISK_GB + 1);
+ }
+ }
+ List<Node> liveNodes = clusterBuilder.buildLiveNodes();
+
+ // The collection to create (shards are defined but no replicas)
+ Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
+ collectionBuilder.initializeShardsReplicas(3, 0, 0, 0, List.of());
+ SolrCollection solrCollection = collectionBuilder.build();
+
+ // Place two replicas of each type for each shard
+ PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes),
+ 2, 2, 2);
+
+ PlacementPlan pp = plugin.computePlacement(clusterBuilder.build(), placementRequest, clusterBuilder.buildAttributeFetcher(), new PlacementPlanFactoryImpl());
+
+ assertEquals(18, pp.getReplicaPlacements().size()); // 3 shards, 6 replicas total each
+ Set<Pair<String, Node>> placements = new HashSet<>();
+ for (ReplicaPlacement rp : pp.getReplicaPlacements()) {
+ assertTrue("two replicas for same shard placed on same node", placements.add(new Pair<>(rp.getShardName(), rp.getNode())));
+ assertNotEquals("Replica unnecessarily placed on node with low free space", rp.getNode(), liveNodes.get(LOW_SPACE_NODE_INDEX));
+ assertNotEquals("Replica placed on node with not enough free space", rp.getNode(), liveNodes.get(NO_SPACE_NODE_INDEX));
+ }
+
+ // Verify that if we ask for 7 replicas, the placement will use the low free space node
+ placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes),
+ 7, 0, 0);
+ pp = plugin.computePlacement(clusterBuilder.build(), placementRequest, clusterBuilder.buildAttributeFetcher(), new PlacementPlanFactoryImpl());
+ assertEquals(21, pp.getReplicaPlacements().size()); // 3 shards, 7 replicas each
+ placements = new HashSet<>();
+ for (ReplicaPlacement rp : pp.getReplicaPlacements()) {
+ assertEquals("Only NRT replicas should be created", Replica.ReplicaType.NRT, rp.getReplicaType());
+ assertTrue("two replicas for same shard placed on same node", placements.add(new Pair<>(rp.getShardName(), rp.getNode())));
+ assertNotEquals("Replica placed on node with not enough free space", rp.getNode(), liveNodes.get(NO_SPACE_NODE_INDEX));
+ }
+
+ // Verify that if we ask for 8 replicas, the placement fails
+ try {
+ placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes),
+ 8, 0, 0);
+ plugin.computePlacement(clusterBuilder.build(), placementRequest, clusterBuilder.buildAttributeFetcher(), new PlacementPlanFactoryImpl());
+ fail("Placing 8 replicas should not be possible given only 7 nodes have enough space");
+ } catch (PlacementException e) {
+ // expected
+ }
+ }
+
+ /**
+ * Tests that existing collection replicas are taken into account when preventing more than one replica per shard to be
+ * placed on any node.
+ */
+ @Test
+ public void testPlacementWithExistingReplicas() throws Exception {
+ String collectionName = "existingCollection";
+
+ // Cluster nodes and their attributes: nodes 0..4 get 10, 20, 30, 40, 50 cores (lower index = fewer cores).
+ Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(5);
+ LinkedList<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+ int coresOnNode = 10;
+ for (Builders.NodeBuilder nodeBuilder : nodeBuilders) {
+ nodeBuilder.setCoreCount(coresOnNode).setFreeDiskGB(PRIORITIZED_FREE_DISK_GB + 1);
+ coresOnNode += 10;
+ }
+
+ // The collection already exists with shards and replicas
+ Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
+ // Note that the collection as defined below is in a state that would NOT be returned by the placement plugin:
+ // shard 1 has two replicas on node 0.
+ // The plugin should still be able to place additional replicas as long as they don't break the rules.
+ List<List<String>> shardsReplicas = List.of(
+ List.of("NRT 0", "TLOG 0", "NRT 3"), // shard 1
+ List.of("NRT 1", "NRT 3", "TLOG 2")); // shard 2
+ collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+ SolrCollection solrCollection = collectionBuilder.build();
+
+ List<Node> liveNodes = clusterBuilder.buildLiveNodes();
+
+ // Place an additional NRT and an additional TLOG replica for each shard
+ PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes),
+ 1, 1, 0);
+
+ // The replicas must be placed on the most appropriate nodes, i.e. those that do not already have a replica for the
+ // shard and then on the node with the lowest number of cores.
+ // NRT are placed first and given the cluster state here the placement is deterministic (easier to test, only one good placement).
+ PlacementPlan pp = plugin.computePlacement(clusterBuilder.build(), placementRequest, clusterBuilder.buildAttributeFetcher(), new PlacementPlanFactoryImpl());
+
+ // Each expected placement is represented as a string "shard replica-type node"
+ Set<String> expectedPlacements = Set.of("1 NRT 1", "1 TLOG 2", "2 NRT 0", "2 TLOG 4");
+ verifyPlacements(expectedPlacements, pp, collectionBuilder.getShardBuilders(), liveNodes);
+ }
+
+
+ /**
+ * Tests placement with multiple criteria: Replica type restricted nodes, Availability zones + existing collection
+ */
+ @Test
+ public void testPlacementMultiCriteria() throws Exception {
+ String collectionName = "multiCollection";
+
+ // Note node numbering is on purpose not following AZ structure
+ final int AZ1_NRT_LOWCORES = 0;
+ final int AZ1_NRT_HIGHCORES = 3;
+ final int AZ1_TLOGPULL_LOWFREEDISK = 5;
+
+ final int AZ2_NRT_MEDCORES = 2;
+ final int AZ2_NRT_HIGHCORES = 1;
+ final int AZ2_TLOGPULL = 7;
+
+ final int AZ3_NRT_LOWCORES = 4;
+ final int AZ3_NRT_HIGHCORES = 6;
+ final int AZ3_TLOGPULL = 8;
+
+ final String AZ1 = "AZ1";
+ final String AZ2 = "AZ2";
+ final String AZ3 = "AZ3";
+
+ final int LOW_CORES = 10;
+ final int MED_CORES = 50;
+ final int HIGH_CORES = 100;
+
+ final String TLOG_PULL_REPLICA_TYPE = "TLOG, PULL";
+ final String NRT_REPLICA_TYPE = "Nrt";
+
+ // Cluster nodes and their attributes.
+ // 3 AZ's with three nodes each, 2 of which can only take NRT, one that can take TLOG or PULL
+ // One of the NRT has less cores than the other
+ // The TLOG/PULL replica on AZ1 doesn't have much free disk space
+ Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(9);
+ LinkedList<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+ for (int i = 0; i < 9; i++) {
+ final String az;
+ final int numcores;
+ final long freedisk;
+ final String acceptedReplicaType;
+
+ if (i == AZ1_NRT_LOWCORES || i == AZ1_NRT_HIGHCORES || i == AZ1_TLOGPULL_LOWFREEDISK) {
+ az = AZ1;
+ } else if (i == AZ2_NRT_HIGHCORES || i == AZ2_NRT_MEDCORES || i == AZ2_TLOGPULL) {
+ az = AZ2;
+ } else {
+ az = AZ3;
+ }
+
+ if (i == AZ1_NRT_LOWCORES || i == AZ3_NRT_LOWCORES) {
+ numcores = LOW_CORES;
+ } else if (i == AZ2_NRT_MEDCORES) {
+ numcores = MED_CORES;
+ } else {
+ numcores = HIGH_CORES;
+ }
+
+ if (i == AZ1_TLOGPULL_LOWFREEDISK) {
+ freedisk = PRIORITIZED_FREE_DISK_GB - 10;
+ } else {
+ freedisk = PRIORITIZED_FREE_DISK_GB + 10;
+ }
+
+ if (i == AZ1_TLOGPULL_LOWFREEDISK || i == AZ2_TLOGPULL || i == AZ3_TLOGPULL) {
+ acceptedReplicaType = TLOG_PULL_REPLICA_TYPE;
+ } else {
+ acceptedReplicaType = NRT_REPLICA_TYPE;
+ }
+
+ nodeBuilders.get(i).setSysprop(AffinityPlacementFactory.AVAILABILITY_ZONE_SYSPROP, az)
+ .setSysprop(AffinityPlacementFactory.REPLICA_TYPE_SYSPROP, acceptedReplicaType)
+ .setCoreCount(numcores)
+ .setFreeDiskGB(freedisk);
+ }
+
+ // The collection already exists with shards and replicas.
+ Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
+ List<List<String>> shardsReplicas = List.of(
+ List.of("NRT " + AZ1_NRT_HIGHCORES, "TLOG " + AZ3_TLOGPULL), // shard 1
+ List.of("TLOG " + AZ2_TLOGPULL)); // shard 2
+ collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+ SolrCollection solrCollection = collectionBuilder.build();
+
+ List<Node> liveNodes = clusterBuilder.buildLiveNodes();
+
+ // Add 2 NRT and one TLOG to each shard.
+ PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes),
+ 2, 1, 0);
+ PlacementPlan pp = plugin.computePlacement(clusterBuilder.build(), placementRequest, clusterBuilder.buildAttributeFetcher(), new PlacementPlanFactoryImpl());
+ // Shard 1: The NRT's should go to the med cores node on AZ2 and low core on az3 (even though
+ // a low core node can take the replica in az1, there's already an NRT replica there and we want spreading across AZ's),
+ // the TLOG to the TLOG node on AZ2 (because the tlog node on AZ1 has low free disk)
+ // Shard 2: The NRT's should go to AZ1 and AZ3 lowcores because AZ2 has more cores (and there's no NRT in any AZ for
+ // this shard). The TLOG should go to AZ3 because AZ1 TLOG node has low free disk.
+ // Each expected placement is represented as a string "shard replica-type node"
+ Set<String> expectedPlacements = Set.of("1 NRT " + AZ2_NRT_MEDCORES, "1 NRT " + AZ3_NRT_LOWCORES, "1 TLOG " + AZ2_TLOGPULL,
+ "2 NRT " + AZ1_NRT_LOWCORES, "2 NRT " + AZ3_NRT_LOWCORES, "2 TLOG " + AZ3_TLOGPULL);
+ verifyPlacements(expectedPlacements, pp, collectionBuilder.getShardBuilders(), liveNodes);
+
+ // If we add instead 2 PULL replicas to each shard
+ placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes),
+ 0, 0, 2);
+ pp = plugin.computePlacement(clusterBuilder.build(), placementRequest, clusterBuilder.buildAttributeFetcher(), new PlacementPlanFactoryImpl());
+ // Shard 1: Given node AZ3_TLOGPULL is taken by the TLOG replica, the PULL should go to AZ1_TLOGPULL_LOWFREEDISK and AZ2_TLOGPULL
+ // Shard 2: Similarly AZ2_TLOGPULL is taken. Replicas should go to AZ1_TLOGPULL_LOWFREEDISK and AZ3_TLOGPULL
+ expectedPlacements = Set.of("1 PULL " + AZ1_TLOGPULL_LOWFREEDISK, "1 PULL " + AZ2_TLOGPULL,
+ "2 PULL " + AZ1_TLOGPULL_LOWFREEDISK, "2 PULL " + AZ3_TLOGPULL);
+ verifyPlacements(expectedPlacements, pp, collectionBuilder.getShardBuilders(), liveNodes);
+ }
+
+ /**
+ * Tests placement for new collection with nodes with a varying number of cores over multiple AZ's
+ */
+ @Test
+ public void testPlacementAzsCores() throws Exception {
+ String collectionName = "coresAzsCollection";
+
+ // Count cores == node index, and AZ's are: AZ0, AZ0, AZ0, AZ1, AZ1, AZ1, AZ2, AZ2, AZ2.
+ Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(9);
+ LinkedList<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+ for (int i = 0; i < 9; i++) {
+ nodeBuilders.get(i).setSysprop(AffinityPlacementFactory.AVAILABILITY_ZONE_SYSPROP, "AZ" + (i / 3))
+ .setCoreCount(i)
+ .setFreeDiskGB(PRIORITIZED_FREE_DISK_GB + 1);
+ }
+
+ // The collection does not exist, has 1 shard.
+ Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
+ List<List<String>> shardsReplicas = List.of(List.of());
+ collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+ SolrCollection solrCollection = collectionBuilder.build();
+
+ List<Node> liveNodes = clusterBuilder.buildLiveNodes();
+
+ // Test placing between 1 and 9 NRT replicas. Check that placement is done in the expected order.
+ List<Set<String>> placements = List.of(
+ Set.of("1 NRT 0"),
+ Set.of("1 NRT 0", "1 NRT 3"),
+ Set.of("1 NRT 0", "1 NRT 3", "1 NRT 6"),
+ Set.of("1 NRT 0", "1 NRT 3", "1 NRT 6", "1 NRT 1"),
+ Set.of("1 NRT 0", "1 NRT 3", "1 NRT 6", "1 NRT 1", "1 NRT 4"),
+ Set.of("1 NRT 0", "1 NRT 3", "1 NRT 6", "1 NRT 1", "1 NRT 4", "1 NRT 7"),
+ Set.of("1 NRT 0", "1 NRT 3", "1 NRT 6", "1 NRT 1", "1 NRT 4", "1 NRT 7", "1 NRT 2"),
+ Set.of("1 NRT 0", "1 NRT 3", "1 NRT 6", "1 NRT 1", "1 NRT 4", "1 NRT 7", "1 NRT 2", "1 NRT 5"),
+ Set.of("1 NRT 0", "1 NRT 3", "1 NRT 6", "1 NRT 1", "1 NRT 4", "1 NRT 7", "1 NRT 2", "1 NRT 5", "1 NRT 8"));
+
+ for (int countNrtToPlace = 1; countNrtToPlace <= 9; countNrtToPlace++) {
+ PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes),
+ countNrtToPlace, 0, 0);
+ PlacementPlan pp = plugin.computePlacement(clusterBuilder.build(), placementRequest, clusterBuilder.buildAttributeFetcher(), new PlacementPlanFactoryImpl());
+ verifyPlacements(placements.get(countNrtToPlace - 1), pp, collectionBuilder.getShardBuilders(), liveNodes);
+ }
+ }
+
+ /**
+ * Tests that if a collection has replicas on nodes not currently live, placement for new replicas works ok.
+ */
+ @Test
+ public void testCollectionOnDeadNodes() throws Exception {
+ String collectionName = "walkingDead";
+
+ // Cluster nodes and their attributes
+ Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(3);
+ LinkedList<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+ int coreCount = 0;
+ for (Builders.NodeBuilder nodeBuilder : nodeBuilders) {
+ nodeBuilder.setCoreCount(coreCount++).setFreeDiskGB(PRIORITIZED_FREE_DISK_GB + 1);
+ }
+
+ // The collection already exists with shards and replicas
+ Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
+ // The collection below has shard 1 having replicas only on dead nodes and shard 2 no replicas at all... (which is
+ // likely a challenging condition to recover from, but the placement computations should still execute happily).
+ List<List<String>> shardsReplicas = List.of(
+ List.of("NRT 10", "TLOG 11"), // shard 1
+ List.of()); // shard 2
+ collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
+ SolrCollection solrCollection = collectionBuilder.build();
+
+ List<Node> liveNodes = clusterBuilder.buildLiveNodes();
+
+ // Place an additional PULL replica for shard 1
+ PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection, Set.of(solrCollection.iterator().next().getShardName()), new HashSet<>(liveNodes),
+ 0, 0, 1);
+
+ PlacementPlan pp = plugin.computePlacement(clusterBuilder.build(), placementRequest, clusterBuilder.buildAttributeFetcher(), new PlacementPlanFactoryImpl());
+
+ // Each expected placement is represented as a string "shard replica-type node"
+ // Node 0 has fewer cores than node 1 (0 vs 1) so the placement should go there.
+ Set<String> expectedPlacements = Set.of("1 PULL 0");
+ verifyPlacements(expectedPlacements, pp, collectionBuilder.getShardBuilders(), liveNodes);
+
+ // If we placed instead a replica for shard 2 (starting with the same initial cluster state, not including the first
+ // placement above), it should also go to node 0 since it has fewer cores...
+ Iterator<Shard> it = solrCollection.iterator();
+ it.next(); // skip first shard to do placement for the second one...
+ placementRequest = new PlacementRequestImpl(solrCollection, Set.of(it.next().getShardName()), new HashSet<>(liveNodes),
+ 0, 0, 1);
+ pp = plugin.computePlacement(clusterBuilder.build(), placementRequest, clusterBuilder.buildAttributeFetcher(), new PlacementPlanFactoryImpl());
+ expectedPlacements = Set.of("2 PULL 0");
+ verifyPlacements(expectedPlacements, pp, collectionBuilder.getShardBuilders(), liveNodes);
+ }
+
+ /**
+ * Verifies that a computed set of placements does match the expected placement on nodes.
+ * @param expectedPlacements a set of strings of the form {@code "1 NRT 3"} where 1 would be the shard index, NRT the
+ * replica type and 3 the node on which the replica is placed. Shard indexes are 1-based, node indexes are 0-based.<p>
+ * Read carefully: <b>shard index</b> and not shard name. Index in the <b>order</b> of shards as defined
+ * for the collection in the call to {@link org.apache.solr.cluster.placement.Builders.CollectionBuilder#customCollectionSetup(List, List)}
+ * @param shardBuilders the shard builders are passed here to get the shard names by index (1-based) rather than by
+ * parsing the shard names (which would break if we change the shard naming scheme).
+ */
+ private static void verifyPlacements(Set<String> expectedPlacements, PlacementPlan placementPlan,
+ List<Builders.ShardBuilder> shardBuilders, List<Node> liveNodes) {
+ Set<ReplicaPlacement> computedPlacements = placementPlan.getReplicaPlacements();
+
+ // Prepare structures for looking up shard name index and node index
+ Map<String, Integer> shardNumbering = new HashMap<>();
+ int index = 1; // first shard is 1 not 0
+ for (Builders.ShardBuilder sb : shardBuilders) {
+ shardNumbering.put(sb.getShardName(), index++);
+ }
+ Map<Node, Integer> nodeNumbering = new HashMap<>();
+ index = 0;
+ for (Node n : liveNodes) {
+ nodeNumbering.put(n, index++);
+ }
+
+ if (expectedPlacements.size() != computedPlacements.size()) {
+ fail("Wrong number of placements, expected " + expectedPlacements.size() + " computed " + computedPlacements.size() + ". " +
+ getExpectedVsComputedPlacement(expectedPlacements, computedPlacements, shardNumbering, nodeNumbering));
+ }
+
+ Set<String> expected = new HashSet<>(expectedPlacements);
+ for (ReplicaPlacement p : computedPlacements) {
+ String lookUpPlacementResult = shardNumbering.get(p.getShardName()) + " " + p.getReplicaType().name() + " " + nodeNumbering.get(p.getNode());
+ if (!expected.remove(lookUpPlacementResult)) {
+ fail("Computed placement [" + lookUpPlacementResult + "] not expected. " +
+ getExpectedVsComputedPlacement(expectedPlacements, computedPlacements, shardNumbering, nodeNumbering));
+ }
+ }
+ }
+
+ private static String getExpectedVsComputedPlacement(Set<String> expectedPlacements, Set<ReplicaPlacement> computedPlacements,
+ Map<String, Integer> shardNumbering, Map<Node, Integer> nodeNumbering) {
+
+ StringBuilder sb = new StringBuilder("Expected placement: ");
+ for (String placement : expectedPlacements) {
+ sb.append("[").append(placement).append("] ");
+ }
+
+ sb.append("Computed placement: ");
+ for (ReplicaPlacement placement : computedPlacements) {
+ String lookUpPlacementResult = shardNumbering.get(placement.getShardName()) + " " + placement.getReplicaType().name() + " " + nodeNumbering.get(placement.getNode());
+
+ sb.append("[").append(lookUpPlacementResult).append("] ");
+ }
+
+ return sb.toString();
+ }
+
+ @Test
+ public void testAvailabilityZones() throws Exception {
+ String collectionName = "azCollection";
+ int NUM_NODES = 6;
+ Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(NUM_NODES);
+ for (int i = 0; i < NUM_NODES; i++) {
+ Builders.NodeBuilder nodeBuilder = clusterBuilder.getLiveNodeBuilders().get(i);
+ nodeBuilder.setCoreCount(0);
+ nodeBuilder.setFreeDiskGB(100L);
+ if (i < NUM_NODES / 2) {
+ nodeBuilder.setSysprop(AffinityPlacementFactory.AVAILABILITY_ZONE_SYSPROP, "az1");
+ } else {
+ nodeBuilder.setSysprop(AffinityPlacementFactory.AVAILABILITY_ZONE_SYSPROP, "az2");
+ }
+ }
+
+ Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
+ collectionBuilder.initializeShardsReplicas(2, 0, 0, 0, clusterBuilder.getLiveNodeBuilders());
+ clusterBuilder.addCollection(collectionBuilder);
+
+ Cluster cluster = clusterBuilder.build();
+
+ SolrCollection solrCollection = cluster.getCollection(collectionName);
+
+ PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection,
+ StreamSupport.stream(solrCollection.shards().spliterator(), false)
+ .map(Shard::getShardName).collect(Collectors.toSet()),
+ cluster.getLiveNodes(), 2, 2, 2);
+
+ PlacementPlanFactory placementPlanFactory = new PlacementPlanFactoryImpl();
+ AttributeFetcher attributeFetcher = clusterBuilder.buildAttributeFetcher();
+ PlacementPlan pp = plugin.computePlacement(cluster, placementRequest, attributeFetcher, placementPlanFactory);
+ // 2 shards, 6 replicas
+ assertEquals(12, pp.getReplicaPlacements().size());
+ // shard -> AZ -> replica count
+ Map<Replica.ReplicaType, Map<String, Map<String, AtomicInteger>>> replicas = new HashMap<>();
+ AttributeValues attributeValues = attributeFetcher.fetchAttributes();
+ for (ReplicaPlacement rp : pp.getReplicaPlacements()) {
+ Optional<String> azOptional = attributeValues.getSystemProperty(rp.getNode(), AffinityPlacementFactory.AVAILABILITY_ZONE_SYSPROP);
+ if (!azOptional.isPresent()) {
+ fail("missing AZ sysprop for node " + rp.getNode());
+ }
+ String az = azOptional.get();
+ replicas.computeIfAbsent(rp.getReplicaType(), type -> new HashMap<>())
+ .computeIfAbsent(rp.getShardName(), shard -> new HashMap<>())
+ .computeIfAbsent(az, zone -> new AtomicInteger()).incrementAndGet();
+ }
+ replicas.forEach((type, perTypeReplicas) -> {
+ perTypeReplicas.forEach((shard, azCounts) -> {
+ assertEquals("number of AZs", 2, azCounts.size());
+ azCounts.forEach((az, count) -> {
+ assertTrue("too few replicas shard=" + shard + ", type=" + type + ", az=" + az,
+ count.get() >= 1);
+ });
+ });
+ });
+ }
+
+ @Test
+ public void testReplicaType() throws Exception {
+ String collectionName = "replicaTypeCollection";
+ int NUM_NODES = 6;
+ Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(NUM_NODES);
+ for (int i = 0; i < NUM_NODES; i++) {
+ Builders.NodeBuilder nodeBuilder = clusterBuilder.getLiveNodeBuilders().get(i);
+ nodeBuilder.setCoreCount(0);
+ nodeBuilder.setFreeDiskGB(100L);
+ if (i < NUM_NODES / 3 * 2) {
+ nodeBuilder.setSysprop(AffinityPlacementFactory.REPLICA_TYPE_SYSPROP, "Nrt, TlOg");
+ nodeBuilder.setSysprop("group", "one");
+ } else {
+ nodeBuilder.setSysprop(AffinityPlacementFactory.REPLICA_TYPE_SYSPROP, "Pull,foobar");
+ nodeBuilder.setSysprop("group", "two");
+ }
+ }
+
+ Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
+ collectionBuilder.initializeShardsReplicas(2, 0, 0, 0, clusterBuilder.getLiveNodeBuilders());
+ clusterBuilder.addCollection(collectionBuilder);
+
+ Cluster cluster = clusterBuilder.build();
+
+ SolrCollection solrCollection = cluster.getCollection(collectionName);
+
+ PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection,
+ StreamSupport.stream(solrCollection.shards().spliterator(), false)
+ .map(Shard::getShardName).collect(Collectors.toSet()),
+ cluster.getLiveNodes(), 2, 2, 2);
+
+ PlacementPlanFactory placementPlanFactory = new PlacementPlanFactoryImpl();
+ AttributeFetcher attributeFetcher = clusterBuilder.buildAttributeFetcher();
+ PlacementPlan pp = plugin.computePlacement(cluster, placementRequest, attributeFetcher, placementPlanFactory);
+ // 2 shards, 6 replicas
+ assertEquals(12, pp.getReplicaPlacements().size());
+ // shard -> group -> replica count
+ Map<Replica.ReplicaType, Map<String, Map<String, AtomicInteger>>> replicas = new HashMap<>();
+ AttributeValues attributeValues = attributeFetcher.fetchAttributes();
+ for (ReplicaPlacement rp : pp.getReplicaPlacements()) {
+ Optional<String> groupOptional = attributeValues.getSystemProperty(rp.getNode(), "group");
+ if (!groupOptional.isPresent()) {
+ fail("missing group sysprop for node " + rp.getNode());
+ }
+ String group = groupOptional.get();
+ if (group.equals("one")) {
+ assertTrue("wrong replica type in group one",
+ (rp.getReplicaType() == Replica.ReplicaType.NRT) || rp.getReplicaType() == Replica.ReplicaType.TLOG);
+ } else {
+ assertEquals("wrong replica type in group two", Replica.ReplicaType.PULL, rp.getReplicaType());
+ }
+ replicas.computeIfAbsent(rp.getReplicaType(), type -> new HashMap<>())
+ .computeIfAbsent(rp.getShardName(), shard -> new HashMap<>())
+ .computeIfAbsent(group, g -> new AtomicInteger()).incrementAndGet();
+ }
+ replicas.forEach((type, perTypeReplicas) -> {
+ perTypeReplicas.forEach((shard, groupCounts) -> {
+ assertEquals("number of groups", 1, groupCounts.size());
+ groupCounts.forEach((group, count) -> {
+ assertTrue("too few replicas shard=" + shard + ", type=" + type + ", group=" + group,
+ count.get() >= 1);
+ });
+ });
+ });
+
+ }
+
+ @Test
+ public void testFreeDiskConstraints() throws Exception {
+ String collectionName = "freeDiskCollection";
+ int NUM_NODES = 3;
+ Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(NUM_NODES);
+ Node smallNode = null;
+ for (int i = 0; i < NUM_NODES; i++) {
+ Builders.NodeBuilder nodeBuilder = clusterBuilder.getLiveNodeBuilders().get(i);
+ nodeBuilder.setCoreCount(0);
+ if (i == 0) {
+ // default minimalFreeDiskGB == 20
+ nodeBuilder.setFreeDiskGB(1L);
+ smallNode = nodeBuilder.build();
+ } else {
+ nodeBuilder.setFreeDiskGB(100L);
+ }
+ }
+
+ Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
+ collectionBuilder.initializeShardsReplicas(2, 0, 0, 0, clusterBuilder.getLiveNodeBuilders());
+ clusterBuilder.addCollection(collectionBuilder);
+
+ Cluster cluster = clusterBuilder.build();
+
+ SolrCollection solrCollection = cluster.getCollection(collectionName);
+
+ PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection,
+ StreamSupport.stream(solrCollection.shards().spliterator(), false)
+ .map(Shard::getShardName).collect(Collectors.toSet()),
+ cluster.getLiveNodes(), 1, 0, 1);
+
+ PlacementPlanFactory placementPlanFactory = new PlacementPlanFactoryImpl();
+ AttributeFetcher attributeFetcher = clusterBuilder.buildAttributeFetcher();
+ PlacementPlan pp = plugin.computePlacement(cluster, placementRequest, attributeFetcher, placementPlanFactory);
+ assertEquals(4, pp.getReplicaPlacements().size());
+ for (ReplicaPlacement rp : pp.getReplicaPlacements()) {
+ assertFalse("should not put any replicas on " + smallNode, rp.getNode().equals(smallNode));
+ }
+ }
+
+ @Test @Slow
+ public void testScalability() throws Exception {
+ log.info("==== numNodes ====");
+ runTestScalability(1000, 100, 40, 40, 20);
+ runTestScalability(2000, 100, 40, 40, 20);
+ runTestScalability(5000, 100, 40, 40, 20);
+ runTestScalability(10000, 100, 40, 40, 20);
+ runTestScalability(20000, 100, 40, 40, 20);
+ log.info("==== numShards ====");
+ runTestScalability(5000, 100, 40, 40, 20);
+ runTestScalability(5000, 200, 40, 40, 20);
+ runTestScalability(5000, 500, 40, 40, 20);
+ runTestScalability(5000, 1000, 40, 40, 20);
+ runTestScalability(5000, 2000, 40, 40, 20);
+ log.info("==== numReplicas ====");
+ runTestScalability(5000, 100, 100, 0, 0);
+ runTestScalability(5000, 100, 200, 0, 0);
+ runTestScalability(5000, 100, 500, 0, 0);
+ runTestScalability(5000, 100, 1000, 0, 0);
+ runTestScalability(5000, 100, 2000, 0, 0);
+ }
+
+ private void runTestScalability(int numNodes, int numShards, int nrtReplicas, int tlogReplicas, int pullReplicas) throws Exception {
+ String collectionName = "scaleCollection";
+
+ Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(numNodes);
+ LinkedList<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
+ for (int i = 0; i < numNodes; i++) {
+ nodeBuilders.get(i).setCoreCount(0).setFreeDiskGB(Long.valueOf(numNodes));
+ }
+
+ Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
+ collectionBuilder.initializeShardsReplicas(numShards, 0, 0, 0, List.of());
+
+ Cluster cluster = clusterBuilder.build();
+ AttributeFetcher attributeFetcher = clusterBuilder.buildAttributeFetcher();
+
+ SolrCollection solrCollection = collectionBuilder.build();
+ List<Node> liveNodes = clusterBuilder.buildLiveNodes();
+
+ // Place replicas for all the shards of the (newly created since it has no replicas yet) collection
+ PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(),
+ new HashSet<>(liveNodes), nrtReplicas, tlogReplicas, pullReplicas);
+
+ long start = System.nanoTime();
+ PlacementPlan pp = plugin.computePlacement(cluster, placementRequest, attributeFetcher, new PlacementPlanFactoryImpl());
+ long end = System.nanoTime();
+
+ final int REPLICAS_PER_SHARD = nrtReplicas + tlogReplicas + pullReplicas;
+ final int TOTAL_REPLICAS = numShards * REPLICAS_PER_SHARD;
+
+ log.info("ComputePlacement: {} nodes, {} shards, {} total replicas, elapsed time {} ms.", numNodes, numShards, TOTAL_REPLICAS, TimeUnit.NANOSECONDS.toMillis(end - start)); //nowarn
+ assertEquals("incorrect number of calculated placements", TOTAL_REPLICAS,
+ pp.getReplicaPlacements().size());
+ // check that replicas are correctly placed
+ Map<Node, AtomicInteger> replicasPerNode = new HashMap<>();
+ Map<Node, Set<String>> shardsPerNode = new HashMap<>();
+ Map<String, AtomicInteger> replicasPerShard = new HashMap<>();
+ Map<Replica.ReplicaType, AtomicInteger> replicasByType = new HashMap<>();
+ for (ReplicaPlacement placement : pp.getReplicaPlacements()) {
+ replicasPerNode.computeIfAbsent(placement.getNode(), n -> new AtomicInteger()).incrementAndGet();
+ shardsPerNode.computeIfAbsent(placement.getNode(), n -> new HashSet<>()).add(placement.getShardName());
+ replicasByType.computeIfAbsent(placement.getReplicaType(), t -> new AtomicInteger()).incrementAndGet();
+ replicasPerShard.computeIfAbsent(placement.getShardName(), s -> new AtomicInteger()).incrementAndGet();
+ }
+ int perNode = TOTAL_REPLICAS > numNodes ? TOTAL_REPLICAS / numNodes : 1;
+ replicasPerNode.forEach((node, count) -> {
+ assertEquals(count.get(), perNode);
+ });
+ shardsPerNode.forEach((node, names) -> {
+ assertEquals(names.size(), perNode);
+ });
+
+ replicasPerShard.forEach((shard, count) -> {
+ assertEquals(count.get(), REPLICAS_PER_SHARD);
+ });
+ }
+}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterProperties.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterProperties.java
index 0460c46..4e91d47 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterProperties.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterProperties.java
@@ -127,7 +127,13 @@ public class ClusterProperties {
@SuppressWarnings("unchecked")
public void update(MapWriter obj, String... path) throws KeeperException, InterruptedException{
client.atomicUpdate(ZkStateReader.CLUSTER_PROPS, bytes -> {
- Map<String, Object> zkJson = (Map<String, Object>) Utils.fromJSON(bytes);
+ Map<String, Object> zkJson;
+ if (bytes == null) {
+ // no previous properties - initialize
+ zkJson = new LinkedHashMap<>();
+ } else {
+ zkJson = (Map<String, Object>) Utils.fromJSON(bytes);
+ }
Utils.setObjectByPath(zkJson, Arrays.asList(path), obj);
return Utils.toJSON(zkJson);
});
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 508a445..7ff69cf 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -143,6 +143,10 @@ public class ZkStateReader implements SolrCloseable {
public static final String REPLICA_TYPE = "type";
+ public static final String CONTAINER_PLUGINS = "plugin";
+
+ public static final String PLACEMENT_PLUGIN = "placement-plugin";
+
/**
* A view of the current state of all collections.
*/
@@ -227,7 +231,10 @@ public class ZkStateReader implements SolrCloseable {
MAX_CORES_PER_NODE,
SAMPLE_PERCENTAGE,
SOLR_ENVIRONMENT,
- CollectionAdminParams.DEFAULTS);
+ CollectionAdminParams.DEFAULTS,
+ CONTAINER_PLUGINS,
+ PLACEMENT_PLUGIN
+ );
/**
* Returns config set name for collection.