You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by st...@apache.org on 2023/09/27 20:47:56 UTC
[solr] branch branch_9x updated: SOLR-16992 Non-reproducible StreamingTest failures (#1955)
This is an automated email from the ASF dual-hosted git repository.
stillalex pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/branch_9x by this push:
new 12b7579573f SOLR-16992 Non-reproducible StreamingTest failures (#1955)
12b7579573f is described below
commit 12b7579573fb7598be3deca8498249c5f00eb821
Author: Alex D <st...@apache.org>
AuthorDate: Wed Sep 27 13:44:14 2023 -0700
SOLR-16992 Non-reproducible StreamingTest failures (#1955)
-- suggests CloudSolrStream concurrency race condition
(cherry picked from commit f0fcd300c896b858ae83235ecdb0a109eaea5cea)
---
solr/CHANGES.txt | 2 +
.../solr/client/solrj/io/SolrClientCache.java | 23 +++--
.../client/solrj/io/stream/CloudSolrStream.java | 30 ++-----
.../client/solrj/io/stream/DeepRandomStream.java | 37 +++-----
.../solrj/io/stream/FeaturesSelectionStream.java | 29 ++-----
.../client/solrj/io/stream/ParallelListStream.java | 35 +++-----
.../solrj/io/stream/SignificantTermsStream.java | 29 ++-----
.../solrj/io/stream/StreamExecutorHelper.java | 53 ++++++++++++
.../client/solrj/io/stream/TextLogitStream.java | 29 ++-----
.../solr/client/solrj/io/stream/TopicStream.java | 40 +++------
.../solr/client/solrj/io/stream/TupleStream.java | 98 +++++++++++-----------
.../solrj/io/stream/StreamExecutorHelperTest.java | 68 +++++++++++++++
.../org/apache/solr/common/util/ExecutorUtil.java | 49 +++++++++++
.../apache/solr/common/util/ExecutorUtilTest.java | 58 +++++++++++++
14 files changed, 357 insertions(+), 223 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index d5ad8ca890a..573290f2ce7 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -161,6 +161,8 @@ Bug Fixes
* SOLR-16980: Connect to SOLR standalone with basic authentication (Alex Deparvu)
+* SOLR-16992: Non-reproducible StreamingTest failures -- suggests CloudSolrStream concurrency race condition (Alex Deparvu, hossman)
+
Dependency Upgrades
---------------------
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
index 02621784d00..e56d1a55c13 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
@@ -17,7 +17,6 @@
package org.apache.solr.client.solrj.io;
import java.io.Closeable;
-import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.List;
@@ -25,6 +24,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
@@ -34,6 +34,8 @@ import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.SolrClientBuilder;
+import org.apache.solr.common.AlreadyClosedException;
+import org.apache.solr.common.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +56,7 @@ public class SolrClientCache implements Closeable {
private final Map<String, SolrClient> solrClients = new HashMap<>();
private final HttpClient apacheHttpClient;
private final Http2SolrClient http2SolrClient;
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
public SolrClientCache() {
this.apacheHttpClient = null;
@@ -72,6 +75,7 @@ public class SolrClientCache implements Closeable {
}
public synchronized CloudSolrClient getCloudSolrClient(String zkHost) {
+ ensureOpen();
Objects.requireNonNull(zkHost, "ZooKeeper host cannot be null!");
if (solrClients.containsKey(zkHost)) {
return (CloudSolrClient) solrClients.get(zkHost);
@@ -108,6 +112,7 @@ public class SolrClientCache implements Closeable {
}
public synchronized SolrClient getHttpSolrClient(String baseUrl) {
+ ensureOpen();
Objects.requireNonNull(baseUrl, "Url cannot be null!");
if (solrClients.containsKey(baseUrl)) {
return solrClients.get(baseUrl);
@@ -159,13 +164,17 @@ public class SolrClientCache implements Closeable {
@Override
public synchronized void close() {
- for (Map.Entry<String, SolrClient> entry : solrClients.entrySet()) {
- try {
- entry.getValue().close();
- } catch (IOException e) {
- log.error("Error closing SolrClient for {}", entry.getKey(), e);
+ if (isClosed.compareAndSet(false, true)) {
+ for (Map.Entry<String, SolrClient> entry : solrClients.entrySet()) {
+ IOUtils.closeQuietly(entry.getValue());
}
+ solrClients.clear();
+ }
+ }
+
+ private void ensureOpen() {
+ if (isClosed.get()) {
+ throw new AlreadyClosedException();
}
- solrClients.clear();
}
}
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
index 8b2bd54aef7..f75dec91c38 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.client.solrj.io.stream;
+import static org.apache.solr.client.solrj.io.stream.StreamExecutorHelper.submitAllAndAwaitAggregatingExceptions;
import static org.apache.solr.common.params.CommonParams.DISTRIB;
import static org.apache.solr.common.params.CommonParams.SORT;
@@ -32,8 +33,6 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.TreeSet;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -54,8 +53,6 @@ import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.SolrNamedThreadFactory;
/**
* Connects to Zookeeper to pick replicas from a specific collection to send the query to. Under the
@@ -412,24 +409,15 @@ public class CloudSolrStream extends TupleStream implements Expressible {
}
private void openStreams() throws IOException {
- ExecutorService service =
- ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("CloudSolrStream"));
- List<Future<TupleWrapper>> futures =
+ List<StreamOpener> tasks =
solrStreams.stream()
- .map(ss -> service.submit(new StreamOpener((SolrStream) ss, comp)))
- .collect(Collectors.toList());
- try {
- for (Future<TupleWrapper> f : futures) {
- TupleWrapper w = f.get();
- if (w != null) {
- tuples.add(w);
- }
- }
- } catch (Exception e) {
- throw new IOException(e);
- } finally {
- service.shutdown();
- }
+ .map(s -> new StreamOpener((SolrStream) s, comp))
+ .collect(Collectors.toUnmodifiableList());
+ var results =
+ submitAllAndAwaitAggregatingExceptions(tasks, "CloudSolrStream").stream()
+ .filter(Objects::nonNull)
+ .collect(Collectors.toUnmodifiableList());
+ tuples.addAll(results);
}
/** Closes the CloudSolrStream */
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java
index 2e32dc26f09..8598cdb1604 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.client.solrj.io.stream;
+import static org.apache.solr.client.solrj.io.stream.StreamExecutorHelper.submitAllAndAwaitAggregatingExceptions;
import static org.apache.solr.common.params.CommonParams.DISTRIB;
import static org.apache.solr.common.params.CommonParams.ROWS;
import static org.apache.solr.common.params.CommonParams.SORT;
@@ -34,8 +35,6 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
@@ -49,8 +48,6 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.SolrNamedThreadFactory;
/**
* Connects to Zookeeper to pick replicas from a specific collection to send the query to. Under the
@@ -322,29 +319,15 @@ public class DeepRandomStream extends TupleStream implements Expressible {
}
private void openStreams() throws IOException {
- ExecutorService service =
- ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("DeepRandomStream"));
- try {
- List<Future<TupleWrapper>> futures = new ArrayList<>();
- for (TupleStream solrStream : solrStreams) {
- StreamOpener so = new StreamOpener((SolrStream) solrStream, comp);
- Future<TupleWrapper> future = service.submit(so);
- futures.add(future);
- }
-
- try {
- for (Future<TupleWrapper> f : futures) {
- TupleWrapper w = f.get();
- if (w != null) {
- tuples.add(w);
- }
- }
- } catch (Exception e) {
- throw new IOException(e);
- }
- } finally {
- service.shutdown();
- }
+ List<Callable<TupleWrapper>> tasks =
+ solrStreams.stream()
+ .map(s -> new StreamOpener((SolrStream) s, comp))
+ .collect(Collectors.toUnmodifiableList());
+ var results =
+ submitAllAndAwaitAggregatingExceptions(tasks, "DeepRandomStream").stream()
+ .filter(Objects::nonNull)
+ .collect(Collectors.toUnmodifiableList());
+ tuples.addAll(results);
}
@Override
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
index 3a761ceaee4..9c7478a4623 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
@@ -17,6 +17,7 @@
package org.apache.solr.client.solrj.io.stream;
+import static org.apache.solr.client.solrj.io.stream.StreamExecutorHelper.submitAllAndAwaitAggregatingExceptions;
import static org.apache.solr.common.params.CommonParams.DISTRIB;
import static org.apache.solr.common.params.CommonParams.ID;
@@ -33,8 +34,6 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.stream.Stream;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
@@ -54,9 +53,7 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SolrNamedThreadFactory;
/**
* @since 6.2.0
@@ -77,7 +74,6 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
protected transient SolrClientCache clientCache;
private transient boolean doCloseCache;
- protected transient ExecutorService executorService;
public FeaturesSelectionStream(
String zkHost,
@@ -262,10 +258,6 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
} else {
doCloseCache = false;
}
-
- this.executorService =
- ExecutorUtil.newMDCAwareCachedThreadPool(
- new SolrNamedThreadFactory("FeaturesSelectionStream"));
}
@Override
@@ -304,9 +296,8 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
}
}
- private List<Future<NamedList<?>>> callShards(List<String> baseUrls) throws IOException {
-
- List<Future<NamedList<?>>> futures = new ArrayList<>();
+ private Collection<NamedList<?>> callShards(List<String> baseUrls) throws IOException {
+ List<FeaturesSelectionCall> tasks = new ArrayList<>();
for (String baseUrl : baseUrls) {
FeaturesSelectionCall lc =
new FeaturesSelectionCall(
@@ -317,12 +308,9 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
this.positiveLabel,
this.numTerms,
this.clientCache);
-
- Future<NamedList<?>> future = executorService.submit(lc);
- futures.add(future);
+ tasks.add(lc);
}
-
- return futures;
+ return submitAllAndAwaitAggregatingExceptions(tasks, "FeaturesSelectionStream");
}
@Override
@@ -330,10 +318,6 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
if (doCloseCache) {
clientCache.close();
}
-
- if (executorService != null) {
- executorService.shutdown();
- }
}
/** Return the stream sort - ie, the order in which records are returned */
@@ -359,8 +343,7 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
Map<String, Long> docFreqs = new HashMap<>();
long numDocs = 0;
- for (Future<NamedList<?>> getTopTermsCall : callShards(getShardUrls())) {
- NamedList<?> resp = getTopTermsCall.get();
+ for (NamedList<?> resp : callShards(getShardUrls())) {
@SuppressWarnings({"unchecked"})
NamedList<Double> shardTopTerms = (NamedList<Double>) resp.get("featuredTerms");
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java
index a8a205c06ce..8c86384dbbf 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java
@@ -17,12 +17,12 @@
package org.apache.solr.client.solrj.io.stream;
+import static org.apache.solr.client.solrj.io.stream.StreamExecutorHelper.submitAllAndAwaitAggregatingExceptions;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
@@ -31,8 +31,6 @@ import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.SolrNamedThreadFactory;
public class ParallelListStream extends TupleStream implements Expressible {
@@ -140,27 +138,14 @@ public class ParallelListStream extends TupleStream implements Expressible {
}
private void openStreams() throws IOException {
- ExecutorService service =
- ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("ParallelListStream"));
- try {
- List<Future<StreamIndex>> futures = new ArrayList<>();
- int i = 0;
- for (TupleStream tupleStream : streams) {
- StreamOpener so = new StreamOpener(new StreamIndex(tupleStream, i++));
- Future<StreamIndex> future = service.submit(so);
- futures.add(future);
- }
-
- try {
- for (Future<StreamIndex> f : futures) {
- StreamIndex streamIndex = f.get();
- this.streams[streamIndex.getIndex()] = streamIndex.getTupleStream();
- }
- } catch (Exception e) {
- throw new IOException(e);
- }
- } finally {
- service.shutdown();
+ List<Callable<StreamIndex>> tasks = new ArrayList<>();
+ int i = 0;
+ for (TupleStream tupleStream : streams) {
+ tasks.add(new StreamOpener(new StreamIndex(tupleStream, i++)));
+ }
+ var results = submitAllAndAwaitAggregatingExceptions(tasks, "ParallelListStream");
+ for (var r : results) {
+ this.streams[r.getIndex()] = r.getTupleStream();
}
}
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java
index 22d7c8bc73c..3b79a3c1fa4 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java
@@ -17,10 +17,12 @@
package org.apache.solr.client.solrj.io.stream;
+import static org.apache.solr.client.solrj.io.stream.StreamExecutorHelper.submitAllAndAwaitAggregatingExceptions;
import static org.apache.solr.common.params.CommonParams.DISTRIB;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
@@ -28,8 +30,6 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.io.SolrClientCache;
@@ -47,9 +47,7 @@ import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SolrNamedThreadFactory;
/**
* @since 6.5.0
@@ -71,7 +69,6 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
private transient SolrClientCache clientCache;
private transient boolean doCloseCache;
private transient StreamContext streamContext;
- private transient ExecutorService executorService;
public SignificantTermsStream(
String zkHost,
@@ -251,10 +248,6 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
} else {
doCloseCache = false;
}
-
- this.executorService =
- ExecutorUtil.newMDCAwareCachedThreadPool(
- new SolrNamedThreadFactory("SignificantTermsStream"));
}
@Override
@@ -262,9 +255,8 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
return null;
}
- private List<Future<NamedList<?>>> callShards(List<String> baseUrls) throws IOException {
-
- List<Future<NamedList<?>>> futures = new ArrayList<>();
+ private Collection<NamedList<?>> callShards(List<String> baseUrls) throws IOException {
+ List<SignificantTermsCall> tasks = new ArrayList<>();
for (String baseUrl : baseUrls) {
SignificantTermsCall lc =
new SignificantTermsCall(
@@ -277,12 +269,9 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
this.numTerms,
streamContext.isLocal(),
clientCache);
-
- Future<NamedList<?>> future = executorService.submit(lc);
- futures.add(future);
+ tasks.add(lc);
}
-
- return futures;
+ return submitAllAndAwaitAggregatingExceptions(tasks, "SignificantTermsStream");
}
@Override
@@ -290,8 +279,6 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
if (doCloseCache) {
clientCache.close();
}
-
- executorService.shutdown();
}
/** Return the stream sort - ie, the order in which records are returned */
@@ -316,9 +303,7 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
Map<String, int[]> mergeFreqs = new HashMap<>();
long numDocs = 0;
long resultCount = 0;
- for (Future<NamedList<?>> getTopTermsCall :
- callShards(getShards(zkHost, collection, streamContext))) {
- NamedList<?> fullResp = getTopTermsCall.get();
+ for (NamedList<?> fullResp : callShards(getShards(zkHost, collection, streamContext))) {
Map<?, ?> stResp = (Map<?, ?>) fullResp.get("significantTerms");
@SuppressWarnings({"unchecked"})
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/StreamExecutorHelper.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/StreamExecutorHelper.java
new file mode 100644
index 00000000000..0df2add101f
--- /dev/null
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/StreamExecutorHelper.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
+
+public class StreamExecutorHelper {
+
+ /**
+ * Takes a list of Callables and executes them returning the results as a list. The method waits
+ * for the return of every task even if one of them throws an exception. If any exception happens
+ * it will be thrown, wrapped into an IOException, and other following exceptions will be added as
+ * `addSuppressed` to the original exception
+ *
+ * @param <T> the response type
+ * @param tasks the list of callables to be executed
+ * @param threadsName name to be used by the SolrNamedThreadFactory
+ * @return results collection
+ * @throws IOException in case any exceptions happened
+ */
+ public static <T> Collection<T> submitAllAndAwaitAggregatingExceptions(
+ List<? extends Callable<T>> tasks, String threadsName) throws IOException {
+ ExecutorService service =
+ ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory(threadsName));
+ try {
+ return ExecutorUtil.submitAllAndAwaitAggregatingExceptions(service, tasks).stream()
+ .collect(Collectors.toList());
+ } finally {
+ ExecutorUtil.shutdownNowAndAwaitTermination(service);
+ }
+ }
+}
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
index e3ff627232a..1461a3af70d 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
@@ -17,6 +17,7 @@
package org.apache.solr.client.solrj.io.stream;
+import static org.apache.solr.client.solrj.io.stream.StreamExecutorHelper.submitAllAndAwaitAggregatingExceptions;
import static org.apache.solr.common.params.CommonParams.DISTRIB;
import static org.apache.solr.common.params.CommonParams.ID;
@@ -33,8 +34,6 @@ import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.io.ClassificationEvaluation;
@@ -55,9 +54,7 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SolrNamedThreadFactory;
/**
* @since 6.2.0
@@ -85,7 +82,6 @@ public class TextLogitStream extends TupleStream implements Expressible {
private transient boolean doCloseCache;
protected transient StreamContext streamContext;
- protected transient ExecutorService executorService;
protected TupleStream termsStream;
private List<String> terms;
@@ -367,9 +363,6 @@ public class TextLogitStream extends TupleStream implements Expressible {
} else {
doCloseCache = false;
}
-
- this.executorService =
- ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("TextLogitSolrStream"));
}
@Override
@@ -410,9 +403,8 @@ public class TextLogitStream extends TupleStream implements Expressible {
}
}
- private List<Future<Tuple>> callShards(List<String> baseUrls) throws IOException {
-
- List<Future<Tuple>> futures = new ArrayList<>();
+ private Collection<Tuple> callShards(List<String> baseUrls) throws IOException {
+ List<LogitCall> tasks = new ArrayList<>();
for (String baseUrl : baseUrls) {
LogitCall lc =
new LogitCall(
@@ -428,12 +420,9 @@ public class TextLogitStream extends TupleStream implements Expressible {
this.threshold,
this.idfs,
this.clientCache);
-
- Future<Tuple> future = executorService.submit(lc);
- futures.add(future);
+ tasks.add(lc);
}
-
- return futures;
+ return submitAllAndAwaitAggregatingExceptions(tasks, "TextLogitSolrStream");
}
@Override
@@ -441,10 +430,6 @@ public class TextLogitStream extends TupleStream implements Expressible {
if (doCloseCache) {
clientCache.close();
}
-
- if (executorService != null) {
- executorService.shutdown();
- }
termsStream.close();
}
@@ -511,9 +496,7 @@ public class TextLogitStream extends TupleStream implements Expressible {
this.evaluation = new ClassificationEvaluation();
this.error = 0;
- for (Future<Tuple> logitCall : callShards(getShardUrls())) {
-
- Tuple tuple = logitCall.get();
+ for (Tuple tuple : callShards(getShardUrls())) {
@SuppressWarnings({"unchecked"})
List<Double> shardWeights = (List<Double>) tuple.get("weights");
allWeights.add(shardWeights);
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
index e343d56e35c..8e3844c152a 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
@@ -17,6 +17,7 @@
package org.apache.solr.client.solrj.io.stream;
+import static org.apache.solr.client.solrj.io.stream.StreamExecutorHelper.submitAllAndAwaitAggregatingExceptions;
import static org.apache.solr.common.params.CommonParams.DISTRIB;
import static org.apache.solr.common.params.CommonParams.ID;
import static org.apache.solr.common.params.CommonParams.SORT;
@@ -32,11 +33,11 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
+import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
@@ -59,8 +60,6 @@ import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.SolrNamedThreadFactory;
/**
* @since 6.0.0
@@ -338,30 +337,15 @@ public class TopicStream extends CloudSolrStream implements Expressible {
}
private void openStreams() throws IOException {
-
- ExecutorService service =
- ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("TopicStream"));
- try {
- List<Future<TupleWrapper>> futures = new ArrayList<>();
- for (TupleStream solrStream : solrStreams) {
- StreamOpener so = new StreamOpener((SolrStream) solrStream, comp);
- Future<TupleWrapper> future = service.submit(so);
- futures.add(future);
- }
-
- try {
- for (Future<TupleWrapper> f : futures) {
- TupleWrapper w = f.get();
- if (w != null) {
- tuples.add(w);
- }
- }
- } catch (Exception e) {
- throw new IOException(e);
- }
- } finally {
- service.shutdown();
- }
+ List<Callable<TupleWrapper>> tasks =
+ solrStreams.stream()
+ .map(s -> new StreamOpener((SolrStream) s, comp))
+ .collect(Collectors.toUnmodifiableList());
+ var results =
+ submitAllAndAwaitAggregatingExceptions(tasks, "TopicStream").stream()
+ .filter(Objects::nonNull)
+ .collect(Collectors.toUnmodifiableList());
+ tuples.addAll(results);
}
@Override
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
index 13faf73acde..435d5984083 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
@@ -42,6 +42,7 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.IOUtils;
/**
* @since 5.1.0
@@ -125,6 +126,14 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
static List<Replica> getReplicas(
String zkHost, String collection, StreamContext streamContext, SolrParams requestParams)
throws IOException {
+ if (zkHost == null) {
+ throw new IOException(
+ String.format(
+ Locale.ROOT,
+ "invalid expression - zkHost not found for collection '%s'",
+ collection));
+ }
+
List<Replica> replicas = new ArrayList<>();
// SolrCloud Sharding
@@ -139,54 +148,49 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
localSolrClientCache = null;
}
- if (zkHost == null) {
- throw new IOException(
- String.format(
- Locale.ROOT,
- "invalid expression - zkHost not found for collection '%s'",
- collection));
- }
-
- CloudSolrClient cloudSolrClient = solrClientCache.getCloudSolrClient(zkHost);
- ClusterState clusterState = cloudSolrClient.getClusterStateProvider().getClusterState();
- Slice[] slices = CloudSolrStream.getSlices(collection, cloudSolrClient, true);
- Set<String> liveNodes = clusterState.getLiveNodes();
-
- RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator;
- final ModifiableSolrParams solrParams;
- if (streamContext != null) {
- solrParams = new ModifiableSolrParams(streamContext.getRequestParams());
- requestReplicaListTransformerGenerator =
- streamContext.getRequestReplicaListTransformerGenerator();
- } else {
- solrParams = new ModifiableSolrParams();
- requestReplicaListTransformerGenerator = null;
- }
- if (requestReplicaListTransformerGenerator == null) {
- requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator();
- }
- solrParams.add(requestParams);
-
- ReplicaListTransformer replicaListTransformer =
- requestReplicaListTransformerGenerator.getReplicaListTransformer(solrParams);
-
- final String coreFilter =
- streamContext != null && streamContext.isLocal()
- ? (String) streamContext.get("core")
- : null;
- List<Replica> sortedReplicas = new ArrayList<>();
- for (Slice slice : slices) {
- slice.getReplicas().stream().filter(r -> r.isActive(liveNodes)).forEach(sortedReplicas::add);
- replicaListTransformer.transform(sortedReplicas);
- sortedReplicas.stream()
- .filter(r -> coreFilter == null || coreFilter.equals(r.core))
- .findFirst()
- .ifPresent(replicas::add);
- sortedReplicas.clear();
- }
+ try {
+ CloudSolrClient cloudSolrClient = solrClientCache.getCloudSolrClient(zkHost);
+ ClusterState clusterState = cloudSolrClient.getClusterStateProvider().getClusterState();
+ Slice[] slices = CloudSolrStream.getSlices(collection, cloudSolrClient, true);
+ Set<String> liveNodes = clusterState.getLiveNodes();
+
+ RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator;
+ final ModifiableSolrParams solrParams;
+ if (streamContext != null) {
+ solrParams = new ModifiableSolrParams(streamContext.getRequestParams());
+ requestReplicaListTransformerGenerator =
+ streamContext.getRequestReplicaListTransformerGenerator();
+ } else {
+ solrParams = new ModifiableSolrParams();
+ requestReplicaListTransformerGenerator = null;
+ }
+ if (requestReplicaListTransformerGenerator == null) {
+ requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator();
+ }
+ solrParams.add(requestParams);
+
+ ReplicaListTransformer replicaListTransformer =
+ requestReplicaListTransformerGenerator.getReplicaListTransformer(solrParams);
+
+ final String coreFilter =
+ streamContext != null && streamContext.isLocal()
+ ? (String) streamContext.get("core")
+ : null;
+ List<Replica> sortedReplicas = new ArrayList<>();
+ for (Slice slice : slices) {
+ slice.getReplicas().stream()
+ .filter(r -> r.isActive(liveNodes))
+ .forEach(sortedReplicas::add);
+ replicaListTransformer.transform(sortedReplicas);
+ sortedReplicas.stream()
+ .filter(r -> coreFilter == null || coreFilter.equals(r.core))
+ .findFirst()
+ .ifPresent(replicas::add);
+ sortedReplicas.clear();
+ }
- if (localSolrClientCache != null) {
- localSolrClientCache.close();
+ } finally {
+ IOUtils.closeQuietly(localSolrClientCache);
}
return replicas;
diff --git a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamExecutorHelperTest.java b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamExecutorHelperTest.java
new file mode 100644
index 00000000000..ae80cea1060
--- /dev/null
+++ b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamExecutorHelperTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.solr.SolrTestCase;
+import org.junit.Test;
+
+public class StreamExecutorHelperTest extends SolrTestCase {
+
+ @Test
+ public void submitAllTest() throws IOException {
+ AtomicLong idx = new AtomicLong();
+ Callable<Long> c = () -> idx.getAndIncrement();
+
+ List<Callable<Long>> tasks = List.of(c, c, c, c, c);
+ List<Long> results = new ArrayList<>();
+ results.addAll(StreamExecutorHelper.submitAllAndAwaitAggregatingExceptions(tasks, "test"));
+ Collections.sort(results);
+ List<Long> expected = List.of(0l, 1l, 2l, 3l, 4l);
+ assertEquals(expected, results);
+ }
+
+ @Test
+ public void submitAllWithExceptionsTest() {
+ AtomicLong idx = new AtomicLong();
+ Callable<Long> c =
+ () -> {
+ long id = idx.getAndIncrement();
+ if (id % 2 == 0) {
+ throw new Exception("TestException" + id);
+ }
+ return id;
+ };
+ List<Callable<Long>> tasks = List.of(c, c, c, c, c);
+ IOException ex =
+ expectThrows(
+ IOException.class,
+ () -> StreamExecutorHelper.submitAllAndAwaitAggregatingExceptions(tasks, "test"));
+ List<String> results = new ArrayList<>();
+ results.add(ex.getCause().getMessage());
+ for (var s : ex.getSuppressed()) {
+ results.add(s.getMessage());
+ }
+ Collections.sort(results);
+ List<String> expected = List.of("TestException0", "TestException2", "TestException4");
+ assertEquals(expected, results);
+ }
+}
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
index 1c8f5647e75..cfa7b176d50 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
@@ -16,13 +16,17 @@
*/
package org.apache.solr.common.util;
+import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
@@ -30,6 +34,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -329,4 +334,48 @@ public class ExecutorUtil {
if (flag == null) isServerPool.remove();
else isServerPool.set(flag);
}
+
+ /**
+ * Takes an executor and a list of Callables and executes them returning the results as a list.
+ * The method waits for the return of every task even if one of them throws an exception. If any
+ * exception happens it will be thrown, wrapped into an IOException, and other following
+ * exceptions will be added as `addSuppressed` to the original exception
+ *
+ * @param <T> the response type
+ * @param service executor
+ * @param tasks the list of callables to be executed
+ * @return results list
+ * @throws IOException in case any exceptions happened
+ */
+ public static <T> Collection<T> submitAllAndAwaitAggregatingExceptions(
+ ExecutorService service, List<? extends Callable<T>> tasks) throws IOException {
+ List<T> results = new ArrayList<>(tasks.size());
+ IOException parentException = null;
+
+ // Could alternatively use service.invokeAll, but this way we can start looping over futures
+ // before all are done
+ List<Future<T>> futures =
+ tasks.stream().map(service::submit).collect(Collectors.toUnmodifiableList());
+ for (Future<T> f : futures) {
+ try {
+ results.add(f.get());
+ } catch (ExecutionException e) {
+ if (parentException == null) {
+ parentException = new IOException(e.getCause());
+ } else {
+ parentException.addSuppressed(e.getCause());
+ }
+ } catch (Exception e) {
+ if (parentException == null) {
+ parentException = new IOException(e);
+ } else {
+ parentException.addSuppressed(e);
+ }
+ }
+ }
+ if (parentException != null) {
+ throw parentException;
+ }
+ return results;
+ }
}
diff --git a/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java b/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java
index d7cfbe9c321..c3fc2137018 100644
--- a/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java
+++ b/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java
@@ -17,10 +17,16 @@
package org.apache.solr.common.util;
import com.carrotsearch.randomizedtesting.annotations.Timeout;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.SolrTestCase;
import org.apache.solr.util.TimeOut;
import org.junit.Test;
@@ -95,4 +101,56 @@ public class ExecutorUtilTest extends SolrTestCase {
}
return true; // ran full time
}
+
+ @Test
+ public void submitAllTest() throws IOException {
+ AtomicLong idx = new AtomicLong();
+ Callable<Long> c = () -> idx.getAndIncrement();
+
+ List<Long> results = new ArrayList<>();
+ ExecutorService service =
+ ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("test"));
+ try {
+ List<Callable<Long>> tasks = List.of(c, c, c, c, c);
+ results.addAll(ExecutorUtil.submitAllAndAwaitAggregatingExceptions(service, tasks));
+ } finally {
+ ExecutorUtil.shutdownNowAndAwaitTermination(service);
+ }
+ Collections.sort(results);
+ List<Long> expected = List.of(0l, 1l, 2l, 3l, 4l);
+ assertEquals(expected, results);
+ }
+
+ @Test
+ public void submitAllWithExceptionsTest() {
+ AtomicLong idx = new AtomicLong();
+ Callable<Long> c =
+ () -> {
+ long id = idx.getAndIncrement();
+ if (id % 2 == 0) {
+ throw new Exception("TestException" + id);
+ }
+ return id;
+ };
+
+ ExecutorService service =
+ ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("test"));
+ try {
+ List<Callable<Long>> tasks = List.of(c, c, c, c, c);
+ IOException ex =
+ expectThrows(
+ IOException.class,
+ () -> ExecutorUtil.submitAllAndAwaitAggregatingExceptions(service, tasks));
+ List<String> results = new ArrayList<>();
+ results.add(ex.getCause().getMessage());
+ for (var s : ex.getSuppressed()) {
+ results.add(s.getMessage());
+ }
+ Collections.sort(results);
+ List<String> expected = List.of("TestException0", "TestException2", "TestException4");
+ assertEquals(expected, results);
+ } finally {
+ ExecutorUtil.shutdownNowAndAwaitTermination(service);
+ }
+ }
}