Posted to commits@lucene.apache.org by th...@apache.org on 2020/12/07 16:03:21 UTC

[lucene-solr] branch master updated: SOLR-14987: Reuse HttpSolrClient per node vs. one per Solr core when using CloudSolrStream (#2067)

This is an automated email from the ASF dual-hosted git repository.

thelabdude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new 30e5e38  SOLR-14987: Reuse HttpSolrClient per node vs. one per Solr core when using CloudSolrStream (#2067)
30e5e38 is described below

commit 30e5e38336de49433a7ecc60fb169c2426278565
Author: Timothy Potter <th...@gmail.com>
AuthorDate: Mon Dec 7 09:03:03 2020 -0700

    SOLR-14987: Reuse HttpSolrClient per node vs. one per Solr core when using CloudSolrStream (#2067)
---
 solr/CHANGES.txt                                   |   2 +
 .../solr/client/solrj/io/SolrClientCache.java      |   2 +
 .../client/solrj/io/stream/CloudSolrStream.java    |  96 +++++++---------
 .../solr/client/solrj/io/stream/SolrStream.java    |  27 +++--
 .../solr/client/solrj/io/stream/TupleStream.java   | 128 +++++++++++----------
 .../solr/client/solrj/io/stream/StreamingTest.java |  60 ++++++++++
 6 files changed, 193 insertions(+), 122 deletions(-)
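
In short: before this change, CloudSolrStream built one HttpSolrClient per
replica core URL (e.g. http://host:8983/solr/coll_shard1_replica_n1), so a
collection with many cores on the same node allocated many clients against
that node. After this change, a SolrStream can be constructed from a node
base URL plus a core name, letting the SolrClientCache hand back a single
client per node. A minimal sketch of the caching idea, assuming a map keyed
by base URL (the helper class below is illustrative, not part of the patch):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.solr.client.solrj.impl.HttpSolrClient;

    class PerNodeClientSketch {
      private final Map<String, HttpSolrClient> clientsByBaseUrl = new HashMap<>();

      // One client per node base URL, e.g. "http://host:8983/solr"; the core
      // name is appended to the request path at query time instead of being
      // baked into the client's URL (see SolrStream.constructParser below).
      synchronized HttpSolrClient getHttpSolrClient(String baseUrl) {
        return clientsByBaseUrl.computeIfAbsent(baseUrl,
            url -> new HttpSolrClient.Builder(url).build());
      }
    }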

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index fa4a2fe..13d81f3 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -222,6 +222,8 @@ Improvements
   and solr_metrics_overseer_collectionWorkQueueSize with corresponding entries in the Prometheus exporter's
   default/stock configuration.  (Saatchi Bhalla, Megan Carey, Andrzej BiaƂecki, David Smiley)
 
+* SOLR-14987: Reuse HttpSolrClient per node vs. one per Solr core when using CloudSolrStream (Timothy Potter)
+
 Optimizations
 ---------------------
 * SOLR-14975: Optimize CoreContainer.getAllCoreNames, getLoadedCoreNames and getCoreDescriptors. (Bruno Roustant)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
index 7151e5d..e2d9680 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -53,6 +54,7 @@ public class SolrClientCache implements Serializable {
   }
 
   public synchronized CloudSolrClient getCloudSolrClient(String zkHost) {
+    Objects.requireNonNull(zkHost, "ZooKeeper host cannot be null!");
     CloudSolrClient client;
     if (solrClients.containsKey(zkHost)) {
       client = (CloudSolrClient) solrClients.get(zkHost);
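
The new Objects.requireNonNull guard above makes a missing ZooKeeper host fail
fast at the cache boundary instead of surfacing as an opaque NPE deeper inside
client construction. A hedged usage sketch:

    SolrClientCache cache = new SolrClientCache();
    try {
      cache.getCloudSolrClient(null); // throws NPE: "ZooKeeper host cannot be null!"
    } catch (NullPointerException expected) {
      // resolve the zkHost first, e.g. via StreamFactory.withCollectionZkHost(...)
    } finally {
      cache.close();
    }
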
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
index 1f26c98..80a2211 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.io.Tuple;
@@ -47,8 +48,8 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.cloud.Aliases;
 import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
@@ -152,11 +153,6 @@ public class CloudSolrStream extends TupleStream implements Expressible {
     else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
       zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
     }
-    /*
-    if(null == zkHost){
-      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
-    }
-    */
 
     // We've got all the required items
     init(collectionName, zkHost, mParams);
@@ -237,7 +233,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
 
     // If the comparator is null then it was not explicitly set so we will create one using the sort parameter
     // of the query. While doing this we will also take into account any aliases such that if we are sorting on
-    // fieldA but fieldA is aliased to alias.fieldA then the comparater will be against alias.fieldA.
+    // fieldA but fieldA is aliased to alias.fieldA then the comparator will be against alias.fieldA.
 
     if (params.get("q") == null) {
       throw new IOException("q param expected for search function");
@@ -334,63 +330,58 @@ public class CloudSolrStream extends TupleStream implements Expressible {
   public static Slice[] getSlices(String collectionName, ZkStateReader zkStateReader, boolean checkAlias) throws IOException {
     ClusterState clusterState = zkStateReader.getClusterState();
 
-    Map<String, DocCollection> collectionsMap = clusterState.getCollectionsMap();
-
-    //TODO we should probably split collection by comma to query more than one
-    //  which is something already supported in other parts of Solr
-
     // check for alias or collection
 
     List<String> allCollections = new ArrayList<>();
     String[] collectionNames = collectionName.split(",");
+    Aliases aliases = checkAlias ? zkStateReader.getAliases() : null;
+
     for(String col : collectionNames) {
-      List<String> collections = checkAlias
-          ? zkStateReader.getAliases().resolveAliases(col)  // if not an alias, returns collectionName
+      List<String> collections = (aliases != null)
+          ? aliases.resolveAliases(col)  // if not an alias, returns collectionName
           : Collections.singletonList(collectionName);
       allCollections.addAll(collections);
     }
 
     // Look up all active slices for these collections
     List<Slice> slices = allCollections.stream()
-        .map(collectionsMap::get)
+        .map(c -> clusterState.getCollectionOrNull(c, true))
         .filter(Objects::nonNull)
         .flatMap(docCol -> Arrays.stream(docCol.getActiveSlicesArr()))
         .collect(Collectors.toList());
     if (!slices.isEmpty()) {
-      return slices.toArray(new Slice[slices.size()]);
-    }
-
-    // Check collection case insensitive
-    for(Entry<String, DocCollection> entry : collectionsMap.entrySet()) {
-      if(entry.getKey().equalsIgnoreCase(collectionName)) {
-        return entry.getValue().getActiveSlicesArr();
-      }
+      return slices.toArray(new Slice[0]);
     }
 
     throw new IOException("Slices not found for " + collectionName);
   }
 
   protected void constructStreams() throws IOException {
+    final ModifiableSolrParams mParams = adjustParams(new ModifiableSolrParams(params));
+    mParams.set(DISTRIB, "false"); // We are the aggregator.
     try {
+      final Stream<SolrStream> streamOfSolrStream;
+      if (streamContext != null && streamContext.get("shards") != null) {
+        // stream of shard URLs (each including its core) supplied via the "shards" context
+        streamOfSolrStream = getShards(this.zkHost, this.collection, this.streamContext, mParams).stream()
+            .map(s -> new SolrStream(s, mParams));
+      } else {
+        // stream of replicas so the same HttpSolrClient can be reused per baseUrl;
+        // avoids re-parsing data we already have on the Replica objects
+        streamOfSolrStream = getReplicas(this.zkHost, this.collection, this.streamContext, mParams).stream()
+            .map(r -> new SolrStream(r.getBaseUrl(), mParams, r.getCoreName()));
+      }
 
-
-      ModifiableSolrParams mParams = new ModifiableSolrParams(params);
-      mParams = adjustParams(mParams);
-      mParams.set(DISTRIB, "false"); // We are the aggregator.
-
-      List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext, mParams);
-
-      for(String shardUrl : shardUrls) {
-        SolrStream solrStream = new SolrStream(shardUrl, mParams);
+      streamOfSolrStream.forEach(ss -> {
         if(streamContext != null) {
-          solrStream.setStreamContext(streamContext);
+          ss.setStreamContext(streamContext);
           if (streamContext.isLocal()) {
-            solrStream.setDistrib(false);
+            ss.setDistrib(false);
           }
         }
-        solrStream.setFieldMappings(this.fieldMappings);
-        solrStreams.add(solrStream);
-      }
+        ss.setFieldMappings(this.fieldMappings);
+        solrStreams.add(ss);
+      });
     } catch (Exception e) {
       throw new IOException(e);
     }
@@ -398,24 +389,17 @@ public class CloudSolrStream extends TupleStream implements Expressible {
 
   private void openStreams() throws IOException {
     ExecutorService service = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("CloudSolrStream"));
+    List<Future<TupleWrapper>> futures =
+        solrStreams.stream().map(ss -> service.submit(new StreamOpener((SolrStream)ss, comp))).collect(Collectors.toList());
     try {
-      List<Future<TupleWrapper>> futures = new ArrayList<>();
-      for (TupleStream solrStream : solrStreams) {
-        StreamOpener so = new StreamOpener((SolrStream) solrStream, comp);
-        Future<TupleWrapper> future = service.submit(so);
-        futures.add(future);
-      }
-
-      try {
-        for (Future<TupleWrapper> f : futures) {
-          TupleWrapper w = f.get();
-          if (w != null) {
-            tuples.add(w);
-          }
+      for (Future<TupleWrapper> f : futures) {
+        TupleWrapper w = f.get();
+        if (w != null) {
+          tuples.add(w);
         }
-      } catch (Exception e) {
-        throw new IOException(e);
       }
+    } catch (Exception e) {
+      throw new IOException(e);
     } finally {
       service.shutdown();
     }
@@ -465,8 +449,8 @@ public class CloudSolrStream extends TupleStream implements Expressible {
 
   protected class TupleWrapper implements Comparable<TupleWrapper> {
     private Tuple tuple;
-    private SolrStream stream;
-    private StreamComparator comp;
+    private final SolrStream stream;
+    private final StreamComparator comp;
 
     public TupleWrapper(SolrStream stream, StreamComparator comp) {
       this.stream = stream;
@@ -512,8 +496,8 @@ public class CloudSolrStream extends TupleStream implements Expressible {
 
   protected class StreamOpener implements Callable<TupleWrapper> {
 
-    private SolrStream stream;
-    private StreamComparator comp;
+    private final SolrStream stream;
+    private final StreamComparator comp;
 
     public StreamOpener(SolrStream stream, StreamComparator comp) {
       this.stream = stream;
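
The reworked constructStreams() keeps the old behavior when a "shards" map is
set in the StreamContext (full core URLs), but on the SolrCloud path it now
builds each SolrStream from a replica's base URL plus core name so that
clients are shared per node. A hedged sketch of that cloud branch, using the
names from the diff above:

    // one SolrStream per selected replica; the node-level base URL lets the
    // SolrClientCache reuse a single HttpSolrClient across cores on that node
    for (Replica r : TupleStream.getReplicas(zkHost, collection, streamContext, mParams)) {
      SolrStream ss = new SolrStream(r.getBaseUrl(), mParams, r.getCoreName());
      ss.setStreamContext(streamContext);
      solrStreams.add(ss);
    }
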
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
index 107d9e8..e6210ee 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
@@ -69,6 +68,7 @@ public class SolrStream extends TupleStream {
   private boolean distrib = true;
   private String user;
   private String password;
+  private String core;
 
   /**
    * @param baseUrl Base URL of the stream.
@@ -80,6 +80,11 @@ public class SolrStream extends TupleStream {
     this.params = params;
   }
 
+  SolrStream(String baseUrl, SolrParams params, String core) {
+    this(baseUrl, params);
+    this.core = core;
+  }
+
   public void setFieldMappings(Map<String, String> fieldMappings) {
     this.fieldMappings = fieldMappings;
   }
@@ -109,6 +114,8 @@ public class SolrStream extends TupleStream {
   **/
 
   public void open() throws IOException {
+
+    // Reuse the same client per node vs. having one per replica
     if(cache == null) {
       client = new HttpSolrClient.Builder(baseUrl).build();
     } else {
@@ -120,7 +127,7 @@ public class SolrStream extends TupleStream {
       if (!distrib) {
         ((ModifiableSolrParams) requestParams).add("distrib","false");
       }
-      tupleStreamParser = constructParser(client, requestParams);
+      tupleStreamParser = constructParser(requestParams);
     } catch (Exception e) {
       throw new IOException("params " + params, e);
     }
@@ -187,7 +194,7 @@ public class SolrStream extends TupleStream {
     if (closeableHttpResponse != null) {
       closeableHttpResponse.close();
     }
-    if(cache == null) {
+    if(cache == null && client != null) {
       client.close();
     }
   }
@@ -268,8 +275,7 @@ public class SolrStream extends TupleStream {
     return fields;
   }
 
-  // temporary...
-  public TupleStreamParser constructParser(SolrClient server, SolrParams requestParams) throws IOException, SolrServerException {
+  private TupleStreamParser constructParser(SolrParams requestParams) throws IOException, SolrServerException {
     String p = requestParams.get("qt");
     if (p != null) {
       ModifiableSolrParams modifiableSolrParams = (ModifiableSolrParams) requestParams;
@@ -280,7 +286,14 @@ public class SolrStream extends TupleStream {
 
     String wt = requestParams.get(CommonParams.WT, "json");
     QueryRequest query = new QueryRequest(requestParams);
-    query.setPath(p);
+
+    // in order to reuse HttpSolrClient objects per node, we need to cache them without the core name in the URL
+    if (core != null) {
+      query.setPath("/"+core + (p != null ? p : "/select"));
+    } else {
+      query.setPath(p);
+    }
+
     query.setResponseParser(new InputStreamResponseParser(wt));
     query.setMethod(SolrRequest.METHOD.POST);
 
@@ -288,7 +301,7 @@ public class SolrStream extends TupleStream {
       query.setBasicAuthCredentials(user, password);
     }
 
-    NamedList<Object> genericResponse = server.request(query);
+    NamedList<Object> genericResponse = client.request(query);
     InputStream stream = (InputStream) genericResponse.get("stream");
     this.closeableHttpResponse = (CloseableHttpResponse)genericResponse.get("closeableResponse");
     if (CommonParams.JAVABIN.equals(wt)) {
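
Because the cached client now points at the node rather than a core, the core
name travels in the request path instead. A hedged illustration of the
rewritten constructParser logic: with core = "coll_shard1_replica_n1" and
qt = "/export" the path becomes "/coll_shard1_replica_n1/export", and with no
qt it falls back to "/coll_shard1_replica_n1/select":

    QueryRequest query = new QueryRequest(requestParams);
    if (core != null) {
      // per-node client: prepend the core so the node routes to the right replica
      query.setPath("/" + core + (p != null ? p : "/select"));
    } else {
      // legacy behavior: the client was already built with a full core URL
      query.setPath(p);
    }
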
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
index 9372fd6..0e48abe 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
@@ -21,10 +21,13 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Locale;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.io.SolrClientCache;
@@ -122,76 +125,83 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
     return getShards(zkHost, collection, streamContext, new ModifiableSolrParams());
   }
 
+  static List<Replica> getReplicas(String zkHost,
+                                   String collection,
+                                   StreamContext streamContext,
+                                   SolrParams requestParams)
+      throws IOException {
+    List<Replica> replicas = new LinkedList<>();
+
+    //SolrCloud Sharding
+    SolrClientCache solrClientCache = (streamContext != null ? streamContext.getSolrClientCache() : null);
+    final SolrClientCache localSolrClientCache; // tracks any locally allocated cache that needs to be closed locally
+    if (solrClientCache == null) { // streamContext was null OR streamContext.getSolrClientCache() returned null
+      solrClientCache = localSolrClientCache = new SolrClientCache();
+    } else {
+      localSolrClientCache = null;
+    }
+
+    if (zkHost == null) {
+      throw new IOException(String.format(Locale.ROOT,"invalid expression - zkHost not found for collection '%s'",collection));
+    }
+
+    CloudSolrClient cloudSolrClient = solrClientCache.getCloudSolrClient(zkHost);
+    ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
+    ClusterState clusterState = zkStateReader.getClusterState();
+    Slice[] slices = CloudSolrStream.getSlices(collection, zkStateReader, true);
+    Set<String> liveNodes = clusterState.getLiveNodes();
+
+    RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator;
+    final ModifiableSolrParams solrParams;
+    if (streamContext != null) {
+      solrParams = new ModifiableSolrParams(streamContext.getRequestParams());
+      requestReplicaListTransformerGenerator = streamContext.getRequestReplicaListTransformerGenerator();
+    } else {
+      solrParams = new ModifiableSolrParams();
+      requestReplicaListTransformerGenerator = null;
+    }
+    if (requestReplicaListTransformerGenerator == null) {
+      requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator();
+    }
+    solrParams.add(requestParams);
+
+    ReplicaListTransformer replicaListTransformer = requestReplicaListTransformerGenerator.getReplicaListTransformer(solrParams);
+
+    final String coreFilter = streamContext != null && streamContext.isLocal() ? (String)streamContext.get("core") : null;
+    List<Replica> sortedReplicas = new ArrayList<>();
+    for(Slice slice : slices) {
+      slice.getReplicas().stream().filter(r -> r.isActive(liveNodes)).forEach(sortedReplicas::add);
+      replicaListTransformer.transform(sortedReplicas);
+      sortedReplicas.stream().filter(r -> coreFilter == null || coreFilter.equals(r.core)).findFirst().ifPresent(replicas::add);
+      sortedReplicas.clear();
+    }
+
+    if (localSolrClientCache != null) {
+      localSolrClientCache.close();
+    }
+
+    return replicas;
+  }
+
   @SuppressWarnings({"unchecked"})
   public static List<String> getShards(String zkHost,
                                        String collection,
                                        StreamContext streamContext,
                                        SolrParams requestParams)
       throws IOException {
-    Map<String, List<String>> shardsMap = null;
-    List<String> shards = new ArrayList<>();
-
-    if(streamContext != null) {
-      shardsMap = (Map<String, List<String>>)streamContext.get("shards");
-    }
 
+    List<String> shards;
+    Map<String, List<String>> shardsMap = streamContext != null ? (Map<String, List<String>>)streamContext.get("shards") : null;
     if(shardsMap != null) {
       //Manual Sharding
       shards = shardsMap.get(collection);
-    } else {
-      //SolrCloud Sharding
-      SolrClientCache solrClientCache = (streamContext != null ? streamContext.getSolrClientCache() : null);
-      final SolrClientCache localSolrClientCache; // tracks any locally allocated cache that needs to be closed locally
-      if (solrClientCache == null) { // streamContext was null OR streamContext.getSolrClientCache() returned null
-        solrClientCache = localSolrClientCache = new SolrClientCache();
-      } else {
-        localSolrClientCache = null;
-      }
-      CloudSolrClient cloudSolrClient = solrClientCache.getCloudSolrClient(zkHost);
-      ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
-      ClusterState clusterState = zkStateReader.getClusterState();
-      Slice[] slices = CloudSolrStream.getSlices(collection, zkStateReader, true);
-      Set<String> liveNodes = clusterState.getLiveNodes();
-
-
-      RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator;
-      final ModifiableSolrParams solrParams;
-      if (streamContext != null) {
-        solrParams = new ModifiableSolrParams(streamContext.getRequestParams());
-        requestReplicaListTransformerGenerator = streamContext.getRequestReplicaListTransformerGenerator();
-      } else {
-        solrParams = new ModifiableSolrParams();
-        requestReplicaListTransformerGenerator = null;
-      }
-      if (requestReplicaListTransformerGenerator == null) {
-        requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator();
-      }
-      solrParams.add(requestParams);
-
-      ReplicaListTransformer replicaListTransformer = requestReplicaListTransformerGenerator.getReplicaListTransformer(solrParams);
-
-      for(Slice slice : slices) {
-        List<Replica> sortedReplicas = new ArrayList<>();
-        for(Replica replica : slice.getReplicas()) {
-          if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
-            sortedReplicas.add(replica);
-          }
-        }
-
-        replicaListTransformer.transform(sortedReplicas);
-        if (sortedReplicas.size() > 0) {
-          shards.add(sortedReplicas.get(0).getCoreUrl());
-        }
-      }
-      if (localSolrClientCache != null) {
-        localSolrClientCache.close();
-      }
-    }
-    if (streamContext != null) {
-      Object core = streamContext.get("core");
-      if (streamContext.isLocal() && core != null) {
+      final Object core = streamContext.isLocal() ? streamContext.get("core") : null;
+      if (core != null) {
         shards.removeIf(shardUrl -> !shardUrl.contains((CharSequence) core));
       }
+    } else {
+      shards = getReplicas(zkHost, collection, streamContext, requestParams).stream()
+          .map(Replica::getCoreUrl).collect(Collectors.toList());
     }
 
     return shards;
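
The new getReplicas() centralizes replica selection: for each slice it gathers
the active replicas, lets the ReplicaListTransformer order them (honoring
routing preferences such as shards.preference), and keeps the first replica
that passes the optional local core filter. A hedged restatement of that loop:

    for (Slice slice : slices) {
      List<Replica> candidates = slice.getReplicas().stream()
          .filter(r -> r.isActive(liveNodes))
          .collect(Collectors.toList());
      replicaListTransformer.transform(candidates); // routing preference decides order
      candidates.stream()
          .filter(r -> coreFilter == null || coreFilter.equals(r.core))
          .findFirst()
          .ifPresent(replicas::add);                // one replica per slice
    }
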
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
index b4e0704..a8e53cd 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -38,6 +39,7 @@ import org.apache.solr.client.solrj.io.comp.FieldComparator;
 import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
 import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
 import org.apache.solr.client.solrj.io.ops.GroupOperation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
 import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
@@ -52,6 +54,7 @@ import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerat
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.ShardParams;
@@ -128,6 +131,7 @@ public static void configureCluster() throws Exception {
   if (useAlias) {
     CollectionAdminRequest.createAlias(MULTI_REPLICA_COLLECTIONORALIAS, collection).process(cluster.getSolrClient());
   }
+  streamFactory.withCollectionZkHost(MULTI_REPLICA_COLLECTIONORALIAS, zkHost);
 }
 
 private static final String id = "id";
@@ -2723,4 +2727,60 @@ public void testParallelRankStream() throws Exception {
     }
   }
 
+  @Test
+  public void testCloudStreamClientCache() throws Exception {
+
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    solrClientCache.getCloudSolrClient(zkHost);
+    streamContext.setSolrClientCache(solrClientCache);
+
+    String expr = "search(" + MULTI_REPLICA_COLLECTIONORALIAS + ",q=*:*,fl=\"a_i\", qt=\"/export\", sort=\"a_i asc\")";
+    try (CloudSolrStream stream = new CloudSolrStream(StreamExpressionParser.parse(expr), streamFactory)) {
+      stream.setStreamContext(streamContext);
+      stream.open();
+      Tuple t = stream.read();
+      while (!t.EOF) {
+        // no-op ... just want to iterate over the tuples
+        t = stream.read();
+      }
+
+      List<String> baseUrls = new LinkedList<>();
+      ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
+      List<String> resolved = zkStateReader.aliasesManager.getAliases().resolveAliases(MULTI_REPLICA_COLLECTIONORALIAS);
+      Set<String> liveNodes = zkStateReader.getClusterState().getLiveNodes();
+      int expectedNumStreams = 0;
+      for (String coll : resolved) {
+        DocCollection dcoll = zkStateReader.getCollection(coll);
+        for (Slice slice : dcoll.getSlices()) {
+          ++expectedNumStreams; // one Stream per slice
+          for (Replica r : slice.getReplicas()) {
+            if (r.isActive(liveNodes)) {
+              baseUrls.add(r.getBaseUrl());
+            }
+          }
+        }
+      }
+      List<TupleStream> solrStreams = stream.children();
+      assertEquals(expectedNumStreams, solrStreams.size());
+      for (TupleStream next : solrStreams) {
+        SolrStream ss = (SolrStream)next;
+        assertTrue(baseUrls.contains(ss.getBaseUrl())); // SolrStream uses the baseUrl of the replica and not the coreUrl
+      }
+
+      // verify core filtering
+      streamContext.setLocal(true);
+
+      for (String coll : resolved) {
+        Replica rr = zkStateReader.getCollection(coll).getReplicas().get(0);
+        streamContext.put("core", rr.core);
+        List<Replica> replicas = TupleStream.getReplicas(zkHost, coll, streamContext, new ModifiableSolrParams());
+        assertEquals("core filter for " + rr.core + " not applied for " + coll + "; replicas: " + replicas, 1, replicas.size());
+        assertEquals("core filter for " + rr.core + " not applied for " + coll, rr, replicas.get(0));
+      }
+
+    } finally {
+      solrClientCache.close();
+    }
+  }
 }