You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by st...@apache.org on 2023/08/21 12:45:31 UTC

[solr] branch main updated: SOLR-16927 Allow SolrClientCache clients to use Jetty HTTP2 clients (#1835)

This is an automated email from the ASF dual-hosted git repository.

stillalex pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new f6ef54a757c SOLR-16927 Allow SolrClientCache clients to use Jetty HTTP2 clients (#1835)
f6ef54a757c is described below

commit f6ef54a757cab2e3f4ee9785e234bff0bc5a22b0
Author: Alex D <st...@apache.org>
AuthorDate: Mon Aug 21 05:45:24 2023 -0700

    SOLR-16927 Allow SolrClientCache clients to use Jetty HTTP2 clients (#1835)
---
 solr/CHANGES.txt                                   |   2 +
 solr/benchmark/build.gradle                        |   3 +-
 .../org/apache/solr/bench/MiniClusterState.java    |   3 +
 .../apache/solr/bench/search/StreamingSearch.java  | 154 +++++++++++++++++++++
 .../solr/client/solrj/io/SolrClientCache.java      | 153 ++++++++++++--------
 .../client/solrj/io/stream/CloudSolrStream.java    |   1 -
 .../solr/client/solrj/io/stream/CommitStream.java  |  12 +-
 .../client/solrj/io/stream/DeepRandomStream.java   |   2 -
 .../solr/client/solrj/io/stream/DeleteStream.java  |   3 +-
 .../solr/client/solrj/io/stream/Facet2DStream.java |  30 ++--
 .../solr/client/solrj/io/stream/FacetStream.java   |  29 ++--
 .../solrj/io/stream/FeaturesSelectionStream.java   |  63 +++++----
 .../solr/client/solrj/io/stream/HavingStream.java  |   4 +-
 .../solr/client/solrj/io/stream/KnnStream.java     |  23 ++-
 .../solr/client/solrj/io/stream/LetStream.java     |   2 +-
 .../solr/client/solrj/io/stream/ModelStream.java   |   3 -
 .../solr/client/solrj/io/stream/RandomStream.java  |  29 ++--
 .../client/solrj/io/stream/ScoreNodesStream.java   |  14 +-
 .../solr/client/solrj/io/stream/SearchStream.java  |  24 ++--
 .../solrj/io/stream/SignificantTermsStream.java    |  54 +++++---
 .../solr/client/solrj/io/stream/SolrStream.java    |  42 +++---
 .../solr/client/solrj/io/stream/SqlStream.java     |   2 -
 .../solr/client/solrj/io/stream/StatsStream.java   |  28 ++--
 .../solr/client/solrj/io/stream/StreamContext.java |   3 +-
 .../client/solrj/io/stream/TextLogitStream.java    |  44 +++---
 .../client/solrj/io/stream/TimeSeriesStream.java   |  23 ++-
 .../solr/client/solrj/io/stream/TopicStream.java   | 103 +++++++-------
 .../solr/client/solrj/io/stream/UpdateStream.java  |  41 ++----
 .../client/solrj/impl/CloudHttp2SolrClient.java    |   8 ++
 .../solr/client/solrj/impl/Http2SolrClient.java    |  25 +++-
 .../solr/client/solrj/impl/HttpSolrClient.java     |   1 +
 .../solr/client/solrj/impl/SolrClientBuilder.java  |   8 ++
 32 files changed, 596 insertions(+), 340 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index c3985d65a8b..e09b1849394 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -101,6 +101,8 @@ Improvements
 
 * SOLR-16940: Users can pass Java system properties to the SolrCLI via the SOLR_TOOL_OPTS environment variable. (Houston Putman)
 
+* SOLR-16927: Allow SolrClientCache clients to use Jetty HTTP2 clients (Alex Deparvu, David Smiley)
+
 Optimizations
 ---------------------
 
diff --git a/solr/benchmark/build.gradle b/solr/benchmark/build.gradle
index be99ea3ee03..63cea7af01f 100644
--- a/solr/benchmark/build.gradle
+++ b/solr/benchmark/build.gradle
@@ -46,9 +46,10 @@ task echoCp {
 dependencies {
   implementation project(':solr:test-framework')
   implementation project(':solr:solrj')
+  implementation project(':solr:solrj-streaming')
 
   implementation 'org.apache.lucene:lucene-core'
-
+  implementation 'org.apache.httpcomponents:httpclient'
   implementation 'commons-io:commons-io'
   implementation 'io.dropwizard.metrics:metrics-core'
   implementation 'org.apache.commons:commons-math3'
diff --git a/solr/benchmark/src/java/org/apache/solr/bench/MiniClusterState.java b/solr/benchmark/src/java/org/apache/solr/bench/MiniClusterState.java
index a35dd07bbcc..dd1c0afe639 100755
--- a/solr/benchmark/src/java/org/apache/solr/bench/MiniClusterState.java
+++ b/solr/benchmark/src/java/org/apache/solr/bench/MiniClusterState.java
@@ -86,6 +86,8 @@ public class MiniClusterState {
     /** The Nodes. */
     public List<String> nodes;
 
+    public String zkHost;
+
     /** The Cluster. */
     MiniSolrCloudCluster cluster;
 
@@ -277,6 +279,7 @@ public class MiniClusterState {
       for (JettySolrRunner runner : jetties) {
         nodes.add(runner.getBaseUrl().toString());
       }
+      zkHost = cluster.getZkServer().getZkAddress();
 
       client = new Http2SolrClient.Builder().useHttp1_1(useHttp1).build();
 
diff --git a/solr/benchmark/src/java/org/apache/solr/bench/search/StreamingSearch.java b/solr/benchmark/src/java/org/apache/solr/bench/search/StreamingSearch.java
new file mode 100644
index 00000000000..14046644c46
--- /dev/null
+++ b/solr/benchmark/src/java/org/apache/solr/bench/search/StreamingSearch.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.bench.search;
+
+import static org.apache.solr.bench.Docs.docs;
+import static org.apache.solr.bench.generators.SourceDSL.integers;
+import static org.apache.solr.bench.generators.SourceDSL.strings;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.solr.bench.Docs;
+import org.apache.solr.bench.MiniClusterState;
+import org.apache.solr.bench.MiniClusterState.MiniClusterBenchState;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.Http2SolrClient;
+import org.apache.solr.client.solrj.impl.HttpClientUtil;
+import org.apache.solr.client.solrj.io.SolrClientCache;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
+import org.apache.solr.client.solrj.io.stream.StreamContext;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+@Fork(value = 1)
+@BenchmarkMode(Mode.Throughput)
+@Warmup(time = 5, iterations = 1)
+@Measurement(time = 30, iterations = 4)
+@Threads(value = 1)
+public class StreamingSearch {
+
+  private static final String collection = "benchStreamingSearch";
+
+  @State(Scope.Benchmark)
+  public static class BenchState {
+
+    @Param({"false", "true"})
+    boolean useHttp1;
+
+    private int docs = 1000;
+    private String zkHost;
+    private ModifiableSolrParams params;
+    private StreamContext streamContext;
+    private Http2SolrClient http2SolrClient;
+
+    @Setup(Level.Trial)
+    public void setup(MiniClusterBenchState miniClusterState) throws Exception {
+
+      miniClusterState.startMiniCluster(3);
+      miniClusterState.createCollection(collection, 3, 1);
+      Docs docGen =
+          docs()
+              .field("id", integers().incrementing())
+              .field("text2_ts", strings().basicLatinAlphabet().multi(312).ofLengthBetween(30, 64))
+              .field("text3_ts", strings().basicLatinAlphabet().multi(312).ofLengthBetween(30, 64))
+              .field("int1_i_dv", integers().all());
+      miniClusterState.index(collection, docGen, docs);
+      miniClusterState.waitForMerges(collection);
+
+      zkHost = miniClusterState.zkHost;
+
+      params = new ModifiableSolrParams();
+      params.set(CommonParams.Q, "*:*");
+      params.set(CommonParams.FL, "id,text2_ts,text3_ts,int1_i_dv");
+      params.set(CommonParams.SORT, "id asc,int1_i_dv asc");
+      params.set(CommonParams.ROWS, docs);
+    }
+
+    @Setup(Level.Iteration)
+    public void setupIteration(MiniClusterState.MiniClusterBenchState miniClusterState)
+        throws SolrServerException, IOException {
+      SolrClientCache solrClientCache;
+      if (useHttp1) {
+        var httpClient = HttpClientUtil.createClient(null); // TODO tune params?
+        solrClientCache = new SolrClientCache(httpClient);
+      } else {
+        http2SolrClient = newHttp2SolrClient();
+        solrClientCache = new SolrClientCache(http2SolrClient);
+      }
+
+      streamContext = new StreamContext();
+      streamContext.setSolrClientCache(solrClientCache);
+    }
+
+    @TearDown(Level.Iteration)
+    public void teardownIt() {
+      streamContext.getSolrClientCache().close();
+      if (http2SolrClient != null) {
+        http2SolrClient.close();
+      }
+    }
+  }
+
+  @Benchmark
+  public Object stream(
+      BenchState benchState, MiniClusterState.MiniClusterBenchState miniClusterState)
+      throws SolrServerException, IOException {
+    CloudSolrStream stream = new CloudSolrStream(benchState.zkHost, collection, benchState.params);
+    stream.setStreamContext(benchState.streamContext);
+    return getTuples(stream);
+  }
+
+  private static List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
+    List<Tuple> tuples = new ArrayList<>();
+    try {
+      tupleStream.open();
+      while (true) {
+        Tuple t = tupleStream.read();
+        if (t.EOF) {
+          break;
+        } else {
+          tuples.add(t);
+        }
+      }
+      return tuples;
+    } finally {
+      tupleStream.close();
+    }
+  }
+
+  public static Http2SolrClient newHttp2SolrClient() {
+    // TODO tune params?
+    var builder = new Http2SolrClient.Builder();
+    return builder.build();
+  }
+}
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
index ff2963c89fb..02621784d00 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
@@ -16,10 +16,9 @@
  */
 package org.apache.solr.client.solrj.io;
 
+import java.io.Closeable;
 import java.io.IOException;
-import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -28,97 +27,137 @@ import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import org.apache.http.client.HttpClient;
 import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
 import org.apache.solr.client.solrj.impl.CloudLegacySolrClient;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.Http2SolrClient;
 import org.apache.solr.client.solrj.impl.HttpClientUtil;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.impl.SolrClientBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * The SolrClientCache caches SolrClients so they can be reused by different TupleStreams.
- *
- * <p>TODO: Cut this over to using Solr's new Http2 clients
- */
-public class SolrClientCache implements Serializable {
+/** The SolrClientCache caches SolrClients so they can be reused by different TupleStreams. */
+public class SolrClientCache implements Closeable {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private final Map<String, SolrClient> solrClients = new HashMap<>();
-  private final HttpClient httpClient;
   // Set the floor for timeouts to 60 seconds.
   // Timeouts cans be increased by setting the system properties defined below.
-  private static final int conTimeout =
+  private static final int MIN_TIMEOUT = 60000;
+  private static final int minConnTimeout =
       Math.max(
-          Integer.parseInt(System.getProperty(HttpClientUtil.PROP_CONNECTION_TIMEOUT, "60000")),
-          60000);
-  private static final int socketTimeout =
-      Math.max(
-          Integer.parseInt(System.getProperty(HttpClientUtil.PROP_SO_TIMEOUT, "60000")), 60000);
+          Integer.getInteger(HttpClientUtil.PROP_CONNECTION_TIMEOUT, MIN_TIMEOUT), MIN_TIMEOUT);
+  private static final int minSocketTimeout =
+      Math.max(Integer.getInteger(HttpClientUtil.PROP_SO_TIMEOUT, MIN_TIMEOUT), MIN_TIMEOUT);
+
+  private final Map<String, SolrClient> solrClients = new HashMap<>();
+  private final HttpClient apacheHttpClient;
+  private final Http2SolrClient http2SolrClient;
 
   public SolrClientCache() {
-    httpClient = null;
+    this.apacheHttpClient = null;
+    this.http2SolrClient = null;
   }
 
   @Deprecated(since = "9.0")
-  public SolrClientCache(HttpClient httpClient) {
-    this.httpClient = httpClient;
+  public SolrClientCache(HttpClient apacheHttpClient) {
+    this.apacheHttpClient = apacheHttpClient;
+    this.http2SolrClient = null;
   }
 
-  @Deprecated(since = "9.0")
-  public synchronized CloudSolrClient getCloudSolrClient(String zkHost) {
-
-    // Timeouts should never be lower then 60000 but they can be set higher
-    assert (conTimeout >= 60000);
-    assert (socketTimeout >= 60000);
-
-    if (log.isDebugEnabled()) {
-      log.debug("SolrClientCache.conTimeout: {}", conTimeout);
-      log.debug("SolrClientCache.socketTimeout: {}", socketTimeout);
-    }
+  public SolrClientCache(Http2SolrClient http2SolrClient) {
+    this.apacheHttpClient = null;
+    this.http2SolrClient = http2SolrClient;
+  }
 
+  public synchronized CloudSolrClient getCloudSolrClient(String zkHost) {
     Objects.requireNonNull(zkHost, "ZooKeeper host cannot be null!");
-    CloudSolrClient client;
     if (solrClients.containsKey(zkHost)) {
-      client = (CloudSolrClient) solrClients.get(zkHost);
+      return (CloudSolrClient) solrClients.get(zkHost);
+    }
+    final CloudSolrClient client;
+    if (apacheHttpClient != null) {
+      client = newCloudLegacySolrClient(zkHost, apacheHttpClient);
     } else {
-      final List<String> hosts = new ArrayList<>();
-      hosts.add(zkHost);
-      var builder =
-          new CloudLegacySolrClient.Builder(hosts, Optional.empty())
-              .withSocketTimeout(socketTimeout, TimeUnit.MILLISECONDS)
-              .withConnectionTimeout(conTimeout, TimeUnit.MILLISECONDS);
-      if (httpClient != null) {
-        builder = builder.withHttpClient(httpClient);
-      }
-
-      client = builder.build();
-      client.connect();
-      solrClients.put(zkHost, client);
+      client = newCloudHttp2SolrClient(zkHost, http2SolrClient);
     }
+    solrClients.put(zkHost, client);
+    return client;
+  }
 
+  @Deprecated
+  private static CloudSolrClient newCloudLegacySolrClient(String zkHost, HttpClient httpClient) {
+    final List<String> hosts = List.of(zkHost);
+    var builder = new CloudLegacySolrClient.Builder(hosts, Optional.empty());
+    adjustTimeouts(builder, httpClient);
+    var client = builder.build();
+    client.connect();
+    return client;
+  }
+
+  private static CloudHttp2SolrClient newCloudHttp2SolrClient(
+      String zkHost, Http2SolrClient http2SolrClient) {
+    final List<String> hosts = List.of(zkHost);
+    var builder = new CloudHttp2SolrClient.Builder(hosts, Optional.empty());
+    // using internal builder to ensure the internal client gets closed
+    builder = builder.withInternalClientBuilder(newHttp2SolrClientBuilder(null, http2SolrClient));
+    var client = builder.build();
+    client.connect();
     return client;
   }
 
-  @Deprecated(since = "9.0")
   public synchronized SolrClient getHttpSolrClient(String baseUrl) {
-    SolrClient client;
+    Objects.requireNonNull(baseUrl, "Url cannot be null!");
     if (solrClients.containsKey(baseUrl)) {
-      client = solrClients.get(baseUrl);
+      return solrClients.get(baseUrl);
+    }
+    final SolrClient client;
+    if (apacheHttpClient != null) {
+      client = newHttpSolrClient(baseUrl, apacheHttpClient);
     } else {
-      HttpSolrClient.Builder builder =
-          new HttpSolrClient.Builder(baseUrl)
-              .withSocketTimeout(socketTimeout, TimeUnit.MILLISECONDS)
-              .withConnectionTimeout(conTimeout, TimeUnit.MILLISECONDS);
-      if (httpClient != null) {
-        builder = builder.withHttpClient(httpClient);
-      }
-      client = builder.build();
-      solrClients.put(baseUrl, client);
+      client = newHttp2SolrClientBuilder(baseUrl, http2SolrClient).build();
     }
+    solrClients.put(baseUrl, client);
     return client;
   }
 
+  @Deprecated
+  private static SolrClient newHttpSolrClient(String baseUrl, HttpClient httpClient) {
+    HttpSolrClient.Builder builder = new HttpSolrClient.Builder(baseUrl);
+    adjustTimeouts(builder, httpClient);
+    return builder.build();
+  }
+
+  @Deprecated
+  private static void adjustTimeouts(SolrClientBuilder<?> builder, HttpClient httpClient) {
+    builder.withHttpClient(httpClient);
+    int socketTimeout = Math.max(minSocketTimeout, builder.getSocketTimeoutMillis());
+    builder.withSocketTimeout(socketTimeout, TimeUnit.MILLISECONDS);
+    int connTimeout = Math.max(minConnTimeout, builder.getConnectionTimeoutMillis());
+    builder.withConnectionTimeout(connTimeout, TimeUnit.MILLISECONDS);
+  }
+
+  private static Http2SolrClient.Builder newHttp2SolrClientBuilder(
+      String baseUrl, Http2SolrClient http2SolrClient) {
+    var builder = new Http2SolrClient.Builder(baseUrl);
+    if (http2SolrClient != null) {
+      builder = builder.withHttpClient(http2SolrClient);
+    }
+    long idleTimeout = minSocketTimeout;
+    if (builder.getIdleTimeoutMillis() != null) {
+      idleTimeout = Math.max(idleTimeout, builder.getIdleTimeoutMillis());
+    }
+    builder.withIdleTimeout(idleTimeout, TimeUnit.MILLISECONDS);
+    long connTimeout = minConnTimeout;
+    if (builder.getConnectionTimeout() != null) {
+      connTimeout = Math.max(connTimeout, builder.getConnectionTimeout());
+    }
+    builder.withConnectionTimeout(connTimeout, TimeUnit.MILLISECONDS);
+    return builder;
+  }
+
+  @Override
   public synchronized void close() {
     for (Map.Entry<String, SolrClient> entry : solrClients.entrySet()) {
       try {
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
index 6cef1b2c9b6..8b2bd54aef7 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
@@ -75,7 +75,6 @@ public class CloudSolrStream extends TupleStream implements Expressible {
   protected StreamComparator comp;
   private boolean trace;
   protected transient Map<String, Tuple> eofTuples;
-  protected transient CloudSolrClient cloudSolrClient;
   protected transient List<TupleStream> solrStreams;
   protected transient TreeSet<TupleWrapper> tuples;
   protected transient StreamContext streamContext;
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java
index 1121ab12421..79f8abfab23 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java
@@ -55,6 +55,7 @@ public class CommitStream extends TupleStream implements Expressible {
   private TupleStream tupleSource;
 
   private transient SolrClientCache clientCache;
+  private transient boolean doCloseCache;
   private long docsSinceCommit;
 
   public CommitStream(StreamExpression expression, StreamFactory factory) throws IOException {
@@ -150,7 +151,12 @@ public class CommitStream extends TupleStream implements Expressible {
   @Override
   public void open() throws IOException {
     tupleSource.open();
-    clientCache = new SolrClientCache();
+    if (clientCache == null) {
+      doCloseCache = true;
+      clientCache = new SolrClientCache();
+    } else {
+      doCloseCache = false;
+    }
     docsSinceCommit = 0;
   }
 
@@ -193,7 +199,9 @@ public class CommitStream extends TupleStream implements Expressible {
 
   @Override
   public void close() throws IOException {
-    clientCache.close();
+    if (doCloseCache) {
+      clientCache.close();
+    }
     tupleSource.close();
   }
 
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java
index 02bb701d5d3..2e32dc26f09 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java
@@ -37,7 +37,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
 import org.apache.solr.client.solrj.io.stream.expr.Explanation;
@@ -71,7 +70,6 @@ public class DeepRandomStream extends TupleStream implements Expressible {
   protected StreamComparator comp;
   private boolean trace;
   protected transient Map<String, Tuple> eofTuples;
-  protected transient CloudSolrClient cloudSolrClient;
   protected transient List<TupleStream> solrStreams;
   protected transient Deque<TupleWrapper> tuples;
   protected transient StreamContext streamContext;
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/DeleteStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/DeleteStream.java
index e78941c80a8..d00908792ed 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/DeleteStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/DeleteStream.java
@@ -79,7 +79,8 @@ public final class DeleteStream extends UpdateStream implements Expressible {
         final Long version = getVersion(doc);
         req.deleteById(id, version);
       }
-      req.process(getCloudSolrClient(), getCollectionName());
+      var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
+      req.process(cloudSolrClient, getCollectionName());
     } catch (SolrServerException | NumberFormatException | IOException e) {
       log.warn("Unable to delete documents from collection due to unexpected error.", e);
       String className = e.getClass().getName();
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/Facet2DStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/Facet2DStream.java
index c362257387c..878dee75af4 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/Facet2DStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/Facet2DStream.java
@@ -24,12 +24,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.impl.CloudLegacySolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
@@ -68,8 +64,8 @@ public class Facet2DStream extends TupleStream implements Expressible {
   private int dimensionY;
   private FieldComparator bucketSort;
 
-  protected transient SolrClientCache cache;
-  protected transient CloudSolrClient cloudSolrClient;
+  protected transient SolrClientCache clientCache;
+  private transient boolean doCloseCache;
 
   public Facet2DStream(
       String zkHost,
@@ -312,7 +308,7 @@ public class Facet2DStream extends TupleStream implements Expressible {
 
   @Override
   public void setStreamContext(StreamContext context) {
-    cache = context.getSolrClientCache();
+    clientCache = context.getSolrClientCache();
   }
 
   @Override
@@ -322,16 +318,11 @@ public class Facet2DStream extends TupleStream implements Expressible {
 
   @Override
   public void open() throws IOException {
-    if (cache != null) {
-      cloudSolrClient = cache.getCloudSolrClient(zkHost);
+    if (clientCache == null) {
+      doCloseCache = true;
+      clientCache = new SolrClientCache();
     } else {
-      final List<String> hosts = new ArrayList<>();
-      hosts.add(zkHost);
-      cloudSolrClient =
-          new CloudLegacySolrClient.Builder(hosts, Optional.empty())
-              .withSocketTimeout(30000, TimeUnit.MILLISECONDS)
-              .withConnectionTimeout(15000, TimeUnit.MILLISECONDS)
-              .build();
+      doCloseCache = false;
     }
     FieldComparator[] adjustedSorts = adjustSorts(x, y, bucketSort);
 
@@ -344,6 +335,7 @@ public class Facet2DStream extends TupleStream implements Expressible {
 
     QueryRequest request = new QueryRequest(paramsLoc, SolrRequest.METHOD.POST);
     try {
+      var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
       NamedList<Object> response = cloudSolrClient.request(request, collection);
       getTuples(response, x, y, metric);
       this.out = tuples.iterator();
@@ -366,10 +358,8 @@ public class Facet2DStream extends TupleStream implements Expressible {
 
   @Override
   public void close() throws IOException {
-    if (cache == null) {
-      if (cloudSolrClient != null) {
-        cloudSolrClient.close();
-      }
+    if (doCloseCache) {
+      clientCache.close();
     }
   }
 
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
index 3b0b785cbc9..759c038d7a0 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
@@ -24,11 +24,8 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.impl.CloudLegacySolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.ClusterStateProvider;
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
@@ -88,8 +85,8 @@ public class FacetStream extends TupleStream implements Expressible, ParallelMet
   private boolean resortNeeded;
   private boolean serializeBucketSizeLimit;
 
-  protected transient SolrClientCache cache;
-  protected transient CloudSolrClient cloudSolrClient;
+  protected transient SolrClientCache clientCache;
+  private transient boolean doCloseCache;
   protected transient TupleStream parallelizedStream;
   protected transient StreamContext context;
 
@@ -647,7 +644,7 @@ public class FacetStream extends TupleStream implements Expressible, ParallelMet
   @Override
   public void setStreamContext(StreamContext context) {
     this.context = context;
-    cache = context.getSolrClientCache();
+    this.clientCache = context.getSolrClientCache();
   }
 
   @Override
@@ -657,17 +654,13 @@ public class FacetStream extends TupleStream implements Expressible, ParallelMet
 
   @Override
   public void open() throws IOException {
-    if (cache != null) {
-      cloudSolrClient = cache.getCloudSolrClient(zkHost);
+    if (clientCache == null) {
+      doCloseCache = true;
+      clientCache = new SolrClientCache();
     } else {
-      final List<String> hosts = new ArrayList<>();
-      hosts.add(zkHost);
-      cloudSolrClient =
-          new CloudLegacySolrClient.Builder(hosts, Optional.empty())
-              .withSocketTimeout(30000, TimeUnit.MILLISECONDS)
-              .withConnectionTimeout(15000, TimeUnit.MILLISECONDS)
-              .build();
+      doCloseCache = false;
     }
+    var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
 
     // Parallelize the facet expression across multiple collections for an alias using plist if
     // possible
@@ -761,10 +754,8 @@ public class FacetStream extends TupleStream implements Expressible, ParallelMet
 
   @Override
   public void close() throws IOException {
-    if (cache == null) {
-      if (cloudSolrClient != null) {
-        cloudSolrClient.close();
-      }
+    if (doCloseCache) {
+      clientCache.close();
     }
   }
 
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
index d4990800cd4..3a761ceaee4 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
@@ -37,7 +37,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.stream.Stream;
 import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
@@ -76,12 +75,9 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
   protected int positiveLabel;
   protected int numTerms;
 
-  protected transient SolrClientCache cache;
-  protected transient boolean isCloseCache;
-  protected transient CloudSolrClient cloudSolrClient;
-
-  protected transient StreamContext streamContext;
-  protected ExecutorService executorService;
+  protected transient SolrClientCache clientCache;
+  private transient boolean doCloseCache;
+  protected transient ExecutorService executorService;
 
   public FeaturesSelectionStream(
       String zkHost,
@@ -254,21 +250,19 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
 
   @Override
   public void setStreamContext(StreamContext context) {
-    this.cache = context.getSolrClientCache();
-    this.streamContext = context;
+    this.clientCache = context.getSolrClientCache();
   }
 
   /** Opens the CloudSolrStream */
   @Override
   public void open() throws IOException {
-    if (cache == null) {
-      isCloseCache = true;
-      cache = new SolrClientCache();
+    if (clientCache == null) {
+      doCloseCache = true;
+      clientCache = new SolrClientCache();
     } else {
-      isCloseCache = false;
+      doCloseCache = false;
     }
 
-    this.cloudSolrClient = this.cache.getCloudSolrClient(zkHost);
     this.executorService =
         ExecutorUtil.newMDCAwareCachedThreadPool(
             new SolrNamedThreadFactory("FeaturesSelectionStream"));
@@ -281,6 +275,7 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
 
   private List<String> getShardUrls() throws IOException {
     try {
+      var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
       Slice[] slices = CloudSolrStream.getSlices(this.collection, cloudSolrClient, false);
       Set<String> liveNodes = cloudSolrClient.getClusterState().getLiveNodes();
 
@@ -314,7 +309,14 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
     List<Future<NamedList<?>>> futures = new ArrayList<>();
     for (String baseUrl : baseUrls) {
       FeaturesSelectionCall lc =
-          new FeaturesSelectionCall(baseUrl, this.params, this.field, this.outcome);
+          new FeaturesSelectionCall(
+              baseUrl,
+              this.params,
+              this.field,
+              this.outcome,
+              this.positiveLabel,
+              this.numTerms,
+              this.clientCache);
 
       Future<NamedList<?>> future = executorService.submit(lc);
       futures.add(future);
@@ -325,8 +327,8 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
 
   @Override
   public void close() throws IOException {
-    if (isCloseCache && cache != null) {
-      cache.close();
+    if (doCloseCache) {
+      clientCache.close();
     }
 
     if (executorService != null) {
@@ -417,26 +419,37 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
     return result;
   }
 
-  protected class FeaturesSelectionCall implements Callable<NamedList<?>> {
+  protected static class FeaturesSelectionCall implements Callable<NamedList<?>> {
 
-    private String baseUrl;
-    private String outcome;
-    private String field;
-    private Map<String, String> paramsMap;
+    private final String baseUrl;
+    private final String outcome;
+    private final String field;
+    private final Map<String, String> paramsMap;
+    private final int positiveLabel;
+    private final int numTerms;
+    private final SolrClientCache clientCache;
 
     public FeaturesSelectionCall(
-        String baseUrl, Map<String, String> paramsMap, String field, String outcome) {
-
+        String baseUrl,
+        Map<String, String> paramsMap,
+        String field,
+        String outcome,
+        int positiveLabel,
+        int numTerms,
+        SolrClientCache clientCache) {
       this.baseUrl = baseUrl;
       this.outcome = outcome;
       this.field = field;
       this.paramsMap = paramsMap;
+      this.positiveLabel = positiveLabel;
+      this.numTerms = numTerms;
+      this.clientCache = clientCache;
     }
 
     @Override
     public NamedList<?> call() throws Exception {
       ModifiableSolrParams params = new ModifiableSolrParams();
-      SolrClient solrClient = cache.getHttpSolrClient(baseUrl);
+      SolrClient solrClient = clientCache.getHttpSolrClient(baseUrl);
 
       params.add(DISTRIB, "false");
       params.add("fq", "{!igain}");
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/HavingStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/HavingStream.java
index e151aaa19f7..988a3dbc73e 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/HavingStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/HavingStream.java
@@ -44,9 +44,7 @@ public class HavingStream extends TupleStream implements Expressible {
 
   private TupleStream stream;
   private RecursiveBooleanEvaluator evaluator;
-  private StreamContext streamContext;
-
-  private transient Tuple currentGroupHead;
+  private transient StreamContext streamContext;
 
   public HavingStream(TupleStream stream, RecursiveBooleanEvaluator evaluator) throws IOException {
     init(stream, evaluator);
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/KnnStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/KnnStream.java
index fb1d2ba6e63..41cd08d18bf 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/KnnStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/KnnStream.java
@@ -29,7 +29,6 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.stream.Collectors;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
@@ -60,8 +59,8 @@ public class KnnStream extends TupleStream implements Expressible {
   private String zkHost;
   private Map<String, String> props;
   private String collection;
-  protected transient SolrClientCache cache;
-  protected transient CloudSolrClient cloudSolrClient;
+  private transient SolrClientCache clientCache;
+  private transient boolean doCloseCache;
   private Iterator<SolrDocument> documentIterator;
   private String id;
 
@@ -193,7 +192,7 @@ public class KnnStream extends TupleStream implements Expressible {
 
   @Override
   public void setStreamContext(StreamContext context) {
-    cache = context.getSolrClientCache();
+    clientCache = context.getSolrClientCache();
   }
 
   @Override
@@ -204,7 +203,13 @@ public class KnnStream extends TupleStream implements Expressible {
 
   @Override
   public void open() throws IOException {
-    cloudSolrClient = cache.getCloudSolrClient(zkHost);
+    if (clientCache == null) {
+      doCloseCache = true;
+      clientCache = new SolrClientCache();
+    } else {
+      doCloseCache = false;
+    }
+
     ModifiableSolrParams params = getParams(this.props);
 
     StringBuilder builder = new StringBuilder();
@@ -227,7 +232,7 @@ public class KnnStream extends TupleStream implements Expressible {
 
     QueryRequest request = new QueryRequest(params);
     try {
-      QueryResponse response = request.process(cloudSolrClient, collection);
+      QueryResponse response = request.process(clientCache.getCloudSolrClient(zkHost), collection);
       SolrDocumentList docs = response.getResults();
       documentIterator = docs.iterator();
     } catch (Exception e) {
@@ -236,7 +241,11 @@ public class KnnStream extends TupleStream implements Expressible {
   }
 
   @Override
-  public void close() throws IOException {}
+  public void close() throws IOException {
+    if (doCloseCache) {
+      clientCache.close();
+    }
+  }
 
   @Override
   public Tuple read() throws IOException {
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java
index e5a6c4113ab..9576cf9658e 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java
@@ -44,7 +44,7 @@ public class LetStream extends TupleStream implements Expressible {
 
   private static final long serialVersionUID = 1;
   private TupleStream stream;
-  private StreamContext streamContext;
+  private transient StreamContext streamContext;
   private Map<String, Object> letParams = new LinkedHashMap<>();
 
   public LetStream(StreamExpression expression, StreamFactory factory) throws IOException {
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ModelStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ModelStream.java
index 727df5560a5..096f4a55e9c 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ModelStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ModelStream.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import org.apache.solr.client.solrj.io.ModelCache;
-import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
 import org.apache.solr.client.solrj.io.stream.expr.Explanation;
@@ -53,7 +52,6 @@ public class ModelStream extends TupleStream implements Expressible {
   protected String collection;
   protected String modelID;
   protected ModelCache modelCache;
-  protected SolrClientCache solrClientCache;
   protected Tuple model;
   protected long cacheMillis;
 
@@ -162,7 +160,6 @@ public class ModelStream extends TupleStream implements Expressible {
 
   @Override
   public void setStreamContext(StreamContext context) {
-    this.solrClientCache = context.getSolrClientCache();
     this.modelCache = context.getModelCache();
   }
 
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
index aa0087bf8b0..c802fd870ee 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
@@ -27,13 +27,9 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Optional;
 import java.util.Random;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.impl.CloudLegacySolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
@@ -64,12 +60,13 @@ public class RandomStream extends TupleStream implements Expressible {
   private String zkHost;
   private Map<String, String> props;
   private String collection;
-  protected transient SolrClientCache cache;
-  protected transient CloudSolrClient cloudSolrClient;
   private Iterator<SolrDocument> documentIterator;
   private int x;
   private boolean outputX;
 
+  private transient SolrClientCache clientCache;
+  private transient boolean doCloseCache;
+
   public RandomStream() {
     // Used by the RandomFacade
   }
@@ -200,7 +197,7 @@ public class RandomStream extends TupleStream implements Expressible {
 
   @Override
   public void setStreamContext(StreamContext context) {
-    cache = context.getSolrClientCache();
+    clientCache = context.getSolrClientCache();
   }
 
   @Override
@@ -211,16 +208,11 @@ public class RandomStream extends TupleStream implements Expressible {
 
   @Override
   public void open() throws IOException {
-    if (cache != null) {
-      cloudSolrClient = cache.getCloudSolrClient(zkHost);
+    if (clientCache == null) {
+      doCloseCache = true;
+      clientCache = new SolrClientCache();
     } else {
-      final List<String> hosts = new ArrayList<>();
-      hosts.add(zkHost);
-      cloudSolrClient =
-          new CloudLegacySolrClient.Builder(hosts, Optional.empty())
-              .withSocketTimeout(30000, TimeUnit.MILLISECONDS)
-              .withConnectionTimeout(15000, TimeUnit.MILLISECONDS)
-              .build();
+      doCloseCache = false;
     }
 
     ModifiableSolrParams params = getParams(this.props);
@@ -235,6 +227,7 @@ public class RandomStream extends TupleStream implements Expressible {
 
     QueryRequest request = new QueryRequest(params, SolrRequest.METHOD.POST);
     try {
+      var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
       QueryResponse response = request.process(cloudSolrClient, collection);
       SolrDocumentList docs = response.getResults();
       documentIterator = docs.iterator();
@@ -245,8 +238,8 @@ public class RandomStream extends TupleStream implements Expressible {
 
   @Override
   public void close() throws IOException {
-    if (cache == null) {
-      cloudSolrClient.close();
+    if (doCloseCache) {
+      clientCache.close();
     }
   }
 
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ScoreNodesStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ScoreNodesStream.java
index ea10185f730..01be8e309c4 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ScoreNodesStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ScoreNodesStream.java
@@ -59,7 +59,6 @@ public class ScoreNodesStream extends TupleStream implements Expressible {
 
   protected String zkHost;
   private TupleStream stream;
-  private transient SolrClientCache clientCache;
   private Map<String, Tuple> nodes = new HashMap<>();
   private Iterator<Tuple> tuples;
   private String termFreq;
@@ -68,6 +67,9 @@ public class ScoreNodesStream extends TupleStream implements Expressible {
   private String bucket;
   private String facetCollection;
 
+  private transient SolrClientCache clientCache;
+  private transient boolean doCloseCache;
+
   public ScoreNodesStream(TupleStream tupleStream, String nodeFreqField) throws IOException {
     init(tupleStream, nodeFreqField);
   }
@@ -176,6 +178,13 @@ public class ScoreNodesStream extends TupleStream implements Expressible {
 
   @Override
   public void open() throws IOException {
+    if (clientCache == null) {
+      doCloseCache = true;
+      clientCache = new SolrClientCache();
+    } else {
+      doCloseCache = false;
+    }
+
     stream.open();
     Tuple node = null;
     StringBuilder builder = new StringBuilder();
@@ -261,6 +270,9 @@ public class ScoreNodesStream extends TupleStream implements Expressible {
 
   @Override
   public void close() throws IOException {
+    if (doCloseCache) {
+      clientCache.close();
+    }
     stream.close();
   }
 
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java
index 1e6226b7536..f1e6c33c2fb 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java
@@ -24,10 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map.Entry;
-import java.util.Optional;
 import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.impl.CloudLegacySolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
@@ -55,11 +52,12 @@ public class SearchStream extends TupleStream implements Expressible {
   private String zkHost;
   private ModifiableSolrParams params;
   private String collection;
-  protected transient SolrClientCache cache;
-  protected transient CloudSolrClient cloudSolrClient;
   private Iterator<SolrDocument> documentIterator;
   protected StreamComparator comp;
 
+  private transient SolrClientCache clientCache;
+  private transient boolean doCloseCache;
+
   public SearchStream() {}
 
   public SearchStream(StreamExpression expression, StreamFactory factory) throws IOException {
@@ -184,7 +182,7 @@ public class SearchStream extends TupleStream implements Expressible {
 
   @Override
   public void setStreamContext(StreamContext context) {
-    cache = context.getSolrClientCache();
+    clientCache = context.getSolrClientCache();
   }
 
   @Override
@@ -195,16 +193,16 @@ public class SearchStream extends TupleStream implements Expressible {
 
   @Override
   public void open() throws IOException {
-    if (cache != null) {
-      cloudSolrClient = cache.getCloudSolrClient(zkHost);
+    if (clientCache == null) {
+      doCloseCache = true;
+      clientCache = new SolrClientCache();
     } else {
-      final List<String> hosts = new ArrayList<>();
-      hosts.add(zkHost);
-      cloudSolrClient = new CloudLegacySolrClient.Builder(hosts, Optional.empty()).build();
+      doCloseCache = false;
     }
 
     QueryRequest request = new QueryRequest(params, SolrRequest.METHOD.POST);
     try {
+      var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
       QueryResponse response = request.process(cloudSolrClient, collection);
       SolrDocumentList docs = response.getResults();
       documentIterator = docs.iterator();
@@ -215,8 +213,8 @@ public class SearchStream extends TupleStream implements Expressible {
 
   @Override
   public void close() throws IOException {
-    if (cache == null) {
-      cloudSolrClient.close();
+    if (doCloseCache) {
+      clientCache.close();
     }
   }
 
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java
index 7e47003fa63..22d7c8bc73c 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java
@@ -68,10 +68,10 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
   protected float maxDocFreq;
   protected int minTermLength;
 
-  protected transient SolrClientCache cache;
-  protected transient boolean isCloseCache;
-  protected transient StreamContext streamContext;
-  protected ExecutorService executorService;
+  private transient SolrClientCache clientCache;
+  private transient boolean doCloseCache;
+  private transient StreamContext streamContext;
+  private transient ExecutorService executorService;
 
   public SignificantTermsStream(
       String zkHost,
@@ -239,17 +239,17 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
 
   @Override
   public void setStreamContext(StreamContext context) {
-    this.cache = context.getSolrClientCache();
+    this.clientCache = context.getSolrClientCache();
     this.streamContext = context;
   }
 
   @Override
   public void open() throws IOException {
-    if (cache == null) {
-      isCloseCache = true;
-      cache = new SolrClientCache();
+    if (clientCache == null) {
+      doCloseCache = true;
+      clientCache = new SolrClientCache();
     } else {
-      isCloseCache = false;
+      doCloseCache = false;
     }
 
     this.executorService =
@@ -274,7 +274,9 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
               this.minDocFreq,
               this.maxDocFreq,
               this.minTermLength,
-              this.numTerms);
+              this.numTerms,
+              streamContext.isLocal(),
+              clientCache);
 
       Future<NamedList<?>> future = executorService.submit(lc);
       futures.add(future);
@@ -285,8 +287,8 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
 
   @Override
   public void close() throws IOException {
-    if (isCloseCache) {
-      cache.close();
+    if (doCloseCache) {
+      clientCache.close();
     }
 
     executorService.shutdown();
@@ -387,15 +389,17 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
     }
   }
 
-  protected class SignificantTermsCall implements Callable<NamedList<?>> {
+  protected static class SignificantTermsCall implements Callable<NamedList<?>> {
 
-    private String baseUrl;
-    private String field;
-    private float minDocFreq;
-    private float maxDocFreq;
-    private int numTerms;
-    private int minTermLength;
-    private Map<String, String> paramsMap;
+    private final String baseUrl;
+    private final String field;
+    private final float minDocFreq;
+    private final float maxDocFreq;
+    private final int numTerms;
+    private final int minTermLength;
+    private final Map<String, String> paramsMap;
+    private final boolean isLocal;
+    private final SolrClientCache clientCache;
 
     public SignificantTermsCall(
         String baseUrl,
@@ -404,7 +408,9 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
         float minDocFreq,
         float maxDocFreq,
         int minTermLength,
-        int numTerms) {
+        int numTerms,
+        boolean isLocal,
+        SolrClientCache clientCache) {
 
       this.baseUrl = baseUrl;
       this.field = field;
@@ -413,12 +419,14 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
       this.paramsMap = paramsMap;
       this.numTerms = numTerms;
       this.minTermLength = minTermLength;
+      this.isLocal = isLocal;
+      this.clientCache = clientCache;
     }
 
     @Override
     public NamedList<?> call() throws Exception {
       ModifiableSolrParams params = new ModifiableSolrParams();
-      SolrClient solrClient = cache.getHttpSolrClient(baseUrl);
+      SolrClient solrClient = clientCache.getHttpSolrClient(baseUrl);
 
       params.add(DISTRIB, "false");
       params.add("fq", "{!significantTerms}");
@@ -432,7 +440,7 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
       params.add("minTermLength", Integer.toString(minTermLength));
       params.add("field", field);
       params.add("numTerms", String.valueOf(numTerms * 5));
-      if (streamContext.isLocal()) {
+      if (isLocal) {
         params.add("distrib", "false");
       }
 
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
index 8f019c36e61..f58546cb432 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.client.solrj.io.stream;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -25,10 +26,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.InputStreamResponseParser;
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
@@ -47,8 +46,6 @@ import org.apache.solr.common.util.NamedList;
 /**
  * Queries a single Solr instance and maps SolrDocs to a Stream of Tuples.
  *
- * <p>TODO: Move this to Http2SolrClient
- *
  * @since 5.1.0
  */
 public class SolrStream extends TupleStream {
@@ -62,16 +59,17 @@ public class SolrStream extends TupleStream {
   private boolean trace;
   private Map<String, String> fieldMappings;
   private transient TupleStreamParser tupleStreamParser;
-  private transient SolrClient client;
-  private transient SolrClientCache cache;
   private String slice;
   private long checkpoint = -1;
-  private CloseableHttpResponse closeableHttpResponse;
+  private Closeable closeableHttpResponse;
   private boolean distrib = true;
   private String user;
   private String password;
   private String core;
 
+  private transient SolrClientCache clientCache;
+  private transient boolean doCloseCache;
+
   /**
    * @param baseUrl Base URL of the stream.
    * @param params Map&lt;String, String&gt; of parameters
@@ -104,7 +102,7 @@ public class SolrStream extends TupleStream {
     this.distrib = !context.isLocal();
     this.numWorkers = context.numWorkers;
     this.workerID = context.workerID;
-    this.cache = context.getSolrClientCache();
+    this.clientCache = context.getSolrClientCache();
   }
 
   public void setCredentials(String user, String password) {
@@ -115,12 +113,11 @@ public class SolrStream extends TupleStream {
   /** Opens the stream to a single Solr instance. */
   @Override
   public void open() throws IOException {
-
-    // Reuse the same client per node vs. having one per replica
-    if (cache == null) {
-      client = new HttpSolrClient.Builder(baseUrl).build();
+    if (clientCache == null) {
+      doCloseCache = true;
+      clientCache = new SolrClientCache();
     } else {
-      client = cache.getHttpSolrClient(baseUrl);
+      doCloseCache = false;
     }
 
     try {
@@ -193,8 +190,8 @@ public class SolrStream extends TupleStream {
     if (closeableHttpResponse != null) {
       closeableHttpResponse.close();
     }
-    if (cache == null && client != null) {
-      client.close();
+    if (doCloseCache) {
+      clientCache.close();
     }
   }
 
@@ -303,15 +300,26 @@ public class SolrStream extends TupleStream {
       query.setBasicAuthCredentials(user, password);
     }
 
+    var client = clientCache.getHttpSolrClient(baseUrl);
     NamedList<Object> genericResponse = client.request(query);
     InputStream stream = (InputStream) genericResponse.get("stream");
+
     CloseableHttpResponse httpResponse =
         (CloseableHttpResponse) genericResponse.get("closeableResponse");
+    // still attempting to read http status from http response for backwards compatibility reasons
+    // since 9.4 the updated format will have a dedicated status field
+    final int statusCode;
+    if (httpResponse != null) {
+      statusCode = httpResponse.getStatusLine().getStatusCode();
+    } else {
+      statusCode = (int) genericResponse.get("responseStatus");
+    }
 
-    final int statusCode = httpResponse.getStatusLine().getStatusCode();
     if (statusCode != 200) {
       String errMsg = consumeStreamAsErrorMessage(stream);
-      httpResponse.close();
+      if (httpResponse != null) {
+        httpResponse.close();
+      }
       throw new IOException(
           "Query to '"
               + query.getPath()
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SqlStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SqlStream.java
index a0419567d1d..b39ab1dbdc1 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SqlStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SqlStream.java
@@ -24,7 +24,6 @@ import java.util.Locale;
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.stream.Collectors;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
 import org.apache.solr.client.solrj.io.stream.expr.Explanation;
@@ -49,7 +48,6 @@ public class SqlStream extends TupleStream implements Expressible {
   protected String zkHost;
   protected String collection;
   protected SolrParams params;
-  protected transient CloudSolrClient cloudSolrClient;
   protected transient TupleStream tupleStream;
   protected transient StreamContext streamContext;
 
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
index fb7696c8ac9..507dc3820fd 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
@@ -30,7 +30,6 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.ClusterStateProvider;
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
@@ -69,9 +68,10 @@ public class StatsStream extends TupleStream implements Expressible, ParallelMet
   private String zkHost;
   private SolrParams params;
   private String collection;
-  protected transient SolrClientCache cache;
-  protected transient CloudSolrClient cloudSolrClient;
-  private StreamContext context;
+
+  private transient SolrClientCache clientCache;
+  private transient boolean doCloseCache;
+  private transient StreamContext context;
   protected transient TupleStream parallelizedStream;
 
   public StatsStream(String zkHost, String collection, SolrParams params, Metric[] metrics)
@@ -216,7 +216,7 @@ public class StatsStream extends TupleStream implements Expressible, ParallelMet
   @Override
   public void setStreamContext(StreamContext context) {
     this.context = context;
-    cache = context.getSolrClientCache();
+    this.clientCache = context.getSolrClientCache();
   }
 
   @Override
@@ -226,6 +226,12 @@ public class StatsStream extends TupleStream implements Expressible, ParallelMet
 
   @Override
   public void open() throws IOException {
+    if (clientCache == null) {
+      doCloseCache = true;
+      clientCache = new SolrClientCache();
+    } else {
+      doCloseCache = false;
+    }
 
     @SuppressWarnings({"unchecked"})
     Map<String, List<String>> shardsMap = (Map<String, List<String>>) context.get("shards");
@@ -233,7 +239,7 @@ public class StatsStream extends TupleStream implements Expressible, ParallelMet
     // Parallelize the stats stream across multiple collections for an alias using plist if possible
     if (shardsMap == null && params.getBool(TIERED_PARAM, defaultTieredEnabled)) {
       ClusterStateProvider clusterStateProvider =
-          cache.getCloudSolrClient(zkHost).getClusterStateProvider();
+          clientCache.getCloudSolrClient(zkHost).getClusterStateProvider();
       final List<String> resolved =
           clusterStateProvider != null ? clusterStateProvider.resolveAlias(collection) : null;
       if (resolved != null && resolved.size() > 1) {
@@ -255,7 +261,7 @@ public class StatsStream extends TupleStream implements Expressible, ParallelMet
 
     if (shardsMap == null) {
       QueryRequest request = new QueryRequest(paramsLoc, SolrRequest.METHOD.POST);
-      cloudSolrClient = cache.getCloudSolrClient(zkHost);
+      var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
       try {
         NamedList<?> response = cloudSolrClient.request(request, collection);
         getTuples(response, metrics);
@@ -264,7 +270,7 @@ public class StatsStream extends TupleStream implements Expressible, ParallelMet
       }
     } else {
       List<String> shards = shardsMap.get(collection);
-      SolrClient client = cache.getHttpSolrClient(shards.get(0));
+      SolrClient client = clientCache.getHttpSolrClient(shards.get(0));
 
       if (shards.size() > 1) {
         String shardsParam = getShardString(shards);
@@ -294,7 +300,11 @@ public class StatsStream extends TupleStream implements Expressible, ParallelMet
   }
 
   @Override
-  public void close() throws IOException {}
+  public void close() throws IOException {
+    if (doCloseCache) {
+      clientCache.close();
+    }
+  }
 
   @Override
   public Tuple read() throws IOException {
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
index 5feb15e46fa..2301b3d2136 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
@@ -16,7 +16,6 @@
  */
 package org.apache.solr.client.solrj.io.stream;
 
-import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
@@ -33,7 +32,7 @@ import org.apache.solr.common.params.SolrParams;
  * <p>Note: The StreamContext contains the SolrClientCache which is used to cache SolrClients for
  * reuse across multiple TupleStreams.
  */
-public class StreamContext implements Serializable {
+public class StreamContext {
 
   private Map<String, Object> entries = new HashMap<>();
   private Map<String, String> tupleContext = new HashMap<>();
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
index 1f746ef35f8..e3ff627232a 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
@@ -37,7 +37,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.io.ClassificationEvaluation;
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
@@ -82,12 +81,11 @@ public class TextLogitStream extends TupleStream implements Expressible {
   protected List<Double> idfs;
   protected ClassificationEvaluation evaluation;
 
-  protected transient SolrClientCache cache;
-  protected transient boolean isCloseCache;
-  protected transient CloudSolrClient cloudSolrClient;
+  private transient SolrClientCache clientCache;
+  private transient boolean doCloseCache;
 
   protected transient StreamContext streamContext;
-  protected ExecutorService executorService;
+  protected transient ExecutorService executorService;
   protected TupleStream termsStream;
   private List<String> terms;
 
@@ -355,7 +353,7 @@ public class TextLogitStream extends TupleStream implements Expressible {
 
   @Override
   public void setStreamContext(StreamContext context) {
-    this.cache = context.getSolrClientCache();
+    this.clientCache = context.getSolrClientCache();
     this.streamContext = context;
     this.termsStream.setStreamContext(context);
   }
@@ -363,14 +361,13 @@ public class TextLogitStream extends TupleStream implements Expressible {
   /** Opens the CloudSolrStream */
   @Override
   public void open() throws IOException {
-    if (cache == null) {
-      isCloseCache = true;
-      cache = new SolrClientCache();
+    if (clientCache == null) {
+      doCloseCache = true;
+      clientCache = new SolrClientCache();
     } else {
-      isCloseCache = false;
+      doCloseCache = false;
     }
 
-    this.cloudSolrClient = this.cache.getCloudSolrClient(zkHost);
     this.executorService =
         ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("TextLogitSolrStream"));
   }
@@ -384,6 +381,7 @@ public class TextLogitStream extends TupleStream implements Expressible {
 
   protected List<String> getShardUrls() throws IOException {
     try {
+      var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
       Slice[] slices = CloudSolrStream.getSlices(this.collection, cloudSolrClient, false);
 
       Set<String> liveNodes = cloudSolrClient.getClusterState().getLiveNodes();
@@ -426,7 +424,10 @@ public class TextLogitStream extends TupleStream implements Expressible {
               this.outcome,
               this.positiveLabel,
               this.learningRate,
-              this.iteration);
+              this.iteration,
+              this.threshold,
+              this.idfs,
+              this.clientCache);
 
       Future<Tuple> future = executorService.submit(lc);
       futures.add(future);
@@ -437,8 +438,8 @@ public class TextLogitStream extends TupleStream implements Expressible {
 
   @Override
   public void close() throws IOException {
-    if (isCloseCache && cache != null) {
-      cache.close();
+    if (doCloseCache) {
+      clientCache.close();
     }
 
     if (executorService != null) {
@@ -642,7 +643,7 @@ public class TextLogitStream extends TupleStream implements Expressible {
     }
   }
 
-  protected class LogitCall implements Callable<Tuple> {
+  protected static class LogitCall implements Callable<Tuple> {
 
     private String baseUrl;
     private String feature;
@@ -653,6 +654,9 @@ public class TextLogitStream extends TupleStream implements Expressible {
     private int positiveLabel;
     private double learningRate;
     private Map<String, String> paramsMap;
+    private double threshold;
+    private List<Double> idfs;
+    private SolrClientCache clientCache;
 
     public LogitCall(
         String baseUrl,
@@ -663,7 +667,10 @@ public class TextLogitStream extends TupleStream implements Expressible {
         String outcome,
         int positiveLabel,
         double learningRate,
-        int iteration) {
+        int iteration,
+        double threshold,
+        List<Double> idfs,
+        SolrClientCache clientCache) {
 
       this.baseUrl = baseUrl;
       this.feature = feature;
@@ -674,12 +681,15 @@ public class TextLogitStream extends TupleStream implements Expressible {
       this.positiveLabel = positiveLabel;
       this.learningRate = learningRate;
       this.paramsMap = paramsMap;
+      this.threshold = threshold;
+      this.idfs = idfs;
+      this.clientCache = clientCache;
     }
 
     @Override
     public Tuple call() throws Exception {
       ModifiableSolrParams params = new ModifiableSolrParams();
-      SolrClient solrClient = cache.getHttpSolrClient(baseUrl);
+      SolrClient solrClient = clientCache.getHttpSolrClient(baseUrl);
 
       params.add(DISTRIB, "false");
       params.add("fq", "{!tlogit}");
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java
index 683cc35119c..383389d2551 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java
@@ -25,11 +25,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map.Entry;
-import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.impl.CloudLegacySolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
@@ -71,8 +68,8 @@ public class TimeSeriesStream extends TupleStream implements Expressible {
   private String zkHost;
   private SolrParams params;
   private String collection;
-  protected transient SolrClientCache cache;
-  protected transient CloudSolrClient cloudSolrClient;
+  private transient SolrClientCache clientCache;
+  private transient boolean doCloseCache;
 
   public TimeSeriesStream(
       String zkHost,
@@ -338,7 +335,7 @@ public class TimeSeriesStream extends TupleStream implements Expressible {
 
   @Override
   public void setStreamContext(StreamContext context) {
-    cache = context.getSolrClientCache();
+    clientCache = context.getSolrClientCache();
   }
 
   @Override
@@ -348,12 +345,11 @@ public class TimeSeriesStream extends TupleStream implements Expressible {
 
   @Override
   public void open() throws IOException {
-    if (cache != null) {
-      cloudSolrClient = cache.getCloudSolrClient(zkHost);
+    if (clientCache == null) {
+      doCloseCache = true;
+      clientCache = new SolrClientCache();
     } else {
-      final List<String> hosts = new ArrayList<>();
-      hosts.add(zkHost);
-      cloudSolrClient = new CloudLegacySolrClient.Builder(hosts, Optional.empty()).build();
+      doCloseCache = false;
     }
 
     String json = getJsonFacetString(field, metrics, start, end, gap);
@@ -364,6 +360,7 @@ public class TimeSeriesStream extends TupleStream implements Expressible {
 
     QueryRequest request = new QueryRequest(paramsLoc, SolrRequest.METHOD.POST);
     try {
+      var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
       NamedList<?> response = cloudSolrClient.request(request, collection);
       getTuples(response, field, metrics);
     } catch (Exception e) {
@@ -373,8 +370,8 @@ public class TimeSeriesStream extends TupleStream implements Expressible {
 
   @Override
   public void close() throws IOException {
-    if (cache == null) {
-      cloudSolrClient.close();
+    if (doCloseCache) {
+      clientCache.close();
     }
   }
 
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
index 54dc364fcf3..e343d56e35c 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
@@ -32,7 +32,6 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
@@ -40,7 +39,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.impl.CloudLegacySolrClient;
+import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
 import org.apache.solr.client.solrj.io.comp.FieldComparator;
@@ -79,6 +78,9 @@ public class TopicStream extends CloudSolrStream implements Expressible {
   private String checkpointCollection;
   private long initialCheckpoint = -1;
 
+  private transient SolrClientCache clientCache;
+  private transient boolean doCloseCache;
+
   public TopicStream(
       String zkHost,
       String checkpointCollection,
@@ -310,6 +312,12 @@ public class TopicStream extends CloudSolrStream implements Expressible {
 
   @Override
   public void open() throws IOException {
+    if (clientCache == null) {
+      doCloseCache = true;
+      clientCache = new SolrClientCache();
+    } else {
+      doCloseCache = false;
+    }
     this.tuples = new TreeSet<>();
     this.solrStreams = new ArrayList<>();
     this.eofTuples = Collections.synchronizedMap(new HashMap<>());
@@ -318,16 +326,6 @@ public class TopicStream extends CloudSolrStream implements Expressible {
       // Each worker must maintain its own checkpoints
       this.id = this.id + "_" + streamContext.workerID;
     }
-
-    if (streamContext.getSolrClientCache() != null) {
-      cloudSolrClient = streamContext.getSolrClientCache().getCloudSolrClient(zkHost);
-    } else {
-      final List<String> hosts = new ArrayList<>();
-      hosts.add(zkHost);
-      cloudSolrClient = new CloudLegacySolrClient.Builder(hosts, Optional.empty()).build();
-      this.cloudSolrClient.connect();
-    }
-
     if (checkpoints.size() == 0) {
       getPersistedCheckpoints();
       if (checkpoints.size() == 0) {
@@ -384,8 +382,8 @@ public class TopicStream extends CloudSolrStream implements Expressible {
         }
       }
 
-      if (streamContext != null && streamContext.getSolrClientCache() == null) {
-        cloudSolrClient.close();
+      if (doCloseCache) {
+        clientCache.close();
       }
     }
   }
@@ -426,6 +424,7 @@ public class TopicStream extends CloudSolrStream implements Expressible {
   }
 
   private void getCheckpoints() throws IOException {
+    var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
     this.checkpoints = new HashMap<>();
     Slice[] slices = CloudSolrStream.getSlices(this.collection, cloudSolrClient, false);
     Set<String> liveNodes = cloudSolrClient.getClusterState().getLiveNodes();
@@ -483,9 +482,11 @@ public class TopicStream extends CloudSolrStream implements Expressible {
 
   private void persistCheckpoints() throws IOException {
 
-    if (cloudSolrClient == null) {
+    if (clientCache == null) {
       return;
     }
+
+    var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
     UpdateRequest request = new UpdateRequest();
     request.setParam("collection", checkpointCollection);
     SolrInputDocument doc = new SolrInputDocument();
@@ -504,6 +505,7 @@ public class TopicStream extends CloudSolrStream implements Expressible {
   }
 
   private void getPersistedCheckpoints() throws IOException {
+    var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
     Slice[] slices = CloudSolrStream.getSlices(checkpointCollection, cloudSolrClient, false);
 
     Set<String> liveNodes = cloudSolrClient.getClusterState().getLiveNodes();
@@ -537,47 +539,50 @@ public class TopicStream extends CloudSolrStream implements Expressible {
 
   @Override
   protected void constructStreams() throws IOException {
-    try {
-      Slice[] slices = CloudSolrStream.getSlices(this.collection, cloudSolrClient, false);
+    var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
+    Slice[] slices = CloudSolrStream.getSlices(this.collection, cloudSolrClient, false);
 
-      ModifiableSolrParams mParams = new ModifiableSolrParams(params);
-      mParams.set(DISTRIB, "false"); // We are the aggregator.
-      String fl = mParams.get("fl");
-      mParams.set(SORT, "_version_ asc");
-      if (!fl.contains(VERSION_FIELD)) {
-        fl += ",_version_";
-      }
-      mParams.set("fl", fl);
+    ModifiableSolrParams mParams = new ModifiableSolrParams(params);
+    mParams.set(DISTRIB, "false"); // We are the aggregator.
+    String fl = mParams.get("fl");
+    mParams.set(SORT, "_version_ asc");
+    if (!fl.contains(VERSION_FIELD)) {
+      fl += ",_version_";
+    }
+    mParams.set("fl", fl);
 
-      Random random = new Random();
+    Random random = new Random();
 
-      Set<String> liveNodes = cloudSolrClient.getClusterState().getLiveNodes();
+    Set<String> liveNodes = cloudSolrClient.getClusterState().getLiveNodes();
 
-      for (Slice slice : slices) {
-        ModifiableSolrParams localParams = new ModifiableSolrParams(mParams);
-        long checkpoint = checkpoints.get(slice.getName());
+    for (Slice slice : slices) {
+      ModifiableSolrParams localParams = new ModifiableSolrParams(mParams);
+      long checkpoint = checkpoints.get(slice.getName());
 
-        Collection<Replica> replicas = slice.getReplicas();
-        List<Replica> shuffler = new ArrayList<>();
-        for (Replica replica : replicas) {
-          if (replica.getState() == Replica.State.ACTIVE
-              && liveNodes.contains(replica.getNodeName())) shuffler.add(replica);
-        }
+      Collection<Replica> replicas = slice.getReplicas();
+      List<Replica> shuffler = new ArrayList<>();
+      for (Replica replica : replicas) {
+        if (replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
+          shuffler.add(replica);
+      }
 
-        Replica rep = shuffler.get(random.nextInt(shuffler.size()));
-        ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
-        String url = zkProps.getCoreUrl();
-        SolrStream solrStream = new SolrStream(url, localParams);
-        solrStream.setSlice(slice.getName());
-        solrStream.setCheckpoint(checkpoint);
-        solrStream.setTrace(true);
-        if (streamContext != null) {
-          solrStream.setStreamContext(streamContext);
-        }
-        solrStreams.add(solrStream);
+      Replica rep = shuffler.get(random.nextInt(shuffler.size()));
+      ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
+      String url = zkProps.getCoreUrl();
+      SolrStream solrStream = new SolrStream(url, localParams);
+      solrStream.setSlice(slice.getName());
+      solrStream.setCheckpoint(checkpoint);
+      solrStream.setTrace(true);
+      if (streamContext != null) {
+        solrStream.setStreamContext(streamContext);
       }
-    } catch (Exception e) {
-      throw new IOException(e);
+      solrStreams.add(solrStream);
     }
   }
+
+  @Override
+  public void setStreamContext(StreamContext context) {
+    super.setStreamContext(context);
+    this.clientCache = context.getSolrClientCache();
+  }
 }
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java
index 03188e24005..adf1640f671 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java
@@ -21,10 +21,7 @@ import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
-import java.util.Optional;
 import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.CloudLegacySolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
@@ -52,7 +49,7 @@ public class UpdateStream extends TupleStream implements Expressible {
   // field name in summary tuple for #docs updated in batch
   public static String BATCH_INDEXED_FIELD_NAME = "batchIndexed";
   private String collection;
-  private String zkHost;
+  protected String zkHost;
   private int updateBatchSize;
   /**
    * Indicates if the {@link CommonParams#VERSION_FIELD} should be removed from tuples when
@@ -64,11 +61,12 @@ public class UpdateStream extends TupleStream implements Expressible {
   private int batchNumber;
   private long totalDocsIndex;
   private PushBackStream tupleSource;
-  private transient SolrClientCache cache;
-  private transient CloudSolrClient cloudSolrClient;
   private List<SolrInputDocument> documentBatch = new ArrayList<>();
   private String coreName;
 
+  protected transient SolrClientCache clientCache;
+  private transient boolean doCloseCache;
+
   public UpdateStream(StreamExpression expression, StreamFactory factory) throws IOException {
     String collectionName = factory.getValueOperand(expression, 0);
     verifyCollectionName(collectionName, expression);
@@ -122,7 +120,12 @@ public class UpdateStream extends TupleStream implements Expressible {
 
   @Override
   public void open() throws IOException {
-    setCloudSolrClient();
+    if (clientCache == null) {
+      doCloseCache = true;
+      clientCache = new SolrClientCache();
+    } else {
+      doCloseCache = false;
+    }
     tupleSource.open();
   }
 
@@ -153,8 +156,8 @@ public class UpdateStream extends TupleStream implements Expressible {
 
   @Override
   public void close() throws IOException {
-    if (cache == null && cloudSolrClient != null) {
-      cloudSolrClient.close();
+    if (doCloseCache) {
+      clientCache.close();
     }
     tupleSource.close();
   }
@@ -226,7 +229,7 @@ public class UpdateStream extends TupleStream implements Expressible {
 
   @Override
   public void setStreamContext(StreamContext context) {
-    this.cache = context.getSolrClientCache();
+    this.clientCache = context.getSolrClientCache();
     this.coreName = (String) context.get("core");
     this.tupleSource.setStreamContext(context);
   }
@@ -314,23 +317,6 @@ public class UpdateStream extends TupleStream implements Expressible {
     return true;
   }
 
-  /** Only viable after calling {@link #open} */
-  protected CloudSolrClient getCloudSolrClient() {
-    assert null != this.cloudSolrClient;
-    return this.cloudSolrClient;
-  }
-
-  private void setCloudSolrClient() {
-    if (this.cache != null) {
-      this.cloudSolrClient = this.cache.getCloudSolrClient(zkHost);
-    } else {
-      final List<String> hosts = new ArrayList<>();
-      hosts.add(zkHost);
-      this.cloudSolrClient = new CloudLegacySolrClient.Builder(hosts, Optional.empty()).build();
-      this.cloudSolrClient.connect();
-    }
-  }
-
   private SolrInputDocument convertTupleToSolrDocument(Tuple tuple) {
     SolrInputDocument doc = new SolrInputDocument();
     for (String field : tuple.getFields().keySet()) {
@@ -365,6 +351,7 @@ public class UpdateStream extends TupleStream implements Expressible {
     }
 
     try {
+      var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
       cloudSolrClient.add(collection, documentBatch);
     } catch (SolrServerException | IOException e) {
       // TODO: it would be nice if there was an option to "skipFailedBatches"
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
index 6311acf1b29..260d9a059c4 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
@@ -342,6 +342,14 @@ public class CloudHttp2SolrClient extends CloudSolrClient {
       return this;
     }
 
+    /**
+     * Set the internal http client.
+     *
+     * <p>Note: closing the httpClient instance is the responsibility of the caller.
+     *
+     * @param httpClient http client
+     * @return this
+     */
     public Builder withHttpClient(Http2SolrClient httpClient) {
       if (this.internalClientBuilder != null) {
         throw new IllegalStateException(
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 25513a74dd4..63fc96f2b77 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -170,11 +170,7 @@ public class Http2SolrClient extends SolrClient {
       this.serverBaseUrl = null;
     }
 
-    if (builder.idleTimeoutMillis != null && builder.idleTimeoutMillis > 0) {
-      this.idleTimeoutMillis = builder.idleTimeoutMillis;
-    } else {
-      this.idleTimeoutMillis = HttpClientUtil.DEFAULT_SO_TIMEOUT;
-    }
+    this.idleTimeoutMillis = builder.idleTimeoutMillis;
 
     if (builder.httpClient != null) {
       this.httpClient = builder.httpClient;
@@ -283,8 +279,7 @@ public class Http2SolrClient extends SolrClient {
     this.authenticationStore = new AuthenticationStoreHolder();
     httpClient.setAuthenticationStore(this.authenticationStore);
 
-    if (builder.connectionTimeoutMillis != null)
-      httpClient.setConnectTimeout(builder.connectionTimeoutMillis);
+    httpClient.setConnectTimeout(builder.connectionTimeoutMillis);
 
     setupProxy(builder, httpClient);
 
@@ -859,6 +854,7 @@ public class Http2SolrClient extends SolrClient {
         // no processor specified, return raw stream
         NamedList<Object> rsp = new NamedList<>();
         rsp.add("stream", is);
+        rsp.add("responseStatus", httpStatus);
         // Only case where stream should not be closed
         shouldClose = false;
         return rsp;
@@ -1060,6 +1056,13 @@ public class Http2SolrClient extends SolrClient {
     }
 
     public Http2SolrClient build() {
+      if (idleTimeoutMillis == null || idleTimeoutMillis <= 0) {
+        idleTimeoutMillis = (long) HttpClientUtil.DEFAULT_SO_TIMEOUT;
+      }
+      if (connectionTimeoutMillis == null) {
+        connectionTimeoutMillis = (long) HttpClientUtil.DEFAULT_CONNECT_TIMEOUT;
+      }
+
       Http2SolrClient client = new Http2SolrClient(baseSolrUrl, this);
       try {
         httpClientBuilderSetup(client);
@@ -1222,6 +1225,10 @@ public class Http2SolrClient extends SolrClient {
       return this;
     }
 
+    public Long getIdleTimeoutMillis() {
+      return idleTimeoutMillis;
+    }
+
     public Builder useHttp1_1(boolean useHttp1_1) {
       this.useHttp1_1 = useHttp1_1;
       return this;
@@ -1241,6 +1248,10 @@ public class Http2SolrClient extends SolrClient {
       return this;
     }
 
+    public Long getConnectionTimeout() {
+      return connectionTimeoutMillis;
+    }
+
     /**
      * Set a timeout in milliseconds for requests issued by this client.
      *
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
index 5f7b8a17ee8..c03fbc9f3f9 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
@@ -643,6 +643,7 @@ public class HttpSolrClient extends BaseHttpSolrClient {
         NamedList<Object> rsp = new NamedList<>();
         rsp.add("stream", respBody);
         rsp.add("closeableResponse", response);
+        rsp.add("responseStatus", response.getStatusLine().getStatusCode());
         // Only case where stream should not be closed
         shouldClose = false;
         return rsp;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientBuilder.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientBuilder.java
index 645a19040de..d0c97f8ed22 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientBuilder.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientBuilder.java
@@ -127,6 +127,10 @@ public abstract class SolrClientBuilder<B extends SolrClientBuilder<B>> {
     return getThis();
   }
 
+  public int getConnectionTimeoutMillis() {
+    return this.connectionTimeoutMillis;
+  }
+
   /**
    * Tells {@link Builder} that created clients should set the following read timeout on all
    * sockets.
@@ -155,4 +159,8 @@ public abstract class SolrClientBuilder<B extends SolrClientBuilder<B>> {
     socketTimeoutMillisUpdate = true;
     return getThis();
   }
+
+  public int getSocketTimeoutMillis() {
+    return this.socketTimeoutMillis;
+  }
 }