You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by th...@apache.org on 2020/12/07 16:03:21 UTC
[lucene-solr] branch master updated: SOLR-14987: Reuse
HttpSolrClient per node vs. one per Solr core when using CloudSolrStream
(#2067)
This is an automated email from the ASF dual-hosted git repository.
thelabdude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/master by this push:
new 30e5e38 SOLR-14987: Reuse HttpSolrClient per node vs. one per Solr core when using CloudSolrStream (#2067)
30e5e38 is described below
commit 30e5e38336de49433a7ecc60fb169c2426278565
Author: Timothy Potter <th...@gmail.com>
AuthorDate: Mon Dec 7 09:03:03 2020 -0700
SOLR-14987: Reuse HttpSolrClient per node vs. one per Solr core when using CloudSolrStream (#2067)
---
solr/CHANGES.txt | 2 +
.../solr/client/solrj/io/SolrClientCache.java | 2 +
.../client/solrj/io/stream/CloudSolrStream.java | 96 +++++++---------
.../solr/client/solrj/io/stream/SolrStream.java | 27 +++--
.../solr/client/solrj/io/stream/TupleStream.java | 128 +++++++++++----------
.../solr/client/solrj/io/stream/StreamingTest.java | 60 ++++++++++
6 files changed, 193 insertions(+), 122 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index fa4a2fe..13d81f3 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -222,6 +222,8 @@ Improvements
and solr_metrics_overseer_collectionWorkQueueSize with corresponding entries in the Prometheus exporter's
default/stock configuration. (Saatchi Bhalla, Megan Carey, Andrzej Białecki, David Smiley)
+* SOLR-14987: Reuse HttpSolrClient per node vs. one per Solr core when using CloudSolrStream (Timothy Potter)
+
Optimizations
---------------------
* SOLR-14975: Optimize CoreContainer.getAllCoreNames, getLoadedCoreNames and getCoreDescriptors. (Bruno Roustant)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
index 7151e5d..e2d9680 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.MethodHandles;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.ArrayList;
import java.util.HashMap;
@@ -53,6 +54,7 @@ public class SolrClientCache implements Serializable {
}
public synchronized CloudSolrClient getCloudSolrClient(String zkHost) {
+ Objects.requireNonNull(zkHost, "ZooKeeper host cannot be null!");
CloudSolrClient client;
if (solrClients.containsKey(zkHost)) {
client = (CloudSolrClient) solrClients.get(zkHost);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
index 1f26c98..80a2211 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.io.Tuple;
@@ -47,8 +48,8 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
@@ -152,11 +153,6 @@ public class CloudSolrStream extends TupleStream implements Expressible {
else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
}
- /*
- if(null == zkHost){
- throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
- }
- */
// We've got all the required items
init(collectionName, zkHost, mParams);
@@ -237,7 +233,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
// If the comparator is null then it was not explicitly set so we will create one using the sort parameter
// of the query. While doing this we will also take into account any aliases such that if we are sorting on
- // fieldA but fieldA is aliased to alias.fieldA then the comparater will be against alias.fieldA.
+ // fieldA but fieldA is aliased to alias.fieldA then the comparator will be against alias.fieldA.
if (params.get("q") == null) {
throw new IOException("q param expected for search function");
@@ -334,63 +330,58 @@ public class CloudSolrStream extends TupleStream implements Expressible {
public static Slice[] getSlices(String collectionName, ZkStateReader zkStateReader, boolean checkAlias) throws IOException {
ClusterState clusterState = zkStateReader.getClusterState();
- Map<String, DocCollection> collectionsMap = clusterState.getCollectionsMap();
-
- //TODO we should probably split collection by comma to query more than one
- // which is something already supported in other parts of Solr
-
// check for alias or collection
List<String> allCollections = new ArrayList<>();
String[] collectionNames = collectionName.split(",");
+ Aliases aliases = checkAlias ? zkStateReader.getAliases() : null;
+
for(String col : collectionNames) {
- List<String> collections = checkAlias
- ? zkStateReader.getAliases().resolveAliases(col) // if not an alias, returns collectionName
+ List<String> collections = (aliases != null)
+ ? aliases.resolveAliases(col) // if not an alias, returns collectionName
: Collections.singletonList(collectionName);
allCollections.addAll(collections);
}
// Look up all active slices for these collections
List<Slice> slices = allCollections.stream()
- .map(collectionsMap::get)
+ .map(c -> clusterState.getCollectionOrNull(c, true))
.filter(Objects::nonNull)
.flatMap(docCol -> Arrays.stream(docCol.getActiveSlicesArr()))
.collect(Collectors.toList());
if (!slices.isEmpty()) {
- return slices.toArray(new Slice[slices.size()]);
- }
-
- // Check collection case insensitive
- for(Entry<String, DocCollection> entry : collectionsMap.entrySet()) {
- if(entry.getKey().equalsIgnoreCase(collectionName)) {
- return entry.getValue().getActiveSlicesArr();
- }
+ return slices.toArray(new Slice[0]);
}
throw new IOException("Slices not found for " + collectionName);
}
protected void constructStreams() throws IOException {
+ final ModifiableSolrParams mParams = adjustParams(new ModifiableSolrParams(params));
+ mParams.set(DISTRIB, "false"); // We are the aggregator.
try {
+ final Stream<SolrStream> streamOfSolrStream;
+ if (streamContext != null && streamContext.get("shards") != null) {
+ // stream of shard url with core
+ streamOfSolrStream = getShards(this.zkHost, this.collection, this.streamContext, mParams).stream()
+ .map(s -> new SolrStream(s, mParams));
+ } else {
+ // stream of replicas to reuse the same HttpSolrClient per baseUrl
+ // avoids re-parsing data we already have in the replicas
+ streamOfSolrStream = getReplicas(this.zkHost, this.collection, this.streamContext, mParams).stream()
+ .map(r -> new SolrStream(r.getBaseUrl(), mParams, r.getCoreName()));
+ }
-
- ModifiableSolrParams mParams = new ModifiableSolrParams(params);
- mParams = adjustParams(mParams);
- mParams.set(DISTRIB, "false"); // We are the aggregator.
-
- List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext, mParams);
-
- for(String shardUrl : shardUrls) {
- SolrStream solrStream = new SolrStream(shardUrl, mParams);
+ streamOfSolrStream.forEach(ss -> {
if(streamContext != null) {
- solrStream.setStreamContext(streamContext);
+ ss.setStreamContext(streamContext);
if (streamContext.isLocal()) {
- solrStream.setDistrib(false);
+ ss.setDistrib(false);
}
}
- solrStream.setFieldMappings(this.fieldMappings);
- solrStreams.add(solrStream);
- }
+ ss.setFieldMappings(this.fieldMappings);
+ solrStreams.add(ss);
+ });
} catch (Exception e) {
throw new IOException(e);
}
@@ -398,24 +389,17 @@ public class CloudSolrStream extends TupleStream implements Expressible {
private void openStreams() throws IOException {
ExecutorService service = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("CloudSolrStream"));
+ List<Future<TupleWrapper>> futures =
+ solrStreams.stream().map(ss -> service.submit(new StreamOpener((SolrStream)ss, comp))).collect(Collectors.toList());
try {
- List<Future<TupleWrapper>> futures = new ArrayList<>();
- for (TupleStream solrStream : solrStreams) {
- StreamOpener so = new StreamOpener((SolrStream) solrStream, comp);
- Future<TupleWrapper> future = service.submit(so);
- futures.add(future);
- }
-
- try {
- for (Future<TupleWrapper> f : futures) {
- TupleWrapper w = f.get();
- if (w != null) {
- tuples.add(w);
- }
+ for (Future<TupleWrapper> f : futures) {
+ TupleWrapper w = f.get();
+ if (w != null) {
+ tuples.add(w);
}
- } catch (Exception e) {
- throw new IOException(e);
}
+ } catch (Exception e) {
+ throw new IOException(e);
} finally {
service.shutdown();
}
@@ -465,8 +449,8 @@ public class CloudSolrStream extends TupleStream implements Expressible {
protected class TupleWrapper implements Comparable<TupleWrapper> {
private Tuple tuple;
- private SolrStream stream;
- private StreamComparator comp;
+ private final SolrStream stream;
+ private final StreamComparator comp;
public TupleWrapper(SolrStream stream, StreamComparator comp) {
this.stream = stream;
@@ -512,8 +496,8 @@ public class CloudSolrStream extends TupleStream implements Expressible {
protected class StreamOpener implements Callable<TupleWrapper> {
- private SolrStream stream;
- private StreamComparator comp;
+ private final SolrStream stream;
+ private final StreamComparator comp;
public StreamOpener(SolrStream stream, StreamComparator comp) {
this.stream = stream;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
index 107d9e8..e6210ee 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map;
import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
@@ -69,6 +68,7 @@ public class SolrStream extends TupleStream {
private boolean distrib = true;
private String user;
private String password;
+ private String core;
/**
* @param baseUrl Base URL of the stream.
@@ -80,6 +80,11 @@ public class SolrStream extends TupleStream {
this.params = params;
}
+ SolrStream(String baseUrl, SolrParams params, String core) {
+ this(baseUrl, params);
+ this.core = core;
+ }
+
public void setFieldMappings(Map<String, String> fieldMappings) {
this.fieldMappings = fieldMappings;
}
@@ -109,6 +114,8 @@ public class SolrStream extends TupleStream {
**/
public void open() throws IOException {
+
+ // Reuse the same client per node vs. having one per replica
if(cache == null) {
client = new HttpSolrClient.Builder(baseUrl).build();
} else {
@@ -120,7 +127,7 @@ public class SolrStream extends TupleStream {
if (!distrib) {
((ModifiableSolrParams) requestParams).add("distrib","false");
}
- tupleStreamParser = constructParser(client, requestParams);
+ tupleStreamParser = constructParser(requestParams);
} catch (Exception e) {
throw new IOException("params " + params, e);
}
@@ -187,7 +194,7 @@ public class SolrStream extends TupleStream {
if (closeableHttpResponse != null) {
closeableHttpResponse.close();
}
- if(cache == null) {
+ if(cache == null && client != null) {
client.close();
}
}
@@ -268,8 +275,7 @@ public class SolrStream extends TupleStream {
return fields;
}
- // temporary...
- public TupleStreamParser constructParser(SolrClient server, SolrParams requestParams) throws IOException, SolrServerException {
+ private TupleStreamParser constructParser(SolrParams requestParams) throws IOException, SolrServerException {
String p = requestParams.get("qt");
if (p != null) {
ModifiableSolrParams modifiableSolrParams = (ModifiableSolrParams) requestParams;
@@ -280,7 +286,14 @@ public class SolrStream extends TupleStream {
String wt = requestParams.get(CommonParams.WT, "json");
QueryRequest query = new QueryRequest(requestParams);
- query.setPath(p);
+
+ // in order to reuse HttpSolrClient objects per node, we need to cache them without the core name in the URL
+ if (core != null) {
+ query.setPath("/"+core + (p != null ? p : "/select"));
+ } else {
+ query.setPath(p);
+ }
+
query.setResponseParser(new InputStreamResponseParser(wt));
query.setMethod(SolrRequest.METHOD.POST);
@@ -288,7 +301,7 @@ public class SolrStream extends TupleStream {
query.setBasicAuthCredentials(user, password);
}
- NamedList<Object> genericResponse = server.request(query);
+ NamedList<Object> genericResponse = client.request(query);
InputStream stream = (InputStream) genericResponse.get("stream");
this.closeableHttpResponse = (CloseableHttpResponse)genericResponse.get("closeableResponse");
if (CommonParams.JAVABIN.equals(wt)) {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
index 9372fd6..0e48abe 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
@@ -21,10 +21,13 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Locale;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
@@ -122,76 +125,83 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
return getShards(zkHost, collection, streamContext, new ModifiableSolrParams());
}
+ static List<Replica> getReplicas(String zkHost,
+ String collection,
+ StreamContext streamContext,
+ SolrParams requestParams)
+ throws IOException {
+ List<Replica> replicas = new LinkedList<>();
+
+ //SolrCloud Sharding
+ SolrClientCache solrClientCache = (streamContext != null ? streamContext.getSolrClientCache() : null);
+ final SolrClientCache localSolrClientCache; // tracks any locally allocated cache that needs to be closed locally
+ if (solrClientCache == null) { // streamContext was null OR streamContext.getSolrClientCache() returned null
+ solrClientCache = localSolrClientCache = new SolrClientCache();
+ } else {
+ localSolrClientCache = null;
+ }
+
+ if (zkHost == null) {
+ throw new IOException(String.format(Locale.ROOT,"invalid expression - zkHost not found for collection '%s'",collection));
+ }
+
+ CloudSolrClient cloudSolrClient = solrClientCache.getCloudSolrClient(zkHost);
+ ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
+ ClusterState clusterState = zkStateReader.getClusterState();
+ Slice[] slices = CloudSolrStream.getSlices(collection, zkStateReader, true);
+ Set<String> liveNodes = clusterState.getLiveNodes();
+
+ RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator;
+ final ModifiableSolrParams solrParams;
+ if (streamContext != null) {
+ solrParams = new ModifiableSolrParams(streamContext.getRequestParams());
+ requestReplicaListTransformerGenerator = streamContext.getRequestReplicaListTransformerGenerator();
+ } else {
+ solrParams = new ModifiableSolrParams();
+ requestReplicaListTransformerGenerator = null;
+ }
+ if (requestReplicaListTransformerGenerator == null) {
+ requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator();
+ }
+ solrParams.add(requestParams);
+
+ ReplicaListTransformer replicaListTransformer = requestReplicaListTransformerGenerator.getReplicaListTransformer(solrParams);
+
+ final String coreFilter = streamContext != null && streamContext.isLocal() ? (String)streamContext.get("core") : null;
+ List<Replica> sortedReplicas = new ArrayList<>();
+ for(Slice slice : slices) {
+ slice.getReplicas().stream().filter(r -> r.isActive(liveNodes)).forEach(sortedReplicas::add);
+ replicaListTransformer.transform(sortedReplicas);
+ sortedReplicas.stream().filter(r -> coreFilter == null || coreFilter.equals(r.core)).findFirst().ifPresent(replicas::add);
+ sortedReplicas.clear();
+ }
+
+ if (localSolrClientCache != null) {
+ localSolrClientCache.close();
+ }
+
+ return replicas;
+ }
+
@SuppressWarnings({"unchecked"})
public static List<String> getShards(String zkHost,
String collection,
StreamContext streamContext,
SolrParams requestParams)
throws IOException {
- Map<String, List<String>> shardsMap = null;
- List<String> shards = new ArrayList<>();
-
- if(streamContext != null) {
- shardsMap = (Map<String, List<String>>)streamContext.get("shards");
- }
+ List<String> shards;
+ Map<String, List<String>> shardsMap = streamContext != null ? (Map<String, List<String>>)streamContext.get("shards") : null;
if(shardsMap != null) {
//Manual Sharding
shards = shardsMap.get(collection);
- } else {
- //SolrCloud Sharding
- SolrClientCache solrClientCache = (streamContext != null ? streamContext.getSolrClientCache() : null);
- final SolrClientCache localSolrClientCache; // tracks any locally allocated cache that needs to be closed locally
- if (solrClientCache == null) { // streamContext was null OR streamContext.getSolrClientCache() returned null
- solrClientCache = localSolrClientCache = new SolrClientCache();
- } else {
- localSolrClientCache = null;
- }
- CloudSolrClient cloudSolrClient = solrClientCache.getCloudSolrClient(zkHost);
- ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
- ClusterState clusterState = zkStateReader.getClusterState();
- Slice[] slices = CloudSolrStream.getSlices(collection, zkStateReader, true);
- Set<String> liveNodes = clusterState.getLiveNodes();
-
-
- RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator;
- final ModifiableSolrParams solrParams;
- if (streamContext != null) {
- solrParams = new ModifiableSolrParams(streamContext.getRequestParams());
- requestReplicaListTransformerGenerator = streamContext.getRequestReplicaListTransformerGenerator();
- } else {
- solrParams = new ModifiableSolrParams();
- requestReplicaListTransformerGenerator = null;
- }
- if (requestReplicaListTransformerGenerator == null) {
- requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator();
- }
- solrParams.add(requestParams);
-
- ReplicaListTransformer replicaListTransformer = requestReplicaListTransformerGenerator.getReplicaListTransformer(solrParams);
-
- for(Slice slice : slices) {
- List<Replica> sortedReplicas = new ArrayList<>();
- for(Replica replica : slice.getReplicas()) {
- if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
- sortedReplicas.add(replica);
- }
- }
-
- replicaListTransformer.transform(sortedReplicas);
- if (sortedReplicas.size() > 0) {
- shards.add(sortedReplicas.get(0).getCoreUrl());
- }
- }
- if (localSolrClientCache != null) {
- localSolrClientCache.close();
- }
- }
- if (streamContext != null) {
- Object core = streamContext.get("core");
- if (streamContext.isLocal() && core != null) {
+ final Object core = streamContext.isLocal() ? streamContext.get("core") : null;
+ if (core != null) {
shards.removeIf(shardUrl -> !shardUrl.contains((CharSequence) core));
}
+ } else {
+ shards = getReplicas(zkHost, collection, streamContext, requestParams).stream()
+ .map(Replica::getCoreUrl).collect(Collectors.toList());
}
return shards;
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
index b4e0704..a8e53cd 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -38,6 +39,7 @@ import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
import org.apache.solr.client.solrj.io.ops.GroupOperation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
@@ -52,6 +54,7 @@ import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerat
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
@@ -128,6 +131,7 @@ public static void configureCluster() throws Exception {
if (useAlias) {
CollectionAdminRequest.createAlias(MULTI_REPLICA_COLLECTIONORALIAS, collection).process(cluster.getSolrClient());
}
+ streamFactory.withCollectionZkHost(MULTI_REPLICA_COLLECTIONORALIAS, zkHost);
}
private static final String id = "id";
@@ -2723,4 +2727,60 @@ public void testParallelRankStream() throws Exception {
}
}
+ @Test
+ public void testCloudStreamClientCache() throws Exception {
+
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ solrClientCache.getCloudSolrClient(zkHost);
+ streamContext.setSolrClientCache(solrClientCache);
+
+ String expr = "search(" + MULTI_REPLICA_COLLECTIONORALIAS + ",q=*:*,fl=\"a_i\", qt=\"/export\", sort=\"a_i asc\")";
+ try (CloudSolrStream stream = new CloudSolrStream(StreamExpressionParser.parse(expr), streamFactory)) {
+ stream.setStreamContext(streamContext);
+ stream.open();
+ Tuple t = stream.read();
+ while (!t.EOF) {
+ // no-op ... just want to iterate over the tuples
+ t = stream.read();
+ }
+
+ List<String> baseUrls = new LinkedList<>();
+ ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
+ List<String> resolved = zkStateReader.aliasesManager.getAliases().resolveAliases(MULTI_REPLICA_COLLECTIONORALIAS);
+ Set<String> liveNodes = zkStateReader.getClusterState().getLiveNodes();
+ int expectedNumStreams = 0;
+ for (String coll : resolved) {
+ DocCollection dcoll = zkStateReader.getCollection(coll);
+ for (Slice slice : dcoll.getSlices()) {
+ ++expectedNumStreams; // one Stream per slice
+ for (Replica r : slice.getReplicas()) {
+ if (r.isActive(liveNodes)) {
+ baseUrls.add(r.getBaseUrl());
+ }
+ }
+ }
+ }
+ List<TupleStream> solrStreams = stream.children();
+ assertEquals(expectedNumStreams, solrStreams.size());
+ for (TupleStream next : solrStreams) {
+ SolrStream ss = (SolrStream)next;
+ assertTrue(baseUrls.contains(ss.getBaseUrl())); // SolrStream uses the baseUrl of the replica and not the coreUrl
+ }
+
+ // verify core filtering
+ streamContext.setLocal(true);
+
+ for (String coll : resolved) {
+ Replica rr = zkStateReader.getCollection(coll).getReplicas().get(0);
+ streamContext.put("core", rr.core);
+ List<Replica> replicas = TupleStream.getReplicas(zkHost, coll, streamContext, new ModifiableSolrParams());
+ assertEquals("core filter for " + rr.core + " not applied for " + coll + "; replicas: " + replicas, 1, replicas.size());
+ assertEquals("core filter for " + rr.core + " not applied for " + coll, rr, replicas.get(0));
+ }
+
+ } finally {
+ solrClientCache.close();
+ }
+ }
}