You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by st...@apache.org on 2023/09/27 20:47:56 UTC

[solr] branch branch_9x updated: SOLR-16992 Non-reproducible StreamingTest failures (#1955)

This is an automated email from the ASF dual-hosted git repository.

stillalex pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_9x by this push:
     new 12b7579573f SOLR-16992 Non-reproducible StreamingTest failures (#1955)
12b7579573f is described below

commit 12b7579573fb7598be3deca8498249c5f00eb821
Author: Alex D <st...@apache.org>
AuthorDate: Wed Sep 27 13:44:14 2023 -0700

    SOLR-16992 Non-reproducible StreamingTest failures (#1955)
    
    -- suggests CloudSolrStream concurrency race condition
    
    (cherry picked from commit f0fcd300c896b858ae83235ecdb0a109eaea5cea)
---
 solr/CHANGES.txt                                   |  2 +
 .../solr/client/solrj/io/SolrClientCache.java      | 23 +++--
 .../client/solrj/io/stream/CloudSolrStream.java    | 30 ++-----
 .../client/solrj/io/stream/DeepRandomStream.java   | 37 +++-----
 .../solrj/io/stream/FeaturesSelectionStream.java   | 29 ++-----
 .../client/solrj/io/stream/ParallelListStream.java | 35 +++-----
 .../solrj/io/stream/SignificantTermsStream.java    | 29 ++-----
 .../solrj/io/stream/StreamExecutorHelper.java      | 53 ++++++++++++
 .../client/solrj/io/stream/TextLogitStream.java    | 29 ++-----
 .../solr/client/solrj/io/stream/TopicStream.java   | 40 +++------
 .../solr/client/solrj/io/stream/TupleStream.java   | 98 +++++++++++-----------
 .../solrj/io/stream/StreamExecutorHelperTest.java  | 68 +++++++++++++++
 .../org/apache/solr/common/util/ExecutorUtil.java  | 49 +++++++++++
 .../apache/solr/common/util/ExecutorUtilTest.java  | 58 +++++++++++++
 14 files changed, 357 insertions(+), 223 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index d5ad8ca890a..573290f2ce7 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -161,6 +161,8 @@ Bug Fixes
 
 * SOLR-16980: Connect to SOLR standalone with basic authentication (Alex Deparvu)
 
+* SOLR-16992: Non-reproducible StreamingTest failures -- suggests CloudSolrStream concurrency race condition (Alex Deparvu, hossman)
+
 Dependency Upgrades
 ---------------------
 
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
index 02621784d00..e56d1a55c13 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
@@ -17,7 +17,6 @@
 package org.apache.solr.client.solrj.io;
 
 import java.io.Closeable;
-import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.List;
@@ -25,6 +24,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.http.client.HttpClient;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
@@ -34,6 +34,8 @@ import org.apache.solr.client.solrj.impl.Http2SolrClient;
 import org.apache.solr.client.solrj.impl.HttpClientUtil;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.SolrClientBuilder;
+import org.apache.solr.common.AlreadyClosedException;
+import org.apache.solr.common.util.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +56,7 @@ public class SolrClientCache implements Closeable {
   private final Map<String, SolrClient> solrClients = new HashMap<>();
   private final HttpClient apacheHttpClient;
   private final Http2SolrClient http2SolrClient;
+  private final AtomicBoolean isClosed = new AtomicBoolean(false);
 
   public SolrClientCache() {
     this.apacheHttpClient = null;
@@ -72,6 +75,7 @@ public class SolrClientCache implements Closeable {
   }
 
   public synchronized CloudSolrClient getCloudSolrClient(String zkHost) {
+    ensureOpen();
     Objects.requireNonNull(zkHost, "ZooKeeper host cannot be null!");
     if (solrClients.containsKey(zkHost)) {
       return (CloudSolrClient) solrClients.get(zkHost);
@@ -108,6 +112,7 @@ public class SolrClientCache implements Closeable {
   }
 
   public synchronized SolrClient getHttpSolrClient(String baseUrl) {
+    ensureOpen();
     Objects.requireNonNull(baseUrl, "Url cannot be null!");
     if (solrClients.containsKey(baseUrl)) {
       return solrClients.get(baseUrl);
@@ -159,13 +164,17 @@ public class SolrClientCache implements Closeable {
 
   @Override
   public synchronized void close() {
-    for (Map.Entry<String, SolrClient> entry : solrClients.entrySet()) {
-      try {
-        entry.getValue().close();
-      } catch (IOException e) {
-        log.error("Error closing SolrClient for {}", entry.getKey(), e);
+    if (isClosed.compareAndSet(false, true)) {
+      for (Map.Entry<String, SolrClient> entry : solrClients.entrySet()) {
+        IOUtils.closeQuietly(entry.getValue());
       }
+      solrClients.clear();
+    }
+  }
+
+  private void ensureOpen() {
+    if (isClosed.get()) {
+      throw new AlreadyClosedException();
     }
-    solrClients.clear();
   }
 }
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
index 8b2bd54aef7..f75dec91c38 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.client.solrj.io.stream;
 
+import static org.apache.solr.client.solrj.io.stream.StreamExecutorHelper.submitAllAndAwaitAggregatingExceptions;
 import static org.apache.solr.common.params.CommonParams.DISTRIB;
 import static org.apache.solr.common.params.CommonParams.SORT;
 
@@ -32,8 +33,6 @@ import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -54,8 +53,6 @@ import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.SolrNamedThreadFactory;
 
 /**
  * Connects to Zookeeper to pick replicas from a specific collection to send the query to. Under the
@@ -412,24 +409,15 @@ public class CloudSolrStream extends TupleStream implements Expressible {
   }
 
   private void openStreams() throws IOException {
-    ExecutorService service =
-        ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("CloudSolrStream"));
-    List<Future<TupleWrapper>> futures =
+    List<StreamOpener> tasks =
         solrStreams.stream()
-            .map(ss -> service.submit(new StreamOpener((SolrStream) ss, comp)))
-            .collect(Collectors.toList());
-    try {
-      for (Future<TupleWrapper> f : futures) {
-        TupleWrapper w = f.get();
-        if (w != null) {
-          tuples.add(w);
-        }
-      }
-    } catch (Exception e) {
-      throw new IOException(e);
-    } finally {
-      service.shutdown();
-    }
+            .map(s -> new StreamOpener((SolrStream) s, comp))
+            .collect(Collectors.toUnmodifiableList());
+    var results =
+        submitAllAndAwaitAggregatingExceptions(tasks, "CloudSolrStream").stream()
+            .filter(Objects::nonNull)
+            .collect(Collectors.toUnmodifiableList());
+    tuples.addAll(results);
   }
 
   /** Closes the CloudSolrStream */
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java
index 2e32dc26f09..8598cdb1604 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.client.solrj.io.stream;
 
+import static org.apache.solr.client.solrj.io.stream.StreamExecutorHelper.submitAllAndAwaitAggregatingExceptions;
 import static org.apache.solr.common.params.CommonParams.DISTRIB;
 import static org.apache.solr.common.params.CommonParams.ROWS;
 import static org.apache.solr.common.params.CommonParams.SORT;
@@ -34,8 +35,6 @@ import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Random;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
@@ -49,8 +48,6 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.SolrNamedThreadFactory;
 
 /**
  * Connects to Zookeeper to pick replicas from a specific collection to send the query to. Under the
@@ -322,29 +319,15 @@ public class DeepRandomStream extends TupleStream implements Expressible {
   }
 
   private void openStreams() throws IOException {
-    ExecutorService service =
-        ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("DeepRandomStream"));
-    try {
-      List<Future<TupleWrapper>> futures = new ArrayList<>();
-      for (TupleStream solrStream : solrStreams) {
-        StreamOpener so = new StreamOpener((SolrStream) solrStream, comp);
-        Future<TupleWrapper> future = service.submit(so);
-        futures.add(future);
-      }
-
-      try {
-        for (Future<TupleWrapper> f : futures) {
-          TupleWrapper w = f.get();
-          if (w != null) {
-            tuples.add(w);
-          }
-        }
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-    } finally {
-      service.shutdown();
-    }
+    List<Callable<TupleWrapper>> tasks =
+        solrStreams.stream()
+            .map(s -> new StreamOpener((SolrStream) s, comp))
+            .collect(Collectors.toUnmodifiableList());
+    var results =
+        submitAllAndAwaitAggregatingExceptions(tasks, "DeepRandomStream").stream()
+            .filter(Objects::nonNull)
+            .collect(Collectors.toUnmodifiableList());
+    tuples.addAll(results);
   }
 
   @Override
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
index 3a761ceaee4..9c7478a4623 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
@@ -17,6 +17,7 @@
 
 package org.apache.solr.client.solrj.io.stream;
 
+import static org.apache.solr.client.solrj.io.stream.StreamExecutorHelper.submitAllAndAwaitAggregatingExceptions;
 import static org.apache.solr.common.params.CommonParams.DISTRIB;
 import static org.apache.solr.common.params.CommonParams.ID;
 
@@ -33,8 +34,6 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.stream.Stream;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.io.SolrClientCache;
@@ -54,9 +53,7 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SolrNamedThreadFactory;
 
 /**
  * @since 6.2.0
@@ -77,7 +74,6 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
 
   protected transient SolrClientCache clientCache;
   private transient boolean doCloseCache;
-  protected transient ExecutorService executorService;
 
   public FeaturesSelectionStream(
       String zkHost,
@@ -262,10 +258,6 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
     } else {
       doCloseCache = false;
     }
-
-    this.executorService =
-        ExecutorUtil.newMDCAwareCachedThreadPool(
-            new SolrNamedThreadFactory("FeaturesSelectionStream"));
   }
 
   @Override
@@ -304,9 +296,8 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
     }
   }
 
-  private List<Future<NamedList<?>>> callShards(List<String> baseUrls) throws IOException {
-
-    List<Future<NamedList<?>>> futures = new ArrayList<>();
+  private Collection<NamedList<?>> callShards(List<String> baseUrls) throws IOException {
+    List<FeaturesSelectionCall> tasks = new ArrayList<>();
     for (String baseUrl : baseUrls) {
       FeaturesSelectionCall lc =
           new FeaturesSelectionCall(
@@ -317,12 +308,9 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
               this.positiveLabel,
               this.numTerms,
               this.clientCache);
-
-      Future<NamedList<?>> future = executorService.submit(lc);
-      futures.add(future);
+      tasks.add(lc);
     }
-
-    return futures;
+    return submitAllAndAwaitAggregatingExceptions(tasks, "FeaturesSelectionStream");
   }
 
   @Override
@@ -330,10 +318,6 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
     if (doCloseCache) {
       clientCache.close();
     }
-
-    if (executorService != null) {
-      executorService.shutdown();
-    }
   }
 
   /** Return the stream sort - ie, the order in which records are returned */
@@ -359,8 +343,7 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
         Map<String, Long> docFreqs = new HashMap<>();
 
         long numDocs = 0;
-        for (Future<NamedList<?>> getTopTermsCall : callShards(getShardUrls())) {
-          NamedList<?> resp = getTopTermsCall.get();
+        for (NamedList<?> resp : callShards(getShardUrls())) {
 
           @SuppressWarnings({"unchecked"})
           NamedList<Double> shardTopTerms = (NamedList<Double>) resp.get("featuredTerms");
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java
index a8a205c06ce..8c86384dbbf 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java
@@ -17,12 +17,12 @@
 
 package org.apache.solr.client.solrj.io.stream;
 
+import static org.apache.solr.client.solrj.io.stream.StreamExecutorHelper.submitAllAndAwaitAggregatingExceptions;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
 import org.apache.solr.client.solrj.io.stream.expr.Explanation;
@@ -31,8 +31,6 @@ import org.apache.solr.client.solrj.io.stream.expr.Expressible;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.SolrNamedThreadFactory;
 
 public class ParallelListStream extends TupleStream implements Expressible {
 
@@ -140,27 +138,14 @@ public class ParallelListStream extends TupleStream implements Expressible {
   }
 
   private void openStreams() throws IOException {
-    ExecutorService service =
-        ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("ParallelListStream"));
-    try {
-      List<Future<StreamIndex>> futures = new ArrayList<>();
-      int i = 0;
-      for (TupleStream tupleStream : streams) {
-        StreamOpener so = new StreamOpener(new StreamIndex(tupleStream, i++));
-        Future<StreamIndex> future = service.submit(so);
-        futures.add(future);
-      }
-
-      try {
-        for (Future<StreamIndex> f : futures) {
-          StreamIndex streamIndex = f.get();
-          this.streams[streamIndex.getIndex()] = streamIndex.getTupleStream();
-        }
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-    } finally {
-      service.shutdown();
+    List<Callable<StreamIndex>> tasks = new ArrayList<>();
+    int i = 0;
+    for (TupleStream tupleStream : streams) {
+      tasks.add(new StreamOpener(new StreamIndex(tupleStream, i++)));
+    }
+    var results = submitAllAndAwaitAggregatingExceptions(tasks, "ParallelListStream");
+    for (var r : results) {
+      this.streams[r.getIndex()] = r.getTupleStream();
     }
   }
 
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java
index 22d7c8bc73c..3b79a3c1fa4 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java
@@ -17,10 +17,12 @@
 
 package org.apache.solr.client.solrj.io.stream;
 
+import static org.apache.solr.client.solrj.io.stream.StreamExecutorHelper.submitAllAndAwaitAggregatingExceptions;
 import static org.apache.solr.common.params.CommonParams.DISTRIB;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -28,8 +30,6 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.io.SolrClientCache;
@@ -47,9 +47,7 @@ import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SolrNamedThreadFactory;
 
 /**
  * @since 6.5.0
@@ -71,7 +69,6 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
   private transient SolrClientCache clientCache;
   private transient boolean doCloseCache;
   private transient StreamContext streamContext;
-  private transient ExecutorService executorService;
 
   public SignificantTermsStream(
       String zkHost,
@@ -251,10 +248,6 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
     } else {
       doCloseCache = false;
     }
-
-    this.executorService =
-        ExecutorUtil.newMDCAwareCachedThreadPool(
-            new SolrNamedThreadFactory("SignificantTermsStream"));
   }
 
   @Override
@@ -262,9 +255,8 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
     return null;
   }
 
-  private List<Future<NamedList<?>>> callShards(List<String> baseUrls) throws IOException {
-
-    List<Future<NamedList<?>>> futures = new ArrayList<>();
+  private Collection<NamedList<?>> callShards(List<String> baseUrls) throws IOException {
+    List<SignificantTermsCall> tasks = new ArrayList<>();
     for (String baseUrl : baseUrls) {
       SignificantTermsCall lc =
           new SignificantTermsCall(
@@ -277,12 +269,9 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
               this.numTerms,
               streamContext.isLocal(),
               clientCache);
-
-      Future<NamedList<?>> future = executorService.submit(lc);
-      futures.add(future);
+      tasks.add(lc);
     }
-
-    return futures;
+    return submitAllAndAwaitAggregatingExceptions(tasks, "SignificantTermsStream");
   }
 
   @Override
@@ -290,8 +279,6 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
     if (doCloseCache) {
       clientCache.close();
     }
-
-    executorService.shutdown();
   }
 
   /** Return the stream sort - ie, the order in which records are returned */
@@ -316,9 +303,7 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
         Map<String, int[]> mergeFreqs = new HashMap<>();
         long numDocs = 0;
         long resultCount = 0;
-        for (Future<NamedList<?>> getTopTermsCall :
-            callShards(getShards(zkHost, collection, streamContext))) {
-          NamedList<?> fullResp = getTopTermsCall.get();
+        for (NamedList<?> fullResp : callShards(getShards(zkHost, collection, streamContext))) {
           Map<?, ?> stResp = (Map<?, ?>) fullResp.get("significantTerms");
 
           @SuppressWarnings({"unchecked"})
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/StreamExecutorHelper.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/StreamExecutorHelper.java
new file mode 100644
index 00000000000..0df2add101f
--- /dev/null
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/StreamExecutorHelper.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
+
+public class StreamExecutorHelper {
+
+  /**
+   * Executes the given list of Callables and returns their results as a collection. The method
+   * waits for every task to finish even if one of them throws an exception. If any task fails, the
+   * first exception is thrown wrapped in an IOException, and any subsequent exceptions are added
+   * to the original exception via {@code addSuppressed}.
+   *
+   * @param <T> the response type
+   * @param tasks the list of callables to be executed
+   * @param threadsName name to be used by the SolrNamedThreadFactory
+   * @return results collection
+   * @throws IOException in case any exceptions happened
+   */
+  public static <T> Collection<T> submitAllAndAwaitAggregatingExceptions(
+      List<? extends Callable<T>> tasks, String threadsName) throws IOException {
+    ExecutorService service =
+        ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory(threadsName));
+    try {
+      return ExecutorUtil.submitAllAndAwaitAggregatingExceptions(service, tasks).stream()
+          .collect(Collectors.toList());
+    } finally {
+      ExecutorUtil.shutdownNowAndAwaitTermination(service);
+    }
+  }
+}
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
index e3ff627232a..1461a3af70d 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
@@ -17,6 +17,7 @@
 
 package org.apache.solr.client.solrj.io.stream;
 
+import static org.apache.solr.client.solrj.io.stream.StreamExecutorHelper.submitAllAndAwaitAggregatingExceptions;
 import static org.apache.solr.common.params.CommonParams.DISTRIB;
 import static org.apache.solr.common.params.CommonParams.ID;
 
@@ -33,8 +34,6 @@ import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.io.ClassificationEvaluation;
@@ -55,9 +54,7 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SolrNamedThreadFactory;
 
 /**
  * @since 6.2.0
@@ -85,7 +82,6 @@ public class TextLogitStream extends TupleStream implements Expressible {
   private transient boolean doCloseCache;
 
   protected transient StreamContext streamContext;
-  protected transient ExecutorService executorService;
   protected TupleStream termsStream;
   private List<String> terms;
 
@@ -367,9 +363,6 @@ public class TextLogitStream extends TupleStream implements Expressible {
     } else {
       doCloseCache = false;
     }
-
-    this.executorService =
-        ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("TextLogitSolrStream"));
   }
 
   @Override
@@ -410,9 +403,8 @@ public class TextLogitStream extends TupleStream implements Expressible {
     }
   }
 
-  private List<Future<Tuple>> callShards(List<String> baseUrls) throws IOException {
-
-    List<Future<Tuple>> futures = new ArrayList<>();
+  private Collection<Tuple> callShards(List<String> baseUrls) throws IOException {
+    List<LogitCall> tasks = new ArrayList<>();
     for (String baseUrl : baseUrls) {
       LogitCall lc =
           new LogitCall(
@@ -428,12 +420,9 @@ public class TextLogitStream extends TupleStream implements Expressible {
               this.threshold,
               this.idfs,
               this.clientCache);
-
-      Future<Tuple> future = executorService.submit(lc);
-      futures.add(future);
+      tasks.add(lc);
     }
-
-    return futures;
+    return submitAllAndAwaitAggregatingExceptions(tasks, "TextLogitSolrStream");
   }
 
   @Override
@@ -441,10 +430,6 @@ public class TextLogitStream extends TupleStream implements Expressible {
     if (doCloseCache) {
       clientCache.close();
     }
-
-    if (executorService != null) {
-      executorService.shutdown();
-    }
     termsStream.close();
   }
 
@@ -511,9 +496,7 @@ public class TextLogitStream extends TupleStream implements Expressible {
         this.evaluation = new ClassificationEvaluation();
 
         this.error = 0;
-        for (Future<Tuple> logitCall : callShards(getShardUrls())) {
-
-          Tuple tuple = logitCall.get();
+        for (Tuple tuple : callShards(getShardUrls())) {
           @SuppressWarnings({"unchecked"})
           List<Double> shardWeights = (List<Double>) tuple.get("weights");
           allWeights.add(shardWeights);
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
index e343d56e35c..8e3844c152a 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
@@ -17,6 +17,7 @@
 
 package org.apache.solr.client.solrj.io.stream;
 
+import static org.apache.solr.client.solrj.io.stream.StreamExecutorHelper.submitAllAndAwaitAggregatingExceptions;
 import static org.apache.solr.common.params.CommonParams.DISTRIB;
 import static org.apache.solr.common.params.CommonParams.ID;
 import static org.apache.solr.common.params.CommonParams.SORT;
@@ -32,11 +33,11 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
+import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.io.SolrClientCache;
@@ -59,8 +60,6 @@ import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.SolrNamedThreadFactory;
 
 /**
  * @since 6.0.0
@@ -338,30 +337,15 @@ public class TopicStream extends CloudSolrStream implements Expressible {
   }
 
   private void openStreams() throws IOException {
-
-    ExecutorService service =
-        ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("TopicStream"));
-    try {
-      List<Future<TupleWrapper>> futures = new ArrayList<>();
-      for (TupleStream solrStream : solrStreams) {
-        StreamOpener so = new StreamOpener((SolrStream) solrStream, comp);
-        Future<TupleWrapper> future = service.submit(so);
-        futures.add(future);
-      }
-
-      try {
-        for (Future<TupleWrapper> f : futures) {
-          TupleWrapper w = f.get();
-          if (w != null) {
-            tuples.add(w);
-          }
-        }
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-    } finally {
-      service.shutdown();
-    }
+    List<Callable<TupleWrapper>> tasks =
+        solrStreams.stream()
+            .map(s -> new StreamOpener((SolrStream) s, comp))
+            .collect(Collectors.toUnmodifiableList());
+    var results =
+        submitAllAndAwaitAggregatingExceptions(tasks, "TopicStream").stream()
+            .filter(Objects::nonNull)
+            .collect(Collectors.toUnmodifiableList());
+    tuples.addAll(results);
   }
 
   @Override
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
index 13faf73acde..435d5984083 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
@@ -42,6 +42,7 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.IOUtils;
 
 /**
  * @since 5.1.0
@@ -125,6 +126,14 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
   static List<Replica> getReplicas(
       String zkHost, String collection, StreamContext streamContext, SolrParams requestParams)
       throws IOException {
+    if (zkHost == null) {
+      throw new IOException(
+          String.format(
+              Locale.ROOT,
+              "invalid expression - zkHost not found for collection '%s'",
+              collection));
+    }
+
     List<Replica> replicas = new ArrayList<>();
 
     // SolrCloud Sharding
@@ -139,54 +148,49 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
       localSolrClientCache = null;
     }
 
-    if (zkHost == null) {
-      throw new IOException(
-          String.format(
-              Locale.ROOT,
-              "invalid expression - zkHost not found for collection '%s'",
-              collection));
-    }
-
-    CloudSolrClient cloudSolrClient = solrClientCache.getCloudSolrClient(zkHost);
-    ClusterState clusterState = cloudSolrClient.getClusterStateProvider().getClusterState();
-    Slice[] slices = CloudSolrStream.getSlices(collection, cloudSolrClient, true);
-    Set<String> liveNodes = clusterState.getLiveNodes();
-
-    RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator;
-    final ModifiableSolrParams solrParams;
-    if (streamContext != null) {
-      solrParams = new ModifiableSolrParams(streamContext.getRequestParams());
-      requestReplicaListTransformerGenerator =
-          streamContext.getRequestReplicaListTransformerGenerator();
-    } else {
-      solrParams = new ModifiableSolrParams();
-      requestReplicaListTransformerGenerator = null;
-    }
-    if (requestReplicaListTransformerGenerator == null) {
-      requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator();
-    }
-    solrParams.add(requestParams);
-
-    ReplicaListTransformer replicaListTransformer =
-        requestReplicaListTransformerGenerator.getReplicaListTransformer(solrParams);
-
-    final String coreFilter =
-        streamContext != null && streamContext.isLocal()
-            ? (String) streamContext.get("core")
-            : null;
-    List<Replica> sortedReplicas = new ArrayList<>();
-    for (Slice slice : slices) {
-      slice.getReplicas().stream().filter(r -> r.isActive(liveNodes)).forEach(sortedReplicas::add);
-      replicaListTransformer.transform(sortedReplicas);
-      sortedReplicas.stream()
-          .filter(r -> coreFilter == null || coreFilter.equals(r.core))
-          .findFirst()
-          .ifPresent(replicas::add);
-      sortedReplicas.clear();
-    }
+    try {
+      CloudSolrClient cloudSolrClient = solrClientCache.getCloudSolrClient(zkHost);
+      ClusterState clusterState = cloudSolrClient.getClusterStateProvider().getClusterState();
+      Slice[] slices = CloudSolrStream.getSlices(collection, cloudSolrClient, true);
+      Set<String> liveNodes = clusterState.getLiveNodes();
+
+      RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator;
+      final ModifiableSolrParams solrParams;
+      if (streamContext != null) {
+        solrParams = new ModifiableSolrParams(streamContext.getRequestParams());
+        requestReplicaListTransformerGenerator =
+            streamContext.getRequestReplicaListTransformerGenerator();
+      } else {
+        solrParams = new ModifiableSolrParams();
+        requestReplicaListTransformerGenerator = null;
+      }
+      if (requestReplicaListTransformerGenerator == null) {
+        requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator();
+      }
+      solrParams.add(requestParams);
+
+      ReplicaListTransformer replicaListTransformer =
+          requestReplicaListTransformerGenerator.getReplicaListTransformer(solrParams);
+
+      final String coreFilter =
+          streamContext != null && streamContext.isLocal()
+              ? (String) streamContext.get("core")
+              : null;
+      List<Replica> sortedReplicas = new ArrayList<>();
+      for (Slice slice : slices) {
+        slice.getReplicas().stream()
+            .filter(r -> r.isActive(liveNodes))
+            .forEach(sortedReplicas::add);
+        replicaListTransformer.transform(sortedReplicas);
+        sortedReplicas.stream()
+            .filter(r -> coreFilter == null || coreFilter.equals(r.core))
+            .findFirst()
+            .ifPresent(replicas::add);
+        sortedReplicas.clear();
+      }
 
-    if (localSolrClientCache != null) {
-      localSolrClientCache.close();
+    } finally {
+      IOUtils.closeQuietly(localSolrClientCache);
     }
 
     return replicas;
diff --git a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamExecutorHelperTest.java b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamExecutorHelperTest.java
new file mode 100644
index 00000000000..ae80cea1060
--- /dev/null
+++ b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamExecutorHelperTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.solr.SolrTestCase;
+import org.junit.Test;
+
+public class StreamExecutorHelperTest extends SolrTestCase {
+
+  @Test
+  public void submitAllTest() throws IOException {
+    AtomicLong idx = new AtomicLong();
+    Callable<Long> c = () -> idx.getAndIncrement();
+
+    List<Callable<Long>> tasks = List.of(c, c, c, c, c);
+    List<Long> results = new ArrayList<>();
+    results.addAll(StreamExecutorHelper.submitAllAndAwaitAggregatingExceptions(tasks, "test"));
+    Collections.sort(results);
+    List<Long> expected = List.of(0l, 1l, 2l, 3l, 4l);
+    assertEquals(expected, results);
+  }
+
+  @Test
+  public void submitAllWithExceptionsTest() {
+    AtomicLong idx = new AtomicLong();
+    Callable<Long> c =
+        () -> {
+          long id = idx.getAndIncrement();
+          if (id % 2 == 0) {
+            throw new Exception("TestException" + id);
+          }
+          return id;
+        };
+    List<Callable<Long>> tasks = List.of(c, c, c, c, c);
+    IOException ex =
+        expectThrows(
+            IOException.class,
+            () -> StreamExecutorHelper.submitAllAndAwaitAggregatingExceptions(tasks, "test"));
+    List<String> results = new ArrayList<>();
+    results.add(ex.getCause().getMessage());
+    for (var s : ex.getSuppressed()) {
+      results.add(s.getMessage());
+    }
+    Collections.sort(results);
+    List<String> expected = List.of("TestException0", "TestException2", "TestException4");
+    assertEquals(expected, results);
+  }
+}
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
index 1c8f5647e75..cfa7b176d50 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
@@ -16,13 +16,17 @@
  */
 package org.apache.solr.common.util;
 
+import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.SynchronousQueue;
@@ -30,6 +34,7 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -329,4 +334,48 @@ public class ExecutorUtil {
     if (flag == null) isServerPool.remove();
     else isServerPool.set(flag);
   }
+
+  /**
+   * Submits every Callable in the list to the given executor and returns the results as a list.
+   * The method waits for every task to finish, even if one of them throws an exception. The first
+   * failure encountered is thrown wrapped in an IOException once all tasks are done, and every
+   * later failure is attached to that IOException via addSuppressed.
+   *
+   * @param <T> the response type
+   * @param service executor
+   * @param tasks the list of callables to be executed
+   * @return results list
+   * @throws IOException in case any exceptions happened
+   */
+  public static <T> Collection<T> submitAllAndAwaitAggregatingExceptions(
+      ExecutorService service, List<? extends Callable<T>> tasks) throws IOException {
+    List<T> results = new ArrayList<>(tasks.size());
+    IOException parentException = null;
+
+    // Could alternatively use service.invokeAll, but this way we can start looping over futures
+    // before all are done
+    List<Future<T>> futures =
+        tasks.stream().map(service::submit).collect(Collectors.toUnmodifiableList());
+    for (Future<T> f : futures) {
+      try {
+        results.add(f.get());
+      } catch (ExecutionException e) {
+        if (parentException == null) {
+          parentException = new IOException(e.getCause());
+        } else {
+          parentException.addSuppressed(e.getCause());
+        }
+      } catch (Exception e) {
+        if (parentException == null) {
+          parentException = new IOException(e);
+        } else {
+          parentException.addSuppressed(e);
+        }
+      }
+    }
+    if (parentException != null) {
+      throw parentException;
+    }
+    return results;
+  }
 }
diff --git a/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java b/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java
index d7cfbe9c321..c3fc2137018 100644
--- a/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java
+++ b/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java
@@ -17,10 +17,16 @@
 package org.apache.solr.common.util;
 
 import com.carrotsearch.randomizedtesting.annotations.Timeout;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.solr.SolrTestCase;
 import org.apache.solr.util.TimeOut;
 import org.junit.Test;
@@ -95,4 +101,56 @@ public class ExecutorUtilTest extends SolrTestCase {
     }
     return true; // ran full time
   }
+
+  @Test
+  public void submitAllTest() throws IOException {
+    AtomicLong idx = new AtomicLong();
+    Callable<Long> c = () -> idx.getAndIncrement();
+
+    List<Long> results = new ArrayList<>();
+    ExecutorService service =
+        ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("test"));
+    try {
+      List<Callable<Long>> tasks = List.of(c, c, c, c, c);
+      results.addAll(ExecutorUtil.submitAllAndAwaitAggregatingExceptions(service, tasks));
+    } finally {
+      ExecutorUtil.shutdownNowAndAwaitTermination(service);
+    }
+    Collections.sort(results);
+    List<Long> expected = List.of(0l, 1l, 2l, 3l, 4l);
+    assertEquals(expected, results);
+  }
+
+  @Test
+  public void submitAllWithExceptionsTest() {
+    AtomicLong idx = new AtomicLong();
+    Callable<Long> c =
+        () -> {
+          long id = idx.getAndIncrement();
+          if (id % 2 == 0) {
+            throw new Exception("TestException" + id);
+          }
+          return id;
+        };
+
+    ExecutorService service =
+        ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("test"));
+    try {
+      List<Callable<Long>> tasks = List.of(c, c, c, c, c);
+      IOException ex =
+          expectThrows(
+              IOException.class,
+              () -> ExecutorUtil.submitAllAndAwaitAggregatingExceptions(service, tasks));
+      List<String> results = new ArrayList<>();
+      results.add(ex.getCause().getMessage());
+      for (var s : ex.getSuppressed()) {
+        results.add(s.getMessage());
+      }
+      Collections.sort(results);
+      List<String> expected = List.of("TestException0", "TestException2", "TestException4");
+      assertEquals(expected, results);
+    } finally {
+      ExecutorUtil.shutdownNowAndAwaitTermination(service);
+    }
+  }
 }