You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ho...@apache.org on 2019/12/10 04:14:05 UTC
[lucene-solr] branch branch_8x updated: SOLR-12217: Support
shards.preference in SolrJ for individual shard requests (#984)
This is an automated email from the ASF dual-hosted git repository.
houston pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/branch_8x by this push:
new 53345cb SOLR-12217: Support shards.preference in SolrJ for individual shard requests (#984)
53345cb is described below
commit 53345cb1cd1eb54e33af65046825f624ba3cadd1
Author: Houston Putman <ho...@apache.org>
AuthorDate: Mon Dec 9 17:11:58 2019 -0500
SOLR-12217: Support shards.preference in SolrJ for individual shard requests (#984)
---
solr/CHANGES.txt | 2 +
.../org/apache/solr/handler/StreamHandler.java | 22 ++++++
solr/solr-ref-guide/src/distributed-requests.adoc | 2 +-
solr/solr-ref-guide/src/streaming-expressions.adoc | 9 +++
solr/solr-ref-guide/src/using-solrj.adoc | 8 +++
.../client/solrj/impl/BaseCloudSolrClient.java | 82 +++++++++++++---------
.../client/solrj/io/stream/CloudSolrStream.java | 3 +-
.../client/solrj/io/stream/DeepRandomStream.java | 6 +-
.../solr/client/solrj/io/stream/StreamContext.java | 20 ++++++
.../solr/client/solrj/io/stream/TupleStream.java | 50 +++++++++----
.../routing/NodePreferenceRulesComparator.java | 2 +-
.../RequestReplicaListTransformerGenerator.java | 40 ++++++++---
.../java/org/apache/solr/common/cloud/Replica.java | 2 +-
.../solrj/impl/CloudHttp2SolrClientTest.java | 69 ++++++++++++++++++
.../client/solrj/impl/CloudSolrClientTest.java | 71 ++++++++++++++++++-
.../solr/client/solrj/io/stream/StreamingTest.java | 58 ++++++++++++++-
.../org/apache/solr/cloud/SolrCloudTestCase.java | 17 +++++
17 files changed, 396 insertions(+), 67 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 3ae0a79..7ef1cc3 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -62,6 +62,8 @@ New Features
* SOLR-13912: Add 'countvals' aggregation in JSON FacetModule (hossman, Munendra S N)
+* SOLR-12217: Support shards.preference in SolrJ for single shard collections. The parameter is now used by the CloudSolrClient and Streaming Expressions. (Houston Putman, Tomas Fernandez-Lobbe)
+
Improvements
---------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index ccbbb3a..1502190 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -42,7 +42,10 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
+import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
@@ -162,9 +165,28 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
return;
}
+
+ final SolrCore core = req.getCore(); // explicit check for null core (temporary?, for tests)
+ ZkController zkController = core == null ? null : core.getCoreContainer().getZkController();
+ RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator;
+ if (zkController != null) {
+ requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator(
+ zkController.getZkStateReader().getClusterProperties()
+ .getOrDefault(ZkStateReader.DEFAULT_SHARD_PREFERENCES, "")
+ .toString(),
+ zkController.getNodeName(),
+ zkController.getBaseUrl(),
+ zkController.getSysPropsCacher()
+ );
+ } else {
+ requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator();
+ }
+
int worker = params.getInt("workerID", 0);
int numWorkers = params.getInt("numWorkers", 1);
StreamContext context = new StreamContext();
+ context.setRequestParams(params);
+ context.setRequestReplicaListTransformerGenerator(requestReplicaListTransformerGenerator);
context.put("shards", getCollectionShards(params));
context.workerID = worker;
context.numWorkers = numWorkers;
diff --git a/solr/solr-ref-guide/src/distributed-requests.adoc b/solr/solr-ref-guide/src/distributed-requests.adoc
index dde0fea..4d06728 100644
--- a/solr/solr-ref-guide/src/distributed-requests.adoc
+++ b/solr/solr-ref-guide/src/distributed-requests.adoc
@@ -160,7 +160,7 @@ Solr allows you to pass an optional string parameter named `shards.preference` t
The syntax is: `shards.preference=_property_:__value__`. The order of the properties and the values are significant: the first one is the primary sort, the second is secondary, etc.
-IMPORTANT: `shards.preference` only works for distributed queries, i.e., queries targeting multiple shards. Single shard scenarios are not supported.
+IMPORTANT: `shards.preference` is supported for single shard scenarios when using the SolrJ clients.
The properties that can be specified are as follows:
diff --git a/solr/solr-ref-guide/src/streaming-expressions.adoc b/solr/solr-ref-guide/src/streaming-expressions.adoc
index d666d76..57ca17c 100644
--- a/solr/solr-ref-guide/src/streaming-expressions.adoc
+++ b/solr/solr-ref-guide/src/streaming-expressions.adoc
@@ -114,6 +114,15 @@ unless the jvm has been started with `-DStreamingExpressionMacros=true` (usually
Because streaming expressions relies on the `/export` handler, many of the field and field type requirements to use `/export` are also requirements for `/stream`, particularly for `sort` and `fl` parameters. Please see the section <<exporting-result-sets.adoc#exporting-result-sets,Exporting Result Sets>> for details.
+=== Request Routing
+
+Streaming Expressions respect the <<distributed-requests.adoc#shards-preference-parameter,shards.preference parameter>> for any call to Solr.
+
+The value of `shards.preference` that is used to route requests is determined in the following order. The first option available is used.
+- Provided as a parameter in the streaming expression (e.g. `search(...., shards.preference="replica.type:PULL")`)
+- Provided in the URL Params of the streaming expression (e.g. `http://solr_url:8983/solr/stream?expr=....&shards.preference=replica.type:PULL`)
+- Set as a default in the Cluster properties.
+
=== Adding Custom Expressions
Creating your own custom expressions can be easily done by implementing the {solr-javadocs}/solr-solrj/org/apache/solr/client/solrj/io/stream/expr/Expressible.html[Expressible] interface. To add a custom expression to the
diff --git a/solr/solr-ref-guide/src/using-solrj.adoc b/solr/solr-ref-guide/src/using-solrj.adoc
index e4e41f5..f60664d 100644
--- a/solr/solr-ref-guide/src/using-solrj.adoc
+++ b/solr/solr-ref-guide/src/using-solrj.adoc
@@ -120,6 +120,14 @@ include::{example-source-dir}UsingSolrJRefGuideExamplesTest.java[tag=solrj-solrc
When these values are not explicitly provided, SolrJ falls back to using the defaults for the OS/environment is running on.
+=== Cloud Request Routing
+
+The SolrJ `CloudSolrClient` implementations (`CloudSolrClient` and `CloudHttp2SolrClient`) respect the <<distributed-requests.adoc#shards-preference-parameter,shards.preference parameter>>.
+Therefore requests sent to single-sharded collections, using either of the above clients, will route requests the same way that distributed requests are routed to individual shards.
+If no `shards.preference` parameter is provided, the clients will default to sorting replicas randomly.
+
+For update requests, while the replicas are sorted in the order defined by the request, leader replicas will always be sorted first.
+
== Querying in SolrJ
`SolrClient` has a number of `query()` methods for fetching results from Solr. Each of these methods takes in a `SolrParams`,an object encapsulating arbitrary query-parameters. And each method outputs a `QueryResponse`, a wrapper which can be used to access the result documents and other related metadata.
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
index 0461e67..52038ad 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
@@ -44,6 +44,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import org.apache.solr.client.solrj.ResponseParser;
import org.apache.solr.client.solrj.SolrClient;
@@ -55,6 +56,8 @@ import org.apache.solr.client.solrj.request.IsUpdateRequest;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.request.V2Request;
+import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
+import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
@@ -69,7 +72,6 @@ import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
@@ -100,6 +102,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
private final boolean updatesToLeaders;
private final boolean directUpdatesToLeadersOnly;
+ private final RequestReplicaListTransformerGenerator requestRLTGenerator;
boolean parallelUpdates; //TODO final
private ExecutorService threadPool = ExecutorUtil
.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory(
@@ -221,6 +224,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
this.updatesToLeaders = updatesToLeaders;
this.parallelUpdates = parallelUpdates;
this.directUpdatesToLeadersOnly = directUpdatesToLeadersOnly;
+ this.requestRLTGenerator = new RequestReplicaListTransformerGenerator();
}
/** Sets the cache ttl for DocCollection Objects cached . This is only applicable for collections which are persisted outside of clusterstate.json
@@ -467,6 +471,8 @@ public abstract class BaseCloudSolrClient extends SolrClient {
for(String param : NON_ROUTABLE_PARAMS) {
routableParams.remove(param);
}
+ } else {
+ params = new ModifiableSolrParams();
}
if (collection == null) {
@@ -492,10 +498,12 @@ public abstract class BaseCloudSolrClient extends SolrClient {
return null;
}
+ ReplicaListTransformer replicaListTransformer = requestRLTGenerator.getReplicaListTransformer(params);
+
//Create the URL map, which is keyed on slice name.
//The value is a list of URLs for each replica in the slice.
//The first value in the list is the leader for the slice.
- final Map<String,List<String>> urlMap = buildUrlMap(col);
+ final Map<String,List<String>> urlMap = buildUrlMap(col, replicaListTransformer);
final Map<String, ? extends LBSolrClient.Req> routes = createRoutes(updateRequest, routableParams, col, router, urlMap, idField);
if (routes == null) {
if (directUpdatesToLeadersOnly && hasInfoToFindLeaders(updateRequest, idField)) {
@@ -616,12 +624,12 @@ public abstract class BaseCloudSolrClient extends SolrClient {
return urlMap == null ? null : updateRequest.getRoutesToCollection(router, col, urlMap, routableParams, idField);
}
- private Map<String,List<String>> buildUrlMap(DocCollection col) {
+ private Map<String,List<String>> buildUrlMap(DocCollection col, ReplicaListTransformer replicaListTransformer) {
Map<String, List<String>> urlMap = new HashMap<>();
Slice[] slices = col.getActiveSlicesArr();
for (Slice slice : slices) {
String name = slice.getName();
- List<String> urls = new ArrayList<>();
+ List<Replica> sortedReplicas = new ArrayList<>();
Replica leader = slice.getLeader();
if (directUpdatesToLeadersOnly && leader == null) {
for (Replica replica : slice.getReplicas(
@@ -638,20 +646,22 @@ public abstract class BaseCloudSolrClient extends SolrClient {
// take unoptimized general path - we cannot find a leader yet
return null;
}
- ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
- String url = zkProps.getCoreUrl();
- urls.add(url);
+
if (!directUpdatesToLeadersOnly) {
for (Replica replica : slice.getReplicas()) {
- if (!replica.getNodeName().equals(leader.getNodeName()) &&
- !replica.getName().equals(leader.getName())) {
- ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);
- String url1 = zkProps1.getCoreUrl();
- urls.add(url1);
+ if (!replica.equals(leader)) {
+ sortedReplicas.add(replica);
}
}
}
- urlMap.put(name, urls);
+
+ // Sort the non-leader replicas according to the request parameters
+ replicaListTransformer.transform(sortedReplicas);
+
+ // put the leaderUrl first.
+ sortedReplicas.add(0, leader);
+
+ urlMap.put(name, sortedReplicas.stream().map(Replica::getCoreUrl).collect(Collectors.toList()));
}
return urlMap;
}
@@ -1046,6 +1056,8 @@ public abstract class BaseCloudSolrClient extends SolrClient {
reqParams = new ModifiableSolrParams();
}
+ ReplicaListTransformer replicaListTransformer = requestRLTGenerator.getReplicaListTransformer(reqParams);
+
final Set<String> liveNodes = getClusterStateProvider().getLiveNodes();
final List<String> theUrlList = new ArrayList<>(); // we populate this as follows...
@@ -1087,34 +1099,38 @@ public abstract class BaseCloudSolrClient extends SolrClient {
}
// Gather URLs, grouped by leader or replica
- // TODO: allow filtering by group, role, etc
- Set<String> seenNodes = new HashSet<>();
- List<String> replicas = new ArrayList<>();
- String joinedInputCollections = StrUtils.join(inputCollections, ',');
+ List<Replica> sortedReplicas = new ArrayList<>();
+ List<Replica> replicas = new ArrayList<>();
for (Slice slice : slices.values()) {
- for (ZkNodeProps nodeProps : slice.getReplicasMap().values()) {
- ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
- String node = coreNodeProps.getNodeName();
+ Replica leader = slice.getLeader();
+ for (Replica replica : slice.getReplicas()) {
+ String node = replica.getNodeName();
if (!liveNodes.contains(node) // Must be a live node to continue
- || Replica.State.getState(coreNodeProps.getState()) != Replica.State.ACTIVE) // Must be an ACTIVE replica to continue
+ || replica.getState() != Replica.State.ACTIVE) // Must be an ACTIVE replica to continue
continue;
- if (seenNodes.add(node)) { // if we haven't yet collected a URL to this node...
- String url = ZkCoreNodeProps.getCoreUrl(nodeProps.getStr(ZkStateReader.BASE_URL_PROP), joinedInputCollections);
- if (sendToLeaders && coreNodeProps.isLeader()) {
- theUrlList.add(url); // put leaders here eagerly (if sendToLeader mode)
- } else {
- replicas.add(url); // replicas here
- }
+ if (sendToLeaders && replica.equals(leader)) {
+ sortedReplicas.add(replica); // put leaders here eagerly (if sendToLeader mode)
+ } else {
+ replicas.add(replica); // replicas here
}
}
}
- // Shuffle the leaders, if any (none if !sendToLeaders)
- Collections.shuffle(theUrlList, rand);
+ // Sort the leader replicas, if any, according to the request preferences (none if !sendToLeaders)
+ replicaListTransformer.transform(sortedReplicas);
+
+ // Sort the replicas, if any, according to the request preferences and append to our list
+ replicaListTransformer.transform(replicas);
- // Shuffle the replicas, if any, and append to our list
- Collections.shuffle(replicas, rand);
- theUrlList.addAll(replicas);
+ sortedReplicas.addAll(replicas);
+
+ String joinedInputCollections = StrUtils.join(inputCollections, ',');
+ Set<String> seenNodes = new HashSet<>();
+ sortedReplicas.forEach( replica -> {
+ if (seenNodes.add(replica.getNodeName())) {
+ theUrlList.add(ZkCoreNodeProps.getCoreUrl(replica.getBaseUrl(), joinedInputCollections));
+ }
+ });
if (theUrlList.isEmpty()) {
collectionStateCache.keySet().removeAll(collectionNames);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
index 8464cf3..2220989 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
@@ -371,12 +371,13 @@ public class CloudSolrStream extends TupleStream implements Expressible {
protected void constructStreams() throws IOException {
try {
- List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);
ModifiableSolrParams mParams = new ModifiableSolrParams(params);
mParams = adjustParams(mParams);
mParams.set(DISTRIB, "false"); // We are the aggregator.
+ List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext, mParams);
+
for(String shardUrl : shardUrls) {
SolrStream solrStream = new SolrStream(shardUrl, mParams);
if(streamContext != null) {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java
index d2ef18c..3881a64 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java
@@ -309,12 +309,12 @@ public class DeepRandomStream extends TupleStream implements Expressible {
protected void constructStreams() throws IOException {
try {
-
- List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);
-
ModifiableSolrParams mParams = new ModifiableSolrParams(params);
mParams = adjustParams(mParams);
mParams.set(DISTRIB, "false"); // We are the aggregator.
+
+ List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext, mParams);
+
String rows = mParams.get(ROWS);
int r = Integer.parseInt(rows);
int newRows = r/shardUrls.size();
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
index 778aace..8243b2a 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
@@ -24,6 +24,8 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.solr.client.solrj.io.ModelCache;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
+import org.apache.solr.common.params.SolrParams;
/**
* The StreamContext is passed to TupleStreams using the TupleStream.setStreamContext() method.
@@ -45,6 +47,8 @@ public class StreamContext implements Serializable{
private SolrClientCache clientCache;
private ModelCache modelCache;
private StreamFactory streamFactory;
+ private SolrParams requestParams;
+ private RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator;
public ConcurrentMap getObjectCache() {
return this.objectCache;
@@ -101,4 +105,20 @@ public class StreamContext implements Serializable{
public StreamFactory getStreamFactory() {
return this.streamFactory;
}
+
+ public void setRequestParams(SolrParams requestParams) {
+ this.requestParams = requestParams;
+ }
+
+ public SolrParams getRequestParams() {
+ return requestParams;
+ }
+
+ public void setRequestReplicaListTransformerGenerator(RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator) {
+ this.requestReplicaListTransformerGenerator = requestReplicaListTransformerGenerator;
+ }
+
+ public RequestReplicaListTransformerGenerator getRequestReplicaListTransformerGenerator() {
+ return requestReplicaListTransformerGenerator;
+ }
}
\ No newline at end of file
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
index 94dd920..c2957bc 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
@@ -21,26 +21,28 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
import java.util.List;
-import java.util.Random;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.Map;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
+import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.common.IteratorWriter;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
/**
@@ -118,6 +120,14 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
String collection,
StreamContext streamContext)
throws IOException {
+ return getShards(zkHost, collection, streamContext, new ModifiableSolrParams());
+ }
+
+ public static List<String> getShards(String zkHost,
+ String collection,
+ StreamContext streamContext,
+ SolrParams requestParams)
+ throws IOException {
Map<String, List<String>> shardsMap = null;
List<String> shards = new ArrayList();
@@ -130,24 +140,34 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
shards = shardsMap.get(collection);
} else {
//SolrCloud Sharding
- CloudSolrClient cloudSolrClient = streamContext.getSolrClientCache().getCloudSolrClient(zkHost);
+ CloudSolrClient cloudSolrClient =
+ Optional.ofNullable(streamContext.getSolrClientCache()).orElseGet(SolrClientCache::new).getCloudSolrClient(zkHost);
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
ClusterState clusterState = zkStateReader.getClusterState();
Slice[] slices = CloudSolrStream.getSlices(collection, zkStateReader, true);
Set<String> liveNodes = clusterState.getLiveNodes();
+
+
+ ModifiableSolrParams solrParams = new ModifiableSolrParams(streamContext.getRequestParams());
+ solrParams.add(requestParams);
+
+ RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator =
+ Optional.ofNullable(streamContext.getRequestReplicaListTransformerGenerator()).orElseGet(RequestReplicaListTransformerGenerator::new);
+
+ ReplicaListTransformer replicaListTransformer = requestReplicaListTransformerGenerator.getReplicaListTransformer(solrParams);
+
for(Slice slice : slices) {
- Collection<Replica> replicas = slice.getReplicas();
- List<Replica> shuffler = new ArrayList<>();
- for(Replica replica : replicas) {
- if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
- shuffler.add(replica);
+ List<Replica> sortedReplicas = new ArrayList<>();
+ for(Replica replica : slice.getReplicas()) {
+ if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
+ sortedReplicas.add(replica);
+ }
}
- Collections.shuffle(shuffler, new Random());
- Replica rep = shuffler.get(0);
- ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
- String url = zkProps.getCoreUrl();
- shards.add(url);
+ replicaListTransformer.transform(sortedReplicas);
+ if (sortedReplicas.size() > 0) {
+ shards.add(sortedReplicas.get(0).getCoreUrl());
+ }
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/routing/NodePreferenceRulesComparator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/routing/NodePreferenceRulesComparator.java
index 4fdab0f..bb8cecb 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/routing/NodePreferenceRulesComparator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/routing/NodePreferenceRulesComparator.java
@@ -166,7 +166,7 @@ public class NodePreferenceRulesComparator implements Comparator<Object> {
return false;
}
final String s = ((Replica)o).getType().toString();
- return s.equals(preferred);
+ return s.equalsIgnoreCase(preferred);
}
public List<PreferenceRule> getSortRules() {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/routing/RequestReplicaListTransformerGenerator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/routing/RequestReplicaListTransformerGenerator.java
index 58c8b2e..12ce4cf 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/routing/RequestReplicaListTransformerGenerator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/routing/RequestReplicaListTransformerGenerator.java
@@ -20,6 +20,7 @@ import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.Optional;
import java.util.Random;
import org.apache.solr.common.SolrException;
@@ -41,9 +42,13 @@ public class RequestReplicaListTransformerGenerator {
(String configSpec, SolrParams requestParams, ReplicaListTransformerFactory fallback) -> shufflingReplicaListTransformer;
private final ReplicaListTransformerFactory stableRltFactory;
private final ReplicaListTransformerFactory defaultRltFactory;
+ private final String defaultShardPreferences;
+ private final String nodeName;
+ private final String localHostAddress;
+ private final NodesSysPropsCacher sysPropsCacher;
public RequestReplicaListTransformerGenerator() {
- this(RANDOM_RLTF);
+ this(null);
}
public RequestReplicaListTransformerGenerator(ReplicaListTransformerFactory defaultRltFactory) {
@@ -51,16 +56,24 @@ public class RequestReplicaListTransformerGenerator {
}
public RequestReplicaListTransformerGenerator(ReplicaListTransformerFactory defaultRltFactory, ReplicaListTransformerFactory stableRltFactory) {
- this.defaultRltFactory = defaultRltFactory;
- if (stableRltFactory == null) {
- this.stableRltFactory = new AffinityReplicaListTransformerFactory();
- } else {
- this.stableRltFactory = stableRltFactory;
- }
+ this(defaultRltFactory, stableRltFactory, null, null, null, null);
+ }
+
+ public RequestReplicaListTransformerGenerator(String defaultShardPreferences, String nodeName, String localHostAddress, NodesSysPropsCacher sysPropsCacher) {
+ this(null, null, defaultShardPreferences, nodeName, localHostAddress, sysPropsCacher);
+ }
+
+ public RequestReplicaListTransformerGenerator(ReplicaListTransformerFactory defaultRltFactory, ReplicaListTransformerFactory stableRltFactory, String defaultShardPreferences, String nodeName, String localHostAddress, NodesSysPropsCacher sysPropsCacher) {
+ this.defaultRltFactory = Optional.ofNullable(defaultRltFactory).orElse(RANDOM_RLTF);
+ this.stableRltFactory = Optional.ofNullable(stableRltFactory).orElseGet(AffinityReplicaListTransformerFactory::new);
+ this.defaultShardPreferences = Optional.ofNullable(defaultShardPreferences).orElse("");
+ this.nodeName = nodeName;
+ this.localHostAddress = localHostAddress;
+ this.sysPropsCacher = sysPropsCacher;
}
public ReplicaListTransformer getReplicaListTransformer(final SolrParams requestParams) {
- return getReplicaListTransformer(requestParams, "");
+ return getReplicaListTransformer(requestParams, null);
}
public ReplicaListTransformer getReplicaListTransformer(final SolrParams requestParams, String defaultShardPreferences) {
@@ -70,6 +83,7 @@ public class RequestReplicaListTransformerGenerator {
public ReplicaListTransformer getReplicaListTransformer(final SolrParams requestParams, String defaultShardPreferences, String nodeName, String localHostAddress, NodesSysPropsCacher sysPropsCacher) {
@SuppressWarnings("deprecation")
final boolean preferLocalShards = requestParams.getBool(CommonParams.PREFER_LOCAL_SHARDS, false);
+ defaultShardPreferences = Optional.ofNullable(defaultShardPreferences).orElse(this.defaultShardPreferences);
final String shardsPreferenceSpec = requestParams.get(ShardParams.SHARDS_PREFERENCE, defaultShardPreferences);
if (preferLocalShards || !shardsPreferenceSpec.isEmpty()) {
@@ -84,7 +98,15 @@ public class RequestReplicaListTransformerGenerator {
preferenceRules.add(new PreferenceRule(ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION, ShardParams.REPLICA_LOCAL));
}
- NodePreferenceRulesComparator replicaComp = new NodePreferenceRulesComparator(preferenceRules, requestParams, nodeName, localHostAddress, sysPropsCacher, defaultRltFactory, stableRltFactory);
+ NodePreferenceRulesComparator replicaComp =
+ new NodePreferenceRulesComparator(
+ preferenceRules,
+ requestParams,
+ Optional.ofNullable(nodeName).orElse(this.nodeName),
+ Optional.ofNullable(localHostAddress).orElse(this.localHostAddress),
+ Optional.ofNullable(sysPropsCacher).orElse(this.sysPropsCacher),
+ defaultRltFactory,
+ stableRltFactory);
ReplicaListTransformer baseReplicaListTransformer = replicaComp.getBaseReplicaListTransformer();
if (replicaComp.getSortRules() == null) {
// only applying base transformation
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index bc57176..5ff10c2 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -102,7 +102,7 @@ public class Replica extends ZkNodeProps {
PULL;
public static Type get(String name){
- return name == null ? Type.NRT : Type.valueOf(name);
+ return name == null ? Type.NRT : Type.valueOf(name.toUpperCase(Locale.ROOT));
}
}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
index 793ee5f..74b02cb 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -488,6 +489,74 @@ public class CloudHttp2SolrClientTest extends SolrCloudTestCase {
shardAddresses.size() > 1 && ports.size()==1);
}
+
+
+ /**
+ * Tests if the 'shards.preference' parameter works with single-sharded collections.
+ */
+ @Test
+ public void singleShardedPreferenceRules() throws Exception {
+ String collectionName = "singleShardPreferenceTestColl";
+
+ int liveNodes = cluster.getJettySolrRunners().size();
+
+ // For testing replica.type, we want to have all replica types available for the collection
+ CollectionAdminRequest.createCollection(collectionName, "conf", 1, liveNodes/3, liveNodes/3, liveNodes/3)
+ .setMaxShardsPerNode(liveNodes)
+ .processAndWait(cluster.getSolrClient(), TIMEOUT);
+ cluster.waitForActiveCollection(collectionName, 1, liveNodes);
+
+ // Add some new documents
+ new UpdateRequest()
+ .add(id, "0", "a_t", "hello1")
+ .add(id, "2", "a_t", "hello2")
+ .add(id, "3", "a_t", "hello2")
+ .commit(getRandomClient(), collectionName);
+
+ // Run the actual test for 'queryReplicaType'
+ queryReplicaType(getRandomClient(), Replica.Type.PULL, collectionName);
+ queryReplicaType(getRandomClient(), Replica.Type.TLOG, collectionName);
+ queryReplicaType(getRandomClient(), Replica.Type.NRT, collectionName);
+ }
+
+ private void queryReplicaType(CloudHttp2SolrClient cloudClient,
+ Replica.Type typeToQuery,
+ String collectionName)
+ throws Exception
+ {
+ SolrQuery qRequest = new SolrQuery("*:*");
+
+ ModifiableSolrParams qParams = new ModifiableSolrParams();
+ qParams.add(ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":" + typeToQuery.toString());
+ qParams.add(ShardParams.SHARDS_INFO, "true");
+ qRequest.add(qParams);
+
+ Map<String, String> replicaTypeToReplicas = mapReplicasToReplicaType(getCollectionState(collectionName));
+
+ QueryResponse qResponse = cloudClient.query(collectionName, qRequest);
+
+ Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
+ assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo);
+
+ // Iterate over shards-info and check what cores responded
+ SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>)shardsInfo;
+ Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
+ List<String> shardAddresses = new ArrayList<String>();
+ while (itr.hasNext()) {
+ Map.Entry<String, ?> e = itr.next();
+ assertTrue("Did not find map-type value in "+ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
+ String shardAddress = (String)((Map)e.getValue()).get("shardAddress");
+ if (shardAddress.endsWith("/")) {
+ shardAddress = shardAddress.substring(0, shardAddress.length() - 1);
+ }
+ assertNotNull(ShardParams.SHARDS_INFO+" did not return 'shardAddress' parameter", shardAddress);
+ shardAddresses.add(shardAddress);
+ }
+ assertEquals("Shard addresses must be of size 1, since there is only 1 shard in the collection", 1, shardAddresses.size());
+
+ assertEquals("Make sure that the replica queried was the replicaType desired", typeToQuery.toString().toUpperCase(Locale.ROOT), replicaTypeToReplicas.get(shardAddresses.get(0)).toUpperCase(Locale.ROOT));
+ }
+
private Long getNumRequests(String baseUrl, String collectionName) throws
SolrServerException, IOException {
return getNumRequests(baseUrl, collectionName, "QUERY", "/select", null, false);
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
index 0025ace..57050ce 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -427,8 +428,8 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
@SuppressWarnings("deprecation")
private void queryWithShardsPreferenceRules(CloudSolrClient cloudClient,
- boolean useShardsPreference,
- String collectionName)
+ boolean useShardsPreference,
+ String collectionName)
throws Exception
{
SolrQuery qRequest = new SolrQuery("*:*");
@@ -476,6 +477,72 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
shardAddresses.size() > 1 && ports.size()==1);
}
+ /**
+ * Tests if the 'shards.preference' parameter works with single-sharded collections.
+ */
+ @Test
+ public void singleShardedPreferenceRules() throws Exception {
+ String collectionName = "singleShardPreferenceTestColl";
+
+ int liveNodes = cluster.getJettySolrRunners().size();
+
+ // For testing replica.type, we want to have all replica types available for the collection
+ CollectionAdminRequest.createCollection(collectionName, "conf", 1, liveNodes/3, liveNodes/3, liveNodes/3)
+ .setMaxShardsPerNode(liveNodes)
+ .processAndWait(cluster.getSolrClient(), TIMEOUT);
+ cluster.waitForActiveCollection(collectionName, 1, liveNodes);
+
+ // Add some new documents
+ new UpdateRequest()
+ .add(id, "0", "a_t", "hello1")
+ .add(id, "2", "a_t", "hello2")
+ .add(id, "3", "a_t", "hello2")
+ .commit(getRandomClient(), collectionName);
+
+ // Run the actual test for 'queryReplicaType'
+ queryReplicaType(getRandomClient(), Replica.Type.PULL, collectionName);
+ queryReplicaType(getRandomClient(), Replica.Type.TLOG, collectionName);
+ queryReplicaType(getRandomClient(), Replica.Type.NRT, collectionName);
+ }
+
+ private void queryReplicaType(CloudSolrClient cloudClient,
+ Replica.Type typeToQuery,
+ String collectionName)
+ throws Exception
+ {
+ SolrQuery qRequest = new SolrQuery("*:*");
+
+ ModifiableSolrParams qParams = new ModifiableSolrParams();
+ qParams.add(ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":" + typeToQuery.toString());
+ qParams.add(ShardParams.SHARDS_INFO, "true");
+ qRequest.add(qParams);
+
+ Map<String, String> replicaTypeToReplicas = mapReplicasToReplicaType(getCollectionState(collectionName));
+
+ QueryResponse qResponse = cloudClient.query(collectionName, qRequest);
+
+ Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
+ assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo);
+
+ // Iterate over shards-info and check what cores responded
+ SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>)shardsInfo;
+ Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
+ List<String> shardAddresses = new ArrayList<String>();
+ while (itr.hasNext()) {
+ Map.Entry<String, ?> e = itr.next();
+ assertTrue("Did not find map-type value in "+ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
+ String shardAddress = (String)((Map)e.getValue()).get("shardAddress");
+ if (shardAddress.endsWith("/")) {
+ shardAddress = shardAddress.substring(0, shardAddress.length() - 1);
+ }
+ assertNotNull(ShardParams.SHARDS_INFO+" did not return 'shardAddress' parameter", shardAddress);
+ shardAddresses.add(shardAddress);
+ }
+ assertEquals("Shard addresses must be of size 1, since there is only 1 shard in the collection", 1, shardAddresses.size());
+
+ assertEquals("Make sure that the replica queried was the replicaType desired", typeToQuery.toString().toUpperCase(Locale.ROOT), replicaTypeToReplicas.get(shardAddresses.get(0)).toUpperCase(Locale.ROOT));
+ }
+
private Long getNumRequests(String baseUrl, String collectionName) throws
SolrServerException, IOException {
return getNumRequests(baseUrl, collectionName, "QUERY", "/select", null, false);
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
index 2ecdf24..475b74d 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
@@ -48,8 +48,11 @@ import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
import org.junit.Assume;
import org.junit.Before;
@@ -68,6 +71,7 @@ import org.junit.Test;
public class StreamingTest extends SolrCloudTestCase {
public static final String COLLECTIONORALIAS = "streams";
+public static final String MULTI_REPLICA_COLLECTIONORALIAS = "streams-multi-replica";
private static final StreamFactory streamFactory = new StreamFactory()
.withFunctionName("search", CloudSolrStream.class)
@@ -100,7 +104,8 @@ public static void configureCluster() throws Exception {
} else {
collection = COLLECTIONORALIAS;
}
- CollectionAdminRequest.createCollection(collection, "conf", numShards, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection(collection, "conf", numShards, 1)
+ .process(cluster.getSolrClient());
cluster.waitForActiveCollection(collection, numShards, numShards);
if (useAlias) {
CollectionAdminRequest.createAlias(COLLECTIONORALIAS, collection).process(cluster.getSolrClient());
@@ -108,6 +113,20 @@ public static void configureCluster() throws Exception {
zkHost = cluster.getZkServer().getZkAddress();
streamFactory.withCollectionZkHost(COLLECTIONORALIAS, zkHost);
+
+ // Set up multi-replica collection
+ if (useAlias) {
+ collection = MULTI_REPLICA_COLLECTIONORALIAS + "_collection";
+ } else {
+ collection = MULTI_REPLICA_COLLECTIONORALIAS;
+ }
+ CollectionAdminRequest.createCollection(collection, "conf", numShards, 1, 1, 1)
+ .setMaxShardsPerNode(numShards * 3)
+ .process(cluster.getSolrClient());
+ cluster.waitForActiveCollection(collection, numShards, numShards * 3);
+ if (useAlias) {
+ CollectionAdminRequest.createAlias(MULTI_REPLICA_COLLECTIONORALIAS, collection).process(cluster.getSolrClient());
+ }
}
private static final String id = "id";
@@ -2435,6 +2454,43 @@ public void testParallelRankStream() throws Exception {
}
}
+
+ @Test
+ public void testTupleStreamGetShardsPreference() throws Exception {
+ StreamContext streamContext = new StreamContext();
+ streamContext.setSolrClientCache(new SolrClientCache());
+ streamContext.setRequestReplicaListTransformerGenerator(new RequestReplicaListTransformerGenerator(ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":TLOG", null, null, null));
+
+ streamContext.setRequestParams(mapParams(ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":nrt"));
+
+ try {
+ ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
+ List<String> strings = zkStateReader.aliasesManager.getAliases().resolveAliases(MULTI_REPLICA_COLLECTIONORALIAS);
+ String collName = strings.size() > 0 ? strings.get(0) : MULTI_REPLICA_COLLECTIONORALIAS;
+ Map<String, String> replicaTypeMap = mapReplicasToReplicaType(zkStateReader.getClusterState().getCollectionOrNull(collName));
+
+ // Test from extra params
+ SolrParams sParams = mapParams("q", "*:*", ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":pull");
+ testTupleStreamSorting(streamContext, sParams, "PULL", replicaTypeMap);
+
+ // Test defaults from streamContext.getParams()
+ testTupleStreamSorting(streamContext, new ModifiableSolrParams(), "NRT", replicaTypeMap);
+
+ // Test defaults from the RLTG
+ streamContext.setRequestParams(new ModifiableSolrParams());
+ testTupleStreamSorting(streamContext, new ModifiableSolrParams(), "TLOG", replicaTypeMap);
+ } finally {
+ streamContext.getSolrClientCache().close();
+ }
+ }
+
+ public void testTupleStreamSorting(StreamContext streamContext, SolrParams solrParams, String replicaType, Map<String, String> replicaTypeMap) throws Exception {
+ List<String> shards = TupleStream.getShards(cluster.getZkClient().getZkServerAddress(), MULTI_REPLICA_COLLECTIONORALIAS, streamContext, solrParams);
+ for (String shard : shards) {
+ assertEquals(shard, replicaType.toUpperCase(Locale.ROOT), replicaTypeMap.getOrDefault(shard, "").toUpperCase(Locale.ROOT));
+ }
+ }
+
protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
tupleStream.open();
List<Tuple> tuples = new ArrayList();
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
index 4ce7a5e..b8e0798 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
@@ -490,4 +490,21 @@ public class SolrCloudTestCase extends SolrTestCaseJ4 {
cluster.waitForAllNodes(timeoutSeconds);
}
+ public static Map<String, String> mapReplicasToReplicaType(DocCollection collection) {
+ Map<String, String> replicaTypeMap = new HashMap<>();
+ for (Slice slice : collection.getSlices()) {
+ for (Replica replica : slice.getReplicas()) {
+ String coreUrl = replica.getCoreUrl();
+ // It seems replica reports its core URL with a trailing slash while shard
+ // info returned from the query doesn't. Oh well. We will include both, just in case
+ replicaTypeMap.put(coreUrl, replica.getType().toString());
+ if (coreUrl.endsWith("/")) {
+ replicaTypeMap.put(coreUrl.substring(0, coreUrl.length() - 1), replica.getType().toString());
+ }else {
+ replicaTypeMap.put(coreUrl + "/", replica.getType().toString());
+ }
+ }
+ }
+ return replicaTypeMap;
+ }
}