You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by st...@apache.org on 2023/08/21 12:45:31 UTC
[solr] branch main updated: SOLR-16927 Allow SolrClientCache clients to use Jetty HTTP2 clients (#1835)
This is an automated email from the ASF dual-hosted git repository.
stillalex pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new f6ef54a757c SOLR-16927 Allow SolrClientCache clients to use Jetty HTTP2 clients (#1835)
f6ef54a757c is described below
commit f6ef54a757cab2e3f4ee9785e234bff0bc5a22b0
Author: Alex D <st...@apache.org>
AuthorDate: Mon Aug 21 05:45:24 2023 -0700
SOLR-16927 Allow SolrClientCache clients to use Jetty HTTP2 clients (#1835)
---
solr/CHANGES.txt | 2 +
solr/benchmark/build.gradle | 3 +-
.../org/apache/solr/bench/MiniClusterState.java | 3 +
.../apache/solr/bench/search/StreamingSearch.java | 154 +++++++++++++++++++++
.../solr/client/solrj/io/SolrClientCache.java | 153 ++++++++++++--------
.../client/solrj/io/stream/CloudSolrStream.java | 1 -
.../solr/client/solrj/io/stream/CommitStream.java | 12 +-
.../client/solrj/io/stream/DeepRandomStream.java | 2 -
.../solr/client/solrj/io/stream/DeleteStream.java | 3 +-
.../solr/client/solrj/io/stream/Facet2DStream.java | 30 ++--
.../solr/client/solrj/io/stream/FacetStream.java | 29 ++--
.../solrj/io/stream/FeaturesSelectionStream.java | 63 +++++----
.../solr/client/solrj/io/stream/HavingStream.java | 4 +-
.../solr/client/solrj/io/stream/KnnStream.java | 23 ++-
.../solr/client/solrj/io/stream/LetStream.java | 2 +-
.../solr/client/solrj/io/stream/ModelStream.java | 3 -
.../solr/client/solrj/io/stream/RandomStream.java | 29 ++--
.../client/solrj/io/stream/ScoreNodesStream.java | 14 +-
.../solr/client/solrj/io/stream/SearchStream.java | 24 ++--
.../solrj/io/stream/SignificantTermsStream.java | 54 +++++---
.../solr/client/solrj/io/stream/SolrStream.java | 42 +++---
.../solr/client/solrj/io/stream/SqlStream.java | 2 -
.../solr/client/solrj/io/stream/StatsStream.java | 28 ++--
.../solr/client/solrj/io/stream/StreamContext.java | 3 +-
.../client/solrj/io/stream/TextLogitStream.java | 44 +++---
.../client/solrj/io/stream/TimeSeriesStream.java | 23 ++-
.../solr/client/solrj/io/stream/TopicStream.java | 103 +++++++-------
.../solr/client/solrj/io/stream/UpdateStream.java | 41 ++----
.../client/solrj/impl/CloudHttp2SolrClient.java | 8 ++
.../solr/client/solrj/impl/Http2SolrClient.java | 25 +++-
.../solr/client/solrj/impl/HttpSolrClient.java | 1 +
.../solr/client/solrj/impl/SolrClientBuilder.java | 8 ++
32 files changed, 596 insertions(+), 340 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index c3985d65a8b..e09b1849394 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -101,6 +101,8 @@ Improvements
* SOLR-16940: Users can pass Java system properties to the SolrCLI via the SOLR_TOOL_OPTS environment variable. (Houston Putman)
+* SOLR-16927: Allow SolrClientCache clients to use Jetty HTTP2 clients (Alex Deparvu, David Smiley)
+
Optimizations
---------------------
diff --git a/solr/benchmark/build.gradle b/solr/benchmark/build.gradle
index be99ea3ee03..63cea7af01f 100644
--- a/solr/benchmark/build.gradle
+++ b/solr/benchmark/build.gradle
@@ -46,9 +46,10 @@ task echoCp {
dependencies {
implementation project(':solr:test-framework')
implementation project(':solr:solrj')
+ implementation project(':solr:solrj-streaming')
implementation 'org.apache.lucene:lucene-core'
-
+ implementation 'org.apache.httpcomponents:httpclient'
implementation 'commons-io:commons-io'
implementation 'io.dropwizard.metrics:metrics-core'
implementation 'org.apache.commons:commons-math3'
diff --git a/solr/benchmark/src/java/org/apache/solr/bench/MiniClusterState.java b/solr/benchmark/src/java/org/apache/solr/bench/MiniClusterState.java
index a35dd07bbcc..dd1c0afe639 100755
--- a/solr/benchmark/src/java/org/apache/solr/bench/MiniClusterState.java
+++ b/solr/benchmark/src/java/org/apache/solr/bench/MiniClusterState.java
@@ -86,6 +86,8 @@ public class MiniClusterState {
/** The Nodes. */
public List<String> nodes;
+ public String zkHost;
+
/** The Cluster. */
MiniSolrCloudCluster cluster;
@@ -277,6 +279,7 @@ public class MiniClusterState {
for (JettySolrRunner runner : jetties) {
nodes.add(runner.getBaseUrl().toString());
}
+ zkHost = cluster.getZkServer().getZkAddress();
client = new Http2SolrClient.Builder().useHttp1_1(useHttp1).build();
diff --git a/solr/benchmark/src/java/org/apache/solr/bench/search/StreamingSearch.java b/solr/benchmark/src/java/org/apache/solr/bench/search/StreamingSearch.java
new file mode 100644
index 00000000000..14046644c46
--- /dev/null
+++ b/solr/benchmark/src/java/org/apache/solr/bench/search/StreamingSearch.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.bench.search;
+
+import static org.apache.solr.bench.Docs.docs;
+import static org.apache.solr.bench.generators.SourceDSL.integers;
+import static org.apache.solr.bench.generators.SourceDSL.strings;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.solr.bench.Docs;
+import org.apache.solr.bench.MiniClusterState;
+import org.apache.solr.bench.MiniClusterState.MiniClusterBenchState;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.Http2SolrClient;
+import org.apache.solr.client.solrj.impl.HttpClientUtil;
+import org.apache.solr.client.solrj.io.SolrClientCache;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
+import org.apache.solr.client.solrj.io.stream.StreamContext;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+@Fork(value = 1)
+@BenchmarkMode(Mode.Throughput)
+@Warmup(time = 5, iterations = 1)
+@Measurement(time = 30, iterations = 4)
+@Threads(value = 1)
+public class StreamingSearch {
+
+ private static final String collection = "benchStreamingSearch";
+
+  /**
+   * Benchmark-scoped state for the streaming search benchmark.
+   *
+   * <p>Once per trial it starts the mini cluster, creates and fills the benchmark collection, and
+   * prepares the query parameters. Once per iteration it builds a fresh {@link SolrClientCache}
+   * backed either by an Apache HttpClient ({@code useHttp1 == true}) or by an {@link
+   * Http2SolrClient}, and releases both the cache and the underlying HTTP client again when the
+   * iteration ends.
+   */
+  @State(Scope.Benchmark)
+  public static class BenchState {
+
+    @Param({"false", "true"})
+    boolean useHttp1;
+
+    private int docs = 1000;
+    private String zkHost;
+    private ModifiableSolrParams params;
+    private StreamContext streamContext;
+    private Http2SolrClient http2SolrClient;
+    // Apache client handed to the cache in HTTP/1.1 mode. It is created by this state, so this
+    // state (not the cache) is responsible for closing it.
+    private org.apache.http.impl.client.CloseableHttpClient apacheHttpClient;
+
+    /**
+     * Starts the mini cluster, creates the benchmark collection, indexes {@code docs} generated
+     * documents, waits for merges, and prepares the match-all query parameters.
+     *
+     * @param miniClusterState shared mini cluster state provided by JMH
+     * @throws Exception if the cluster cannot be started or indexing fails
+     */
+    @Setup(Level.Trial)
+    public void setup(MiniClusterBenchState miniClusterState) throws Exception {
+
+      miniClusterState.startMiniCluster(3);
+      miniClusterState.createCollection(collection, 3, 1);
+      Docs docGen =
+          docs()
+              .field("id", integers().incrementing())
+              .field("text2_ts", strings().basicLatinAlphabet().multi(312).ofLengthBetween(30, 64))
+              .field("text3_ts", strings().basicLatinAlphabet().multi(312).ofLengthBetween(30, 64))
+              .field("int1_i_dv", integers().all());
+      miniClusterState.index(collection, docGen, docs);
+      miniClusterState.waitForMerges(collection);
+
+      zkHost = miniClusterState.zkHost;
+
+      params = new ModifiableSolrParams();
+      params.set(CommonParams.Q, "*:*");
+      params.set(CommonParams.FL, "id,text2_ts,text3_ts,int1_i_dv");
+      params.set(CommonParams.SORT, "id asc,int1_i_dv asc");
+      params.set(CommonParams.ROWS, docs);
+    }
+
+    /**
+     * Creates the per-iteration {@link SolrClientCache} and {@link StreamContext}.
+     *
+     * <p>The HTTP client backing the cache is kept in a field in both modes so that {@link
+     * #teardownIt()} can close it.
+     *
+     * @param miniClusterState shared mini cluster state provided by JMH (unused here)
+     */
+    @Setup(Level.Iteration)
+    public void setupIteration(MiniClusterState.MiniClusterBenchState miniClusterState)
+        throws SolrServerException, IOException {
+      SolrClientCache solrClientCache;
+      if (useHttp1) {
+        apacheHttpClient = HttpClientUtil.createClient(null); // TODO tune params?
+        solrClientCache = new SolrClientCache(apacheHttpClient);
+      } else {
+        http2SolrClient = newHttp2SolrClient();
+        solrClientCache = new SolrClientCache(http2SolrClient);
+      }
+
+      streamContext = new StreamContext();
+      streamContext.setSolrClientCache(solrClientCache);
+    }
+
+    /**
+     * Closes the per-iteration cache first, then whichever HTTP client was created for it.
+     *
+     * <p>The client fields are reset to {@code null} after closing so a later teardown never
+     * closes the same instance twice.
+     */
+    @TearDown(Level.Iteration)
+    public void teardownIt() {
+      streamContext.getSolrClientCache().close();
+      if (http2SolrClient != null) {
+        http2SolrClient.close();
+        http2SolrClient = null;
+      }
+      if (apacheHttpClient != null) {
+        HttpClientUtil.close(apacheHttpClient);
+        apacheHttpClient = null;
+      }
+    }
+  }
+
+  /**
+   * Benchmark body: runs one {@link CloudSolrStream} over the benchmark collection using the
+   * iteration's stream context and returns every tuple it produced.
+   *
+   * @param benchState per-benchmark state holding the ZooKeeper address, params and context
+   * @param miniClusterState shared mini cluster state provided by JMH (unused here)
+   * @return the list of tuples read from the stream
+   */
+  @Benchmark
+  public Object stream(
+      BenchState benchState, MiniClusterState.MiniClusterBenchState miniClusterState)
+      throws SolrServerException, IOException {
+    TupleStream cloudStream =
+        new CloudSolrStream(benchState.zkHost, collection, benchState.params);
+    cloudStream.setStreamContext(benchState.streamContext);
+    return getTuples(cloudStream);
+  }
+
+ private static List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
+ List<Tuple> tuples = new ArrayList<>();
+ try {
+ tupleStream.open();
+ while (true) {
+ Tuple t = tupleStream.read();
+ if (t.EOF) {
+ break;
+ } else {
+ tuples.add(t);
+ }
+ }
+ return tuples;
+ } finally {
+ tupleStream.close();
+ }
+ }
+
+  /**
+   * Builds an {@link Http2SolrClient} with the builder's default settings.
+   *
+   * @return a new client; the caller is responsible for closing it
+   */
+  public static Http2SolrClient newHttp2SolrClient() {
+    // TODO tune params?
+    return new Http2SolrClient.Builder().build();
+  }
+}
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
index ff2963c89fb..02621784d00 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
@@ -16,10 +16,9 @@
*/
package org.apache.solr.client.solrj.io;
+import java.io.Closeable;
import java.io.IOException;
-import java.io.Serializable;
import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -28,97 +27,137 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
import org.apache.solr.client.solrj.impl.CloudLegacySolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.impl.SolrClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * The SolrClientCache caches SolrClients so they can be reused by different TupleStreams.
- *
- * <p>TODO: Cut this over to using Solr's new Http2 clients
- */
-public class SolrClientCache implements Serializable {
+/** The SolrClientCache caches SolrClients so they can be reused by different TupleStreams. */
+public class SolrClientCache implements Closeable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final Map<String, SolrClient> solrClients = new HashMap<>();
- private final HttpClient httpClient;
// Set the floor for timeouts to 60 seconds.
// Timeouts can be increased by setting the system properties defined below.
- private static final int conTimeout =
+ private static final int MIN_TIMEOUT = 60000;
+ private static final int minConnTimeout =
Math.max(
- Integer.parseInt(System.getProperty(HttpClientUtil.PROP_CONNECTION_TIMEOUT, "60000")),
- 60000);
- private static final int socketTimeout =
- Math.max(
- Integer.parseInt(System.getProperty(HttpClientUtil.PROP_SO_TIMEOUT, "60000")), 60000);
+ Integer.getInteger(HttpClientUtil.PROP_CONNECTION_TIMEOUT, MIN_TIMEOUT), MIN_TIMEOUT);
+ private static final int minSocketTimeout =
+ Math.max(Integer.getInteger(HttpClientUtil.PROP_SO_TIMEOUT, MIN_TIMEOUT), MIN_TIMEOUT);
+
+ private final Map<String, SolrClient> solrClients = new HashMap<>();
+ private final HttpClient apacheHttpClient;
+ private final Http2SolrClient http2SolrClient;
public SolrClientCache() {
- httpClient = null;
+ this.apacheHttpClient = null;
+ this.http2SolrClient = null;
}
@Deprecated(since = "9.0")
- public SolrClientCache(HttpClient httpClient) {
- this.httpClient = httpClient;
+ public SolrClientCache(HttpClient apacheHttpClient) {
+ this.apacheHttpClient = apacheHttpClient;
+ this.http2SolrClient = null;
}
- @Deprecated(since = "9.0")
- public synchronized CloudSolrClient getCloudSolrClient(String zkHost) {
-
- // Timeouts should never be lower then 60000 but they can be set higher
- assert (conTimeout >= 60000);
- assert (socketTimeout >= 60000);
-
- if (log.isDebugEnabled()) {
- log.debug("SolrClientCache.conTimeout: {}", conTimeout);
- log.debug("SolrClientCache.socketTimeout: {}", socketTimeout);
- }
+  /**
+   * Creates a cache whose clients are built on top of the given {@link Http2SolrClient}.
+   *
+   * @param http2SolrClient client handed to the HTTP/2 client builders; the Apache client is left
+   *     unset
+   */
+  public SolrClientCache(Http2SolrClient http2SolrClient) {
+    this.http2SolrClient = http2SolrClient;
+    this.apacheHttpClient = null;
+  }
+ public synchronized CloudSolrClient getCloudSolrClient(String zkHost) {
Objects.requireNonNull(zkHost, "ZooKeeper host cannot be null!");
- CloudSolrClient client;
if (solrClients.containsKey(zkHost)) {
- client = (CloudSolrClient) solrClients.get(zkHost);
+ return (CloudSolrClient) solrClients.get(zkHost);
+ }
+ final CloudSolrClient client;
+ if (apacheHttpClient != null) {
+ client = newCloudLegacySolrClient(zkHost, apacheHttpClient);
} else {
- final List<String> hosts = new ArrayList<>();
- hosts.add(zkHost);
- var builder =
- new CloudLegacySolrClient.Builder(hosts, Optional.empty())
- .withSocketTimeout(socketTimeout, TimeUnit.MILLISECONDS)
- .withConnectionTimeout(conTimeout, TimeUnit.MILLISECONDS);
- if (httpClient != null) {
- builder = builder.withHttpClient(httpClient);
- }
-
- client = builder.build();
- client.connect();
- solrClients.put(zkHost, client);
+ client = newCloudHttp2SolrClient(zkHost, http2SolrClient);
}
+ solrClients.put(zkHost, client);
+ return client;
+ }
+  /**
+   * Builds and connects a {@link CloudLegacySolrClient} for the given ZooKeeper host, using the
+   * supplied Apache HttpClient and the cache's minimum timeouts.
+   *
+   * @param zkHost ZooKeeper address, used as the single entry of the host list
+   * @param httpClient Apache HttpClient handed to the builder
+   * @return the connected cloud client
+   */
+  @Deprecated
+  private static CloudSolrClient newCloudLegacySolrClient(String zkHost, HttpClient httpClient) {
+    var legacyBuilder = new CloudLegacySolrClient.Builder(List.of(zkHost), Optional.empty());
+    adjustTimeouts(legacyBuilder, httpClient);
+    CloudSolrClient cloudClient = legacyBuilder.build();
+    cloudClient.connect();
+    return cloudClient;
+  }
+
+ private static CloudHttp2SolrClient newCloudHttp2SolrClient(
+ String zkHost, Http2SolrClient http2SolrClient) {
+ final List<String> hosts = List.of(zkHost);
+ var builder = new CloudHttp2SolrClient.Builder(hosts, Optional.empty());
+ // using internal builder to ensure the internal client gets closed
+ builder = builder.withInternalClientBuilder(newHttp2SolrClientBuilder(null, http2SolrClient));
+ var client = builder.build();
+ client.connect();
return client;
}
- @Deprecated(since = "9.0")
public synchronized SolrClient getHttpSolrClient(String baseUrl) {
- SolrClient client;
+ Objects.requireNonNull(baseUrl, "Url cannot be null!");
if (solrClients.containsKey(baseUrl)) {
- client = solrClients.get(baseUrl);
+ return solrClients.get(baseUrl);
+ }
+ final SolrClient client;
+ if (apacheHttpClient != null) {
+ client = newHttpSolrClient(baseUrl, apacheHttpClient);
} else {
- HttpSolrClient.Builder builder =
- new HttpSolrClient.Builder(baseUrl)
- .withSocketTimeout(socketTimeout, TimeUnit.MILLISECONDS)
- .withConnectionTimeout(conTimeout, TimeUnit.MILLISECONDS);
- if (httpClient != null) {
- builder = builder.withHttpClient(httpClient);
- }
- client = builder.build();
- solrClients.put(baseUrl, client);
+ client = newHttp2SolrClientBuilder(baseUrl, http2SolrClient).build();
}
+ solrClients.put(baseUrl, client);
return client;
}
+  /**
+   * Builds an {@link HttpSolrClient} for {@code baseUrl} on top of the supplied Apache HttpClient,
+   * with the cache's minimum timeouts applied.
+   *
+   * @param baseUrl URL passed to the client builder
+   * @param httpClient Apache HttpClient handed to the builder
+   * @return the new client
+   */
+  @Deprecated
+  private static SolrClient newHttpSolrClient(String baseUrl, HttpClient httpClient) {
+    var legacyBuilder = new HttpSolrClient.Builder(baseUrl);
+    adjustTimeouts(legacyBuilder, httpClient);
+    return legacyBuilder.build();
+  }
+
+  /**
+   * Sets the HttpClient on the builder and raises its socket and connection timeouts to at least
+   * {@code minSocketTimeout} and {@code minConnTimeout}; larger values already present on the
+   * builder are kept.
+   *
+   * @param builder the legacy client builder to configure
+   * @param httpClient Apache HttpClient handed to the builder
+   */
+  @Deprecated
+  private static void adjustTimeouts(SolrClientBuilder<?> builder, HttpClient httpClient) {
+    builder.withHttpClient(httpClient);
+    // Timeouts are read only after the HttpClient has been set, in the same order as before.
+    builder.withSocketTimeout(
+        Math.max(minSocketTimeout, builder.getSocketTimeoutMillis()), TimeUnit.MILLISECONDS);
+    builder.withConnectionTimeout(
+        Math.max(minConnTimeout, builder.getConnectionTimeoutMillis()), TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Creates a builder for an {@link Http2SolrClient} whose idle and connection timeouts are never
+   * below the cache-wide minimums ({@code minSocketTimeout} and {@code minConnTimeout}).
+   *
+   * @param baseUrl URL passed to the builder; this class passes {@code null} when the builder is
+   *     used as the internal client builder of a cloud client
+   * @param http2SolrClient optional client passed to {@code withHttpClient}; may be {@code null}
+   * @return the configured builder, not yet built
+   */
+  private static Http2SolrClient.Builder newHttp2SolrClientBuilder(
+      String baseUrl, Http2SolrClient http2SolrClient) {
+    var builder = new Http2SolrClient.Builder(baseUrl);
+    if (http2SolrClient != null) {
+      builder = builder.withHttpClient(http2SolrClient);
+    }
+    // The getters return null when no timeout is set on the builder; a value present at this
+    // point presumably comes from the supplied client (confirm against Http2SolrClient.Builder).
+    long idleTimeout = minSocketTimeout;
+    if (builder.getIdleTimeoutMillis() != null) {
+      idleTimeout = Math.max(idleTimeout, builder.getIdleTimeoutMillis());
+    }
+    builder.withIdleTimeout(idleTimeout, TimeUnit.MILLISECONDS);
+    long connTimeout = minConnTimeout;
+    if (builder.getConnectionTimeout() != null) {
+      // Floor the connection timeout by its own minimum, not by the idle timeout.
+      connTimeout = Math.max(connTimeout, builder.getConnectionTimeout());
+    }
+    builder.withConnectionTimeout(connTimeout, TimeUnit.MILLISECONDS);
+    return builder;
+  }
+
+ @Override
public synchronized void close() {
for (Map.Entry<String, SolrClient> entry : solrClients.entrySet()) {
try {
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
index 6cef1b2c9b6..8b2bd54aef7 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
@@ -75,7 +75,6 @@ public class CloudSolrStream extends TupleStream implements Expressible {
protected StreamComparator comp;
private boolean trace;
protected transient Map<String, Tuple> eofTuples;
- protected transient CloudSolrClient cloudSolrClient;
protected transient List<TupleStream> solrStreams;
protected transient TreeSet<TupleWrapper> tuples;
protected transient StreamContext streamContext;
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java
index 1121ab12421..79f8abfab23 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java
@@ -55,6 +55,7 @@ public class CommitStream extends TupleStream implements Expressible {
private TupleStream tupleSource;
private transient SolrClientCache clientCache;
+ private transient boolean doCloseCache;
private long docsSinceCommit;
public CommitStream(StreamExpression expression, StreamFactory factory) throws IOException {
@@ -150,7 +151,12 @@ public class CommitStream extends TupleStream implements Expressible {
@Override
public void open() throws IOException {
tupleSource.open();
- clientCache = new SolrClientCache();
+ if (clientCache == null) {
+ doCloseCache = true;
+ clientCache = new SolrClientCache();
+ } else {
+ doCloseCache = false;
+ }
docsSinceCommit = 0;
}
@@ -193,7 +199,9 @@ public class CommitStream extends TupleStream implements Expressible {
@Override
public void close() throws IOException {
- clientCache.close();
+ if (doCloseCache) {
+ clientCache.close();
+ }
tupleSource.close();
}
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java
index 02bb701d5d3..2e32dc26f09 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java
@@ -37,7 +37,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
@@ -71,7 +70,6 @@ public class DeepRandomStream extends TupleStream implements Expressible {
protected StreamComparator comp;
private boolean trace;
protected transient Map<String, Tuple> eofTuples;
- protected transient CloudSolrClient cloudSolrClient;
protected transient List<TupleStream> solrStreams;
protected transient Deque<TupleWrapper> tuples;
protected transient StreamContext streamContext;
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/DeleteStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/DeleteStream.java
index e78941c80a8..d00908792ed 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/DeleteStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/DeleteStream.java
@@ -79,7 +79,8 @@ public final class DeleteStream extends UpdateStream implements Expressible {
final Long version = getVersion(doc);
req.deleteById(id, version);
}
- req.process(getCloudSolrClient(), getCollectionName());
+ var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
+ req.process(cloudSolrClient, getCollectionName());
} catch (SolrServerException | NumberFormatException | IOException e) {
log.warn("Unable to delete documents from collection due to unexpected error.", e);
String className = e.getClass().getName();
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/Facet2DStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/Facet2DStream.java
index c362257387c..878dee75af4 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/Facet2DStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/Facet2DStream.java
@@ -24,12 +24,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.impl.CloudLegacySolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
@@ -68,8 +64,8 @@ public class Facet2DStream extends TupleStream implements Expressible {
private int dimensionY;
private FieldComparator bucketSort;
- protected transient SolrClientCache cache;
- protected transient CloudSolrClient cloudSolrClient;
+ protected transient SolrClientCache clientCache;
+ private transient boolean doCloseCache;
public Facet2DStream(
String zkHost,
@@ -312,7 +308,7 @@ public class Facet2DStream extends TupleStream implements Expressible {
@Override
public void setStreamContext(StreamContext context) {
- cache = context.getSolrClientCache();
+ clientCache = context.getSolrClientCache();
}
@Override
@@ -322,16 +318,11 @@ public class Facet2DStream extends TupleStream implements Expressible {
@Override
public void open() throws IOException {
- if (cache != null) {
- cloudSolrClient = cache.getCloudSolrClient(zkHost);
+ if (clientCache == null) {
+ doCloseCache = true;
+ clientCache = new SolrClientCache();
} else {
- final List<String> hosts = new ArrayList<>();
- hosts.add(zkHost);
- cloudSolrClient =
- new CloudLegacySolrClient.Builder(hosts, Optional.empty())
- .withSocketTimeout(30000, TimeUnit.MILLISECONDS)
- .withConnectionTimeout(15000, TimeUnit.MILLISECONDS)
- .build();
+ doCloseCache = false;
}
FieldComparator[] adjustedSorts = adjustSorts(x, y, bucketSort);
@@ -344,6 +335,7 @@ public class Facet2DStream extends TupleStream implements Expressible {
QueryRequest request = new QueryRequest(paramsLoc, SolrRequest.METHOD.POST);
try {
+ var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
NamedList<Object> response = cloudSolrClient.request(request, collection);
getTuples(response, x, y, metric);
this.out = tuples.iterator();
@@ -366,10 +358,8 @@ public class Facet2DStream extends TupleStream implements Expressible {
@Override
public void close() throws IOException {
- if (cache == null) {
- if (cloudSolrClient != null) {
- cloudSolrClient.close();
- }
+ if (doCloseCache) {
+ clientCache.close();
}
}
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
index 3b0b785cbc9..759c038d7a0 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
@@ -24,11 +24,8 @@ import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.impl.CloudLegacySolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
@@ -88,8 +85,8 @@ public class FacetStream extends TupleStream implements Expressible, ParallelMet
private boolean resortNeeded;
private boolean serializeBucketSizeLimit;
- protected transient SolrClientCache cache;
- protected transient CloudSolrClient cloudSolrClient;
+ protected transient SolrClientCache clientCache;
+ private transient boolean doCloseCache;
protected transient TupleStream parallelizedStream;
protected transient StreamContext context;
@@ -647,7 +644,7 @@ public class FacetStream extends TupleStream implements Expressible, ParallelMet
@Override
public void setStreamContext(StreamContext context) {
this.context = context;
- cache = context.getSolrClientCache();
+ this.clientCache = context.getSolrClientCache();
}
@Override
@@ -657,17 +654,13 @@ public class FacetStream extends TupleStream implements Expressible, ParallelMet
@Override
public void open() throws IOException {
- if (cache != null) {
- cloudSolrClient = cache.getCloudSolrClient(zkHost);
+ if (clientCache == null) {
+ doCloseCache = true;
+ clientCache = new SolrClientCache();
} else {
- final List<String> hosts = new ArrayList<>();
- hosts.add(zkHost);
- cloudSolrClient =
- new CloudLegacySolrClient.Builder(hosts, Optional.empty())
- .withSocketTimeout(30000, TimeUnit.MILLISECONDS)
- .withConnectionTimeout(15000, TimeUnit.MILLISECONDS)
- .build();
+ doCloseCache = false;
}
+ var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
// Parallelize the facet expression across multiple collections for an alias using plist if
// possible
@@ -761,10 +754,8 @@ public class FacetStream extends TupleStream implements Expressible, ParallelMet
@Override
public void close() throws IOException {
- if (cache == null) {
- if (cloudSolrClient != null) {
- cloudSolrClient.close();
- }
+ if (doCloseCache) {
+ clientCache.close();
}
}
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
index d4990800cd4..3a761ceaee4 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
@@ -37,7 +37,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Stream;
import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
@@ -76,12 +75,9 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
protected int positiveLabel;
protected int numTerms;
- protected transient SolrClientCache cache;
- protected transient boolean isCloseCache;
- protected transient CloudSolrClient cloudSolrClient;
-
- protected transient StreamContext streamContext;
- protected ExecutorService executorService;
+ protected transient SolrClientCache clientCache;
+ private transient boolean doCloseCache;
+ protected transient ExecutorService executorService;
public FeaturesSelectionStream(
String zkHost,
@@ -254,21 +250,19 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
@Override
public void setStreamContext(StreamContext context) {
- this.cache = context.getSolrClientCache();
- this.streamContext = context;
+ this.clientCache = context.getSolrClientCache();
}
/** Opens the CloudSolrStream */
@Override
public void open() throws IOException {
- if (cache == null) {
- isCloseCache = true;
- cache = new SolrClientCache();
+ if (clientCache == null) {
+ doCloseCache = true;
+ clientCache = new SolrClientCache();
} else {
- isCloseCache = false;
+ doCloseCache = false;
}
- this.cloudSolrClient = this.cache.getCloudSolrClient(zkHost);
this.executorService =
ExecutorUtil.newMDCAwareCachedThreadPool(
new SolrNamedThreadFactory("FeaturesSelectionStream"));
@@ -281,6 +275,7 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
private List<String> getShardUrls() throws IOException {
try {
+ var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
Slice[] slices = CloudSolrStream.getSlices(this.collection, cloudSolrClient, false);
Set<String> liveNodes = cloudSolrClient.getClusterState().getLiveNodes();
@@ -314,7 +309,14 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
List<Future<NamedList<?>>> futures = new ArrayList<>();
for (String baseUrl : baseUrls) {
FeaturesSelectionCall lc =
- new FeaturesSelectionCall(baseUrl, this.params, this.field, this.outcome);
+ new FeaturesSelectionCall(
+ baseUrl,
+ this.params,
+ this.field,
+ this.outcome,
+ this.positiveLabel,
+ this.numTerms,
+ this.clientCache);
Future<NamedList<?>> future = executorService.submit(lc);
futures.add(future);
@@ -325,8 +327,8 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
@Override
public void close() throws IOException {
- if (isCloseCache && cache != null) {
- cache.close();
+ if (doCloseCache) {
+ clientCache.close();
}
if (executorService != null) {
@@ -417,26 +419,37 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible
return result;
}
- protected class FeaturesSelectionCall implements Callable<NamedList<?>> {
+ protected static class FeaturesSelectionCall implements Callable<NamedList<?>> {
- private String baseUrl;
- private String outcome;
- private String field;
- private Map<String, String> paramsMap;
+ private final String baseUrl;
+ private final String outcome;
+ private final String field;
+ private final Map<String, String> paramsMap;
+ private final int positiveLabel;
+ private final int numTerms;
+ private final SolrClientCache clientCache;
public FeaturesSelectionCall(
- String baseUrl, Map<String, String> paramsMap, String field, String outcome) {
-
+ String baseUrl,
+ Map<String, String> paramsMap,
+ String field,
+ String outcome,
+ int positiveLabel,
+ int numTerms,
+ SolrClientCache clientCache) {
this.baseUrl = baseUrl;
this.outcome = outcome;
this.field = field;
this.paramsMap = paramsMap;
+ this.positiveLabel = positiveLabel;
+ this.numTerms = numTerms;
+ this.clientCache = clientCache;
}
@Override
public NamedList<?> call() throws Exception {
ModifiableSolrParams params = new ModifiableSolrParams();
- SolrClient solrClient = cache.getHttpSolrClient(baseUrl);
+ SolrClient solrClient = clientCache.getHttpSolrClient(baseUrl);
params.add(DISTRIB, "false");
params.add("fq", "{!igain}");
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/HavingStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/HavingStream.java
index e151aaa19f7..988a3dbc73e 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/HavingStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/HavingStream.java
@@ -44,9 +44,7 @@ public class HavingStream extends TupleStream implements Expressible {
private TupleStream stream;
private RecursiveBooleanEvaluator evaluator;
- private StreamContext streamContext;
-
- private transient Tuple currentGroupHead;
+ private transient StreamContext streamContext;
public HavingStream(TupleStream stream, RecursiveBooleanEvaluator evaluator) throws IOException {
init(stream, evaluator);
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/KnnStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/KnnStream.java
index fb1d2ba6e63..41cd08d18bf 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/KnnStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/KnnStream.java
@@ -29,7 +29,6 @@ import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
@@ -60,8 +59,8 @@ public class KnnStream extends TupleStream implements Expressible {
private String zkHost;
private Map<String, String> props;
private String collection;
- protected transient SolrClientCache cache;
- protected transient CloudSolrClient cloudSolrClient;
+ private transient SolrClientCache clientCache;
+ private transient boolean doCloseCache;
private Iterator<SolrDocument> documentIterator;
private String id;
@@ -193,7 +192,7 @@ public class KnnStream extends TupleStream implements Expressible {
@Override
public void setStreamContext(StreamContext context) {
- cache = context.getSolrClientCache();
+ clientCache = context.getSolrClientCache();
}
@Override
@@ -204,7 +203,13 @@ public class KnnStream extends TupleStream implements Expressible {
@Override
public void open() throws IOException {
- cloudSolrClient = cache.getCloudSolrClient(zkHost);
+ if (clientCache == null) {
+ doCloseCache = true;
+ clientCache = new SolrClientCache();
+ } else {
+ doCloseCache = false;
+ }
+
ModifiableSolrParams params = getParams(this.props);
StringBuilder builder = new StringBuilder();
@@ -227,7 +232,7 @@ public class KnnStream extends TupleStream implements Expressible {
QueryRequest request = new QueryRequest(params);
try {
- QueryResponse response = request.process(cloudSolrClient, collection);
+ QueryResponse response = request.process(clientCache.getCloudSolrClient(zkHost), collection);
SolrDocumentList docs = response.getResults();
documentIterator = docs.iterator();
} catch (Exception e) {
@@ -236,7 +241,11 @@ public class KnnStream extends TupleStream implements Expressible {
}
@Override
- public void close() throws IOException {}
+ public void close() throws IOException {
+ if (doCloseCache) {
+ clientCache.close();
+ }
+ }
@Override
public Tuple read() throws IOException {
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java
index e5a6c4113ab..9576cf9658e 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java
@@ -44,7 +44,7 @@ public class LetStream extends TupleStream implements Expressible {
private static final long serialVersionUID = 1;
private TupleStream stream;
- private StreamContext streamContext;
+ private transient StreamContext streamContext;
private Map<String, Object> letParams = new LinkedHashMap<>();
public LetStream(StreamExpression expression, StreamFactory factory) throws IOException {
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ModelStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ModelStream.java
index 727df5560a5..096f4a55e9c 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ModelStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ModelStream.java
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.solr.client.solrj.io.ModelCache;
-import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
@@ -53,7 +52,6 @@ public class ModelStream extends TupleStream implements Expressible {
protected String collection;
protected String modelID;
protected ModelCache modelCache;
- protected SolrClientCache solrClientCache;
protected Tuple model;
protected long cacheMillis;
@@ -162,7 +160,6 @@ public class ModelStream extends TupleStream implements Expressible {
@Override
public void setStreamContext(StreamContext context) {
- this.solrClientCache = context.getSolrClientCache();
this.modelCache = context.getModelCache();
}
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
index aa0087bf8b0..c802fd870ee 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
@@ -27,13 +27,9 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Optional;
import java.util.Random;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.impl.CloudLegacySolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
@@ -64,12 +60,13 @@ public class RandomStream extends TupleStream implements Expressible {
private String zkHost;
private Map<String, String> props;
private String collection;
- protected transient SolrClientCache cache;
- protected transient CloudSolrClient cloudSolrClient;
private Iterator<SolrDocument> documentIterator;
private int x;
private boolean outputX;
+ private transient SolrClientCache clientCache;
+ private transient boolean doCloseCache;
+
public RandomStream() {
// Used by the RandomFacade
}
@@ -200,7 +197,7 @@ public class RandomStream extends TupleStream implements Expressible {
@Override
public void setStreamContext(StreamContext context) {
- cache = context.getSolrClientCache();
+ clientCache = context.getSolrClientCache();
}
@Override
@@ -211,16 +208,11 @@ public class RandomStream extends TupleStream implements Expressible {
@Override
public void open() throws IOException {
- if (cache != null) {
- cloudSolrClient = cache.getCloudSolrClient(zkHost);
+ if (clientCache == null) {
+ doCloseCache = true;
+ clientCache = new SolrClientCache();
} else {
- final List<String> hosts = new ArrayList<>();
- hosts.add(zkHost);
- cloudSolrClient =
- new CloudLegacySolrClient.Builder(hosts, Optional.empty())
- .withSocketTimeout(30000, TimeUnit.MILLISECONDS)
- .withConnectionTimeout(15000, TimeUnit.MILLISECONDS)
- .build();
+ doCloseCache = false;
}
ModifiableSolrParams params = getParams(this.props);
@@ -235,6 +227,7 @@ public class RandomStream extends TupleStream implements Expressible {
QueryRequest request = new QueryRequest(params, SolrRequest.METHOD.POST);
try {
+ var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
QueryResponse response = request.process(cloudSolrClient, collection);
SolrDocumentList docs = response.getResults();
documentIterator = docs.iterator();
@@ -245,8 +238,8 @@ public class RandomStream extends TupleStream implements Expressible {
@Override
public void close() throws IOException {
- if (cache == null) {
- cloudSolrClient.close();
+ if (doCloseCache) {
+ clientCache.close();
}
}
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ScoreNodesStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ScoreNodesStream.java
index ea10185f730..01be8e309c4 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ScoreNodesStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ScoreNodesStream.java
@@ -59,7 +59,6 @@ public class ScoreNodesStream extends TupleStream implements Expressible {
protected String zkHost;
private TupleStream stream;
- private transient SolrClientCache clientCache;
private Map<String, Tuple> nodes = new HashMap<>();
private Iterator<Tuple> tuples;
private String termFreq;
@@ -68,6 +67,9 @@ public class ScoreNodesStream extends TupleStream implements Expressible {
private String bucket;
private String facetCollection;
+ private transient SolrClientCache clientCache;
+ private transient boolean doCloseCache;
+
public ScoreNodesStream(TupleStream tupleStream, String nodeFreqField) throws IOException {
init(tupleStream, nodeFreqField);
}
@@ -176,6 +178,13 @@ public class ScoreNodesStream extends TupleStream implements Expressible {
@Override
public void open() throws IOException {
+ if (clientCache == null) {
+ doCloseCache = true;
+ clientCache = new SolrClientCache();
+ } else {
+ doCloseCache = false;
+ }
+
stream.open();
Tuple node = null;
StringBuilder builder = new StringBuilder();
@@ -261,6 +270,9 @@ public class ScoreNodesStream extends TupleStream implements Expressible {
@Override
public void close() throws IOException {
+ if (doCloseCache) {
+ clientCache.close();
+ }
stream.close();
}
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java
index 1e6226b7536..f1e6c33c2fb 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SearchStream.java
@@ -24,10 +24,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map.Entry;
-import java.util.Optional;
import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.impl.CloudLegacySolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
@@ -55,11 +52,12 @@ public class SearchStream extends TupleStream implements Expressible {
private String zkHost;
private ModifiableSolrParams params;
private String collection;
- protected transient SolrClientCache cache;
- protected transient CloudSolrClient cloudSolrClient;
private Iterator<SolrDocument> documentIterator;
protected StreamComparator comp;
+ private transient SolrClientCache clientCache;
+ private transient boolean doCloseCache;
+
public SearchStream() {}
public SearchStream(StreamExpression expression, StreamFactory factory) throws IOException {
@@ -184,7 +182,7 @@ public class SearchStream extends TupleStream implements Expressible {
@Override
public void setStreamContext(StreamContext context) {
- cache = context.getSolrClientCache();
+ clientCache = context.getSolrClientCache();
}
@Override
@@ -195,16 +193,16 @@ public class SearchStream extends TupleStream implements Expressible {
@Override
public void open() throws IOException {
- if (cache != null) {
- cloudSolrClient = cache.getCloudSolrClient(zkHost);
+ if (clientCache == null) {
+ doCloseCache = true;
+ clientCache = new SolrClientCache();
} else {
- final List<String> hosts = new ArrayList<>();
- hosts.add(zkHost);
- cloudSolrClient = new CloudLegacySolrClient.Builder(hosts, Optional.empty()).build();
+ doCloseCache = false;
}
QueryRequest request = new QueryRequest(params, SolrRequest.METHOD.POST);
try {
+ var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
QueryResponse response = request.process(cloudSolrClient, collection);
SolrDocumentList docs = response.getResults();
documentIterator = docs.iterator();
@@ -215,8 +213,8 @@ public class SearchStream extends TupleStream implements Expressible {
@Override
public void close() throws IOException {
- if (cache == null) {
- cloudSolrClient.close();
+ if (doCloseCache) {
+ clientCache.close();
}
}
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java
index 7e47003fa63..22d7c8bc73c 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java
@@ -68,10 +68,10 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
protected float maxDocFreq;
protected int minTermLength;
- protected transient SolrClientCache cache;
- protected transient boolean isCloseCache;
- protected transient StreamContext streamContext;
- protected ExecutorService executorService;
+ private transient SolrClientCache clientCache;
+ private transient boolean doCloseCache;
+ private transient StreamContext streamContext;
+ private transient ExecutorService executorService;
public SignificantTermsStream(
String zkHost,
@@ -239,17 +239,17 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
@Override
public void setStreamContext(StreamContext context) {
- this.cache = context.getSolrClientCache();
+ this.clientCache = context.getSolrClientCache();
this.streamContext = context;
}
@Override
public void open() throws IOException {
- if (cache == null) {
- isCloseCache = true;
- cache = new SolrClientCache();
+ if (clientCache == null) {
+ doCloseCache = true;
+ clientCache = new SolrClientCache();
} else {
- isCloseCache = false;
+ doCloseCache = false;
}
this.executorService =
@@ -274,7 +274,9 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
this.minDocFreq,
this.maxDocFreq,
this.minTermLength,
- this.numTerms);
+ this.numTerms,
+ streamContext.isLocal(),
+ clientCache);
Future<NamedList<?>> future = executorService.submit(lc);
futures.add(future);
@@ -285,8 +287,8 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
@Override
public void close() throws IOException {
- if (isCloseCache) {
- cache.close();
+ if (doCloseCache) {
+ clientCache.close();
}
executorService.shutdown();
@@ -387,15 +389,17 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
}
}
- protected class SignificantTermsCall implements Callable<NamedList<?>> {
+ protected static class SignificantTermsCall implements Callable<NamedList<?>> {
- private String baseUrl;
- private String field;
- private float minDocFreq;
- private float maxDocFreq;
- private int numTerms;
- private int minTermLength;
- private Map<String, String> paramsMap;
+ private final String baseUrl;
+ private final String field;
+ private final float minDocFreq;
+ private final float maxDocFreq;
+ private final int numTerms;
+ private final int minTermLength;
+ private final Map<String, String> paramsMap;
+ private final boolean isLocal;
+ private final SolrClientCache clientCache;
public SignificantTermsCall(
String baseUrl,
@@ -404,7 +408,9 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
float minDocFreq,
float maxDocFreq,
int minTermLength,
- int numTerms) {
+ int numTerms,
+ boolean isLocal,
+ SolrClientCache clientCache) {
this.baseUrl = baseUrl;
this.field = field;
@@ -413,12 +419,14 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
this.paramsMap = paramsMap;
this.numTerms = numTerms;
this.minTermLength = minTermLength;
+ this.isLocal = isLocal;
+ this.clientCache = clientCache;
}
@Override
public NamedList<?> call() throws Exception {
ModifiableSolrParams params = new ModifiableSolrParams();
- SolrClient solrClient = cache.getHttpSolrClient(baseUrl);
+ SolrClient solrClient = clientCache.getHttpSolrClient(baseUrl);
params.add(DISTRIB, "false");
params.add("fq", "{!significantTerms}");
@@ -432,7 +440,7 @@ public class SignificantTermsStream extends TupleStream implements Expressible {
params.add("minTermLength", Integer.toString(minTermLength));
params.add("field", field);
params.add("numTerms", String.valueOf(numTerms * 5));
- if (streamContext.isLocal()) {
+ if (isLocal) {
params.add("distrib", "false");
}
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
index 8f019c36e61..f58546cb432 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.client.solrj.io.stream;
+import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -25,10 +26,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.InputStreamResponseParser;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
@@ -47,8 +46,6 @@ import org.apache.solr.common.util.NamedList;
/**
* Queries a single Solr instance and maps SolrDocs to a Stream of Tuples.
*
- * <p>TODO: Move this to Http2SolrClient
- *
* @since 5.1.0
*/
public class SolrStream extends TupleStream {
@@ -62,16 +59,17 @@ public class SolrStream extends TupleStream {
private boolean trace;
private Map<String, String> fieldMappings;
private transient TupleStreamParser tupleStreamParser;
- private transient SolrClient client;
- private transient SolrClientCache cache;
private String slice;
private long checkpoint = -1;
- private CloseableHttpResponse closeableHttpResponse;
+ private Closeable closeableHttpResponse;
private boolean distrib = true;
private String user;
private String password;
private String core;
+ private transient SolrClientCache clientCache;
+ private transient boolean doCloseCache;
+
/**
* @param baseUrl Base URL of the stream.
* @param params Map<String, String> of parameters
@@ -104,7 +102,7 @@ public class SolrStream extends TupleStream {
this.distrib = !context.isLocal();
this.numWorkers = context.numWorkers;
this.workerID = context.workerID;
- this.cache = context.getSolrClientCache();
+ this.clientCache = context.getSolrClientCache();
}
public void setCredentials(String user, String password) {
@@ -115,12 +113,11 @@ public class SolrStream extends TupleStream {
/** Opens the stream to a single Solr instance. */
@Override
public void open() throws IOException {
-
- // Reuse the same client per node vs. having one per replica
- if (cache == null) {
- client = new HttpSolrClient.Builder(baseUrl).build();
+ if (clientCache == null) {
+ doCloseCache = true;
+ clientCache = new SolrClientCache();
} else {
- client = cache.getHttpSolrClient(baseUrl);
+ doCloseCache = false;
}
try {
@@ -193,8 +190,8 @@ public class SolrStream extends TupleStream {
if (closeableHttpResponse != null) {
closeableHttpResponse.close();
}
- if (cache == null && client != null) {
- client.close();
+ if (doCloseCache) {
+ clientCache.close();
}
}
@@ -303,15 +300,26 @@ public class SolrStream extends TupleStream {
query.setBasicAuthCredentials(user, password);
}
+ var client = clientCache.getHttpSolrClient(baseUrl);
NamedList<Object> genericResponse = client.request(query);
InputStream stream = (InputStream) genericResponse.get("stream");
+
CloseableHttpResponse httpResponse =
(CloseableHttpResponse) genericResponse.get("closeableResponse");
+ // still attempting to read http status from http response for backwards compatibility reasons
+ // since 9.4 the updated format will have a dedicated status field
+ final int statusCode;
+ if (httpResponse != null) {
+ statusCode = httpResponse.getStatusLine().getStatusCode();
+ } else {
+ statusCode = (int) genericResponse.get("responseStatus");
+ }
- final int statusCode = httpResponse.getStatusLine().getStatusCode();
if (statusCode != 200) {
String errMsg = consumeStreamAsErrorMessage(stream);
- httpResponse.close();
+ if (httpResponse != null) {
+ httpResponse.close();
+ }
throw new IOException(
"Query to '"
+ query.getPath()
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SqlStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SqlStream.java
index a0419567d1d..b39ab1dbdc1 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SqlStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/SqlStream.java
@@ -24,7 +24,6 @@ import java.util.Locale;
import java.util.Map.Entry;
import java.util.Random;
import java.util.stream.Collectors;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
@@ -49,7 +48,6 @@ public class SqlStream extends TupleStream implements Expressible {
protected String zkHost;
protected String collection;
protected SolrParams params;
- protected transient CloudSolrClient cloudSolrClient;
protected transient TupleStream tupleStream;
protected transient StreamContext streamContext;
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
index fb7696c8ac9..507dc3820fd 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
@@ -30,7 +30,6 @@ import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
@@ -69,9 +68,10 @@ public class StatsStream extends TupleStream implements Expressible, ParallelMet
private String zkHost;
private SolrParams params;
private String collection;
- protected transient SolrClientCache cache;
- protected transient CloudSolrClient cloudSolrClient;
- private StreamContext context;
+
+ private transient SolrClientCache clientCache;
+ private transient boolean doCloseCache;
+ private transient StreamContext context;
protected transient TupleStream parallelizedStream;
public StatsStream(String zkHost, String collection, SolrParams params, Metric[] metrics)
@@ -216,7 +216,7 @@ public class StatsStream extends TupleStream implements Expressible, ParallelMet
@Override
public void setStreamContext(StreamContext context) {
this.context = context;
- cache = context.getSolrClientCache();
+ this.clientCache = context.getSolrClientCache();
}
@Override
@@ -226,6 +226,12 @@ public class StatsStream extends TupleStream implements Expressible, ParallelMet
@Override
public void open() throws IOException {
+ if (clientCache == null) {
+ doCloseCache = true;
+ clientCache = new SolrClientCache();
+ } else {
+ doCloseCache = false;
+ }
@SuppressWarnings({"unchecked"})
Map<String, List<String>> shardsMap = (Map<String, List<String>>) context.get("shards");
@@ -233,7 +239,7 @@ public class StatsStream extends TupleStream implements Expressible, ParallelMet
// Parallelize the stats stream across multiple collections for an alias using plist if possible
if (shardsMap == null && params.getBool(TIERED_PARAM, defaultTieredEnabled)) {
ClusterStateProvider clusterStateProvider =
- cache.getCloudSolrClient(zkHost).getClusterStateProvider();
+ clientCache.getCloudSolrClient(zkHost).getClusterStateProvider();
final List<String> resolved =
clusterStateProvider != null ? clusterStateProvider.resolveAlias(collection) : null;
if (resolved != null && resolved.size() > 1) {
@@ -255,7 +261,7 @@ public class StatsStream extends TupleStream implements Expressible, ParallelMet
if (shardsMap == null) {
QueryRequest request = new QueryRequest(paramsLoc, SolrRequest.METHOD.POST);
- cloudSolrClient = cache.getCloudSolrClient(zkHost);
+ var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
try {
NamedList<?> response = cloudSolrClient.request(request, collection);
getTuples(response, metrics);
@@ -264,7 +270,7 @@ public class StatsStream extends TupleStream implements Expressible, ParallelMet
}
} else {
List<String> shards = shardsMap.get(collection);
- SolrClient client = cache.getHttpSolrClient(shards.get(0));
+ SolrClient client = clientCache.getHttpSolrClient(shards.get(0));
if (shards.size() > 1) {
String shardsParam = getShardString(shards);
@@ -294,7 +300,11 @@ public class StatsStream extends TupleStream implements Expressible, ParallelMet
}
@Override
- public void close() throws IOException {}
+ public void close() throws IOException {
+ if (doCloseCache) {
+ clientCache.close();
+ }
+ }
@Override
public Tuple read() throws IOException {
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
index 5feb15e46fa..2301b3d2136 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
@@ -16,7 +16,6 @@
*/
package org.apache.solr.client.solrj.io.stream;
-import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@@ -33,7 +32,7 @@ import org.apache.solr.common.params.SolrParams;
* <p>Note: The StreamContext contains the SolrClientCache which is used to cache SolrClients for
* reuse across multiple TupleStreams.
*/
-public class StreamContext implements Serializable {
+public class StreamContext {
private Map<String, Object> entries = new HashMap<>();
private Map<String, String> tupleContext = new HashMap<>();
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
index 1f746ef35f8..e3ff627232a 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
@@ -37,7 +37,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.io.ClassificationEvaluation;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
@@ -82,12 +81,11 @@ public class TextLogitStream extends TupleStream implements Expressible {
protected List<Double> idfs;
protected ClassificationEvaluation evaluation;
- protected transient SolrClientCache cache;
- protected transient boolean isCloseCache;
- protected transient CloudSolrClient cloudSolrClient;
+ private transient SolrClientCache clientCache;
+ private transient boolean doCloseCache;
protected transient StreamContext streamContext;
- protected ExecutorService executorService;
+ protected transient ExecutorService executorService;
protected TupleStream termsStream;
private List<String> terms;
@@ -355,7 +353,7 @@ public class TextLogitStream extends TupleStream implements Expressible {
@Override
public void setStreamContext(StreamContext context) {
- this.cache = context.getSolrClientCache();
+ this.clientCache = context.getSolrClientCache();
this.streamContext = context;
this.termsStream.setStreamContext(context);
}
@@ -363,14 +361,13 @@ public class TextLogitStream extends TupleStream implements Expressible {
/** Opens the CloudSolrStream */
@Override
public void open() throws IOException {
- if (cache == null) {
- isCloseCache = true;
- cache = new SolrClientCache();
+ if (clientCache == null) {
+ doCloseCache = true;
+ clientCache = new SolrClientCache();
} else {
- isCloseCache = false;
+ doCloseCache = false;
}
- this.cloudSolrClient = this.cache.getCloudSolrClient(zkHost);
this.executorService =
ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("TextLogitSolrStream"));
}
@@ -384,6 +381,7 @@ public class TextLogitStream extends TupleStream implements Expressible {
protected List<String> getShardUrls() throws IOException {
try {
+ var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
Slice[] slices = CloudSolrStream.getSlices(this.collection, cloudSolrClient, false);
Set<String> liveNodes = cloudSolrClient.getClusterState().getLiveNodes();
@@ -426,7 +424,10 @@ public class TextLogitStream extends TupleStream implements Expressible {
this.outcome,
this.positiveLabel,
this.learningRate,
- this.iteration);
+ this.iteration,
+ this.threshold,
+ this.idfs,
+ this.clientCache);
Future<Tuple> future = executorService.submit(lc);
futures.add(future);
@@ -437,8 +438,8 @@ public class TextLogitStream extends TupleStream implements Expressible {
@Override
public void close() throws IOException {
- if (isCloseCache && cache != null) {
- cache.close();
+ if (doCloseCache) {
+ clientCache.close();
}
if (executorService != null) {
@@ -642,7 +643,7 @@ public class TextLogitStream extends TupleStream implements Expressible {
}
}
- protected class LogitCall implements Callable<Tuple> {
+ protected static class LogitCall implements Callable<Tuple> {
private String baseUrl;
private String feature;
@@ -653,6 +654,9 @@ public class TextLogitStream extends TupleStream implements Expressible {
private int positiveLabel;
private double learningRate;
private Map<String, String> paramsMap;
+ private double threshold;
+ private List<Double> idfs;
+ private SolrClientCache clientCache;
public LogitCall(
String baseUrl,
@@ -663,7 +667,10 @@ public class TextLogitStream extends TupleStream implements Expressible {
String outcome,
int positiveLabel,
double learningRate,
- int iteration) {
+ int iteration,
+ double threshold,
+ List<Double> idfs,
+ SolrClientCache clientCache) {
this.baseUrl = baseUrl;
this.feature = feature;
@@ -674,12 +681,15 @@ public class TextLogitStream extends TupleStream implements Expressible {
this.positiveLabel = positiveLabel;
this.learningRate = learningRate;
this.paramsMap = paramsMap;
+ this.threshold = threshold;
+ this.idfs = idfs;
+ this.clientCache = clientCache;
}
@Override
public Tuple call() throws Exception {
ModifiableSolrParams params = new ModifiableSolrParams();
- SolrClient solrClient = cache.getHttpSolrClient(baseUrl);
+ SolrClient solrClient = clientCache.getHttpSolrClient(baseUrl);
params.add(DISTRIB, "false");
params.add("fq", "{!tlogit}");
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java
index 683cc35119c..383389d2551 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java
@@ -25,11 +25,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map.Entry;
-import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.impl.CloudLegacySolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
@@ -71,8 +68,8 @@ public class TimeSeriesStream extends TupleStream implements Expressible {
private String zkHost;
private SolrParams params;
private String collection;
- protected transient SolrClientCache cache;
- protected transient CloudSolrClient cloudSolrClient;
+ private transient SolrClientCache clientCache;
+ private transient boolean doCloseCache;
public TimeSeriesStream(
String zkHost,
@@ -338,7 +335,7 @@ public class TimeSeriesStream extends TupleStream implements Expressible {
@Override
public void setStreamContext(StreamContext context) {
- cache = context.getSolrClientCache();
+ clientCache = context.getSolrClientCache();
}
@Override
@@ -348,12 +345,11 @@ public class TimeSeriesStream extends TupleStream implements Expressible {
@Override
public void open() throws IOException {
- if (cache != null) {
- cloudSolrClient = cache.getCloudSolrClient(zkHost);
+ if (clientCache == null) {
+ doCloseCache = true;
+ clientCache = new SolrClientCache();
} else {
- final List<String> hosts = new ArrayList<>();
- hosts.add(zkHost);
- cloudSolrClient = new CloudLegacySolrClient.Builder(hosts, Optional.empty()).build();
+ doCloseCache = false;
}
String json = getJsonFacetString(field, metrics, start, end, gap);
@@ -364,6 +360,7 @@ public class TimeSeriesStream extends TupleStream implements Expressible {
QueryRequest request = new QueryRequest(paramsLoc, SolrRequest.METHOD.POST);
try {
+ var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
NamedList<?> response = cloudSolrClient.request(request, collection);
getTuples(response, field, metrics);
} catch (Exception e) {
@@ -373,8 +370,8 @@ public class TimeSeriesStream extends TupleStream implements Expressible {
@Override
public void close() throws IOException {
- if (cache == null) {
- cloudSolrClient.close();
+ if (doCloseCache) {
+ clientCache.close();
}
}
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
index 54dc364fcf3..e343d56e35c 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
@@ -32,7 +32,6 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
@@ -40,7 +39,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.impl.CloudLegacySolrClient;
+import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
@@ -79,6 +78,9 @@ public class TopicStream extends CloudSolrStream implements Expressible {
private String checkpointCollection;
private long initialCheckpoint = -1;
+ private transient SolrClientCache clientCache;
+ private transient boolean doCloseCache;
+
public TopicStream(
String zkHost,
String checkpointCollection,
@@ -310,6 +312,12 @@ public class TopicStream extends CloudSolrStream implements Expressible {
@Override
public void open() throws IOException {
+ if (clientCache == null) {
+ doCloseCache = true;
+ clientCache = new SolrClientCache();
+ } else {
+ doCloseCache = false;
+ }
this.tuples = new TreeSet<>();
this.solrStreams = new ArrayList<>();
this.eofTuples = Collections.synchronizedMap(new HashMap<>());
@@ -318,16 +326,6 @@ public class TopicStream extends CloudSolrStream implements Expressible {
// Each worker must maintain its own checkpoints
this.id = this.id + "_" + streamContext.workerID;
}
-
- if (streamContext.getSolrClientCache() != null) {
- cloudSolrClient = streamContext.getSolrClientCache().getCloudSolrClient(zkHost);
- } else {
- final List<String> hosts = new ArrayList<>();
- hosts.add(zkHost);
- cloudSolrClient = new CloudLegacySolrClient.Builder(hosts, Optional.empty()).build();
- this.cloudSolrClient.connect();
- }
-
if (checkpoints.size() == 0) {
getPersistedCheckpoints();
if (checkpoints.size() == 0) {
@@ -384,8 +382,8 @@ public class TopicStream extends CloudSolrStream implements Expressible {
}
}
- if (streamContext != null && streamContext.getSolrClientCache() == null) {
- cloudSolrClient.close();
+ if (doCloseCache) {
+ clientCache.close();
}
}
}
@@ -426,6 +424,7 @@ public class TopicStream extends CloudSolrStream implements Expressible {
}
private void getCheckpoints() throws IOException {
+ var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
this.checkpoints = new HashMap<>();
Slice[] slices = CloudSolrStream.getSlices(this.collection, cloudSolrClient, false);
Set<String> liveNodes = cloudSolrClient.getClusterState().getLiveNodes();
@@ -483,9 +482,11 @@ public class TopicStream extends CloudSolrStream implements Expressible {
private void persistCheckpoints() throws IOException {
- if (cloudSolrClient == null) {
+ if (clientCache == null) {
return;
}
+
+ var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
UpdateRequest request = new UpdateRequest();
request.setParam("collection", checkpointCollection);
SolrInputDocument doc = new SolrInputDocument();
@@ -504,6 +505,7 @@ public class TopicStream extends CloudSolrStream implements Expressible {
}
private void getPersistedCheckpoints() throws IOException {
+ var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
Slice[] slices = CloudSolrStream.getSlices(checkpointCollection, cloudSolrClient, false);
Set<String> liveNodes = cloudSolrClient.getClusterState().getLiveNodes();
@@ -537,47 +539,50 @@ public class TopicStream extends CloudSolrStream implements Expressible {
@Override
protected void constructStreams() throws IOException {
- try {
- Slice[] slices = CloudSolrStream.getSlices(this.collection, cloudSolrClient, false);
+ var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
+ Slice[] slices = CloudSolrStream.getSlices(this.collection, cloudSolrClient, false);
- ModifiableSolrParams mParams = new ModifiableSolrParams(params);
- mParams.set(DISTRIB, "false"); // We are the aggregator.
- String fl = mParams.get("fl");
- mParams.set(SORT, "_version_ asc");
- if (!fl.contains(VERSION_FIELD)) {
- fl += ",_version_";
- }
- mParams.set("fl", fl);
+ ModifiableSolrParams mParams = new ModifiableSolrParams(params);
+ mParams.set(DISTRIB, "false"); // We are the aggregator.
+ String fl = mParams.get("fl");
+ mParams.set(SORT, "_version_ asc");
+ if (!fl.contains(VERSION_FIELD)) {
+ fl += ",_version_";
+ }
+ mParams.set("fl", fl);
- Random random = new Random();
+ Random random = new Random();
- Set<String> liveNodes = cloudSolrClient.getClusterState().getLiveNodes();
+ Set<String> liveNodes = cloudSolrClient.getClusterState().getLiveNodes();
- for (Slice slice : slices) {
- ModifiableSolrParams localParams = new ModifiableSolrParams(mParams);
- long checkpoint = checkpoints.get(slice.getName());
+ for (Slice slice : slices) {
+ ModifiableSolrParams localParams = new ModifiableSolrParams(mParams);
+ long checkpoint = checkpoints.get(slice.getName());
- Collection<Replica> replicas = slice.getReplicas();
- List<Replica> shuffler = new ArrayList<>();
- for (Replica replica : replicas) {
- if (replica.getState() == Replica.State.ACTIVE
- && liveNodes.contains(replica.getNodeName())) shuffler.add(replica);
- }
+ Collection<Replica> replicas = slice.getReplicas();
+ List<Replica> shuffler = new ArrayList<>();
+ for (Replica replica : replicas) {
+ if (replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
+ shuffler.add(replica);
+ }
- Replica rep = shuffler.get(random.nextInt(shuffler.size()));
- ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
- String url = zkProps.getCoreUrl();
- SolrStream solrStream = new SolrStream(url, localParams);
- solrStream.setSlice(slice.getName());
- solrStream.setCheckpoint(checkpoint);
- solrStream.setTrace(true);
- if (streamContext != null) {
- solrStream.setStreamContext(streamContext);
- }
- solrStreams.add(solrStream);
+ Replica rep = shuffler.get(random.nextInt(shuffler.size()));
+ ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
+ String url = zkProps.getCoreUrl();
+ SolrStream solrStream = new SolrStream(url, localParams);
+ solrStream.setSlice(slice.getName());
+ solrStream.setCheckpoint(checkpoint);
+ solrStream.setTrace(true);
+ if (streamContext != null) {
+ solrStream.setStreamContext(streamContext);
}
- } catch (Exception e) {
- throw new IOException(e);
+ solrStreams.add(solrStream);
}
}
+
+ @Override
+ public void setStreamContext(StreamContext context) {
+ super.setStreamContext(context);
+ this.clientCache = context.getSolrClientCache();
+ }
}
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java
index 03188e24005..adf1640f671 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java
@@ -21,10 +21,7 @@ import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
-import java.util.Optional;
import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.CloudLegacySolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
@@ -52,7 +49,7 @@ public class UpdateStream extends TupleStream implements Expressible {
// field name in summary tuple for #docs updated in batch
public static String BATCH_INDEXED_FIELD_NAME = "batchIndexed";
private String collection;
- private String zkHost;
+ protected String zkHost;
private int updateBatchSize;
/**
* Indicates if the {@link CommonParams#VERSION_FIELD} should be removed from tuples when
@@ -64,11 +61,12 @@ public class UpdateStream extends TupleStream implements Expressible {
private int batchNumber;
private long totalDocsIndex;
private PushBackStream tupleSource;
- private transient SolrClientCache cache;
- private transient CloudSolrClient cloudSolrClient;
private List<SolrInputDocument> documentBatch = new ArrayList<>();
private String coreName;
+ protected transient SolrClientCache clientCache;
+ private transient boolean doCloseCache;
+
public UpdateStream(StreamExpression expression, StreamFactory factory) throws IOException {
String collectionName = factory.getValueOperand(expression, 0);
verifyCollectionName(collectionName, expression);
@@ -122,7 +120,12 @@ public class UpdateStream extends TupleStream implements Expressible {
@Override
public void open() throws IOException {
- setCloudSolrClient();
+ if (clientCache == null) {
+ doCloseCache = true;
+ clientCache = new SolrClientCache();
+ } else {
+ doCloseCache = false;
+ }
tupleSource.open();
}
@@ -153,8 +156,8 @@ public class UpdateStream extends TupleStream implements Expressible {
@Override
public void close() throws IOException {
- if (cache == null && cloudSolrClient != null) {
- cloudSolrClient.close();
+ if (doCloseCache) {
+ clientCache.close();
}
tupleSource.close();
}
@@ -226,7 +229,7 @@ public class UpdateStream extends TupleStream implements Expressible {
@Override
public void setStreamContext(StreamContext context) {
- this.cache = context.getSolrClientCache();
+ this.clientCache = context.getSolrClientCache();
this.coreName = (String) context.get("core");
this.tupleSource.setStreamContext(context);
}
@@ -314,23 +317,6 @@ public class UpdateStream extends TupleStream implements Expressible {
return true;
}
- /** Only viable after calling {@link #open} */
- protected CloudSolrClient getCloudSolrClient() {
- assert null != this.cloudSolrClient;
- return this.cloudSolrClient;
- }
-
- private void setCloudSolrClient() {
- if (this.cache != null) {
- this.cloudSolrClient = this.cache.getCloudSolrClient(zkHost);
- } else {
- final List<String> hosts = new ArrayList<>();
- hosts.add(zkHost);
- this.cloudSolrClient = new CloudLegacySolrClient.Builder(hosts, Optional.empty()).build();
- this.cloudSolrClient.connect();
- }
- }
-
private SolrInputDocument convertTupleToSolrDocument(Tuple tuple) {
SolrInputDocument doc = new SolrInputDocument();
for (String field : tuple.getFields().keySet()) {
@@ -365,6 +351,7 @@ public class UpdateStream extends TupleStream implements Expressible {
}
try {
+ var cloudSolrClient = clientCache.getCloudSolrClient(zkHost);
cloudSolrClient.add(collection, documentBatch);
} catch (SolrServerException | IOException e) {
// TODO: it would be nice if there was an option to "skipFailedBatches"
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
index 6311acf1b29..260d9a059c4 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
@@ -342,6 +342,14 @@ public class CloudHttp2SolrClient extends CloudSolrClient {
return this;
}
+ /**
+ * Set the internal http client.
+ *
+ * <p>Note: closing the httpClient instance is the responsibility of the caller.
+ *
+ * @param httpClient http client
+ * @return this
+ */
public Builder withHttpClient(Http2SolrClient httpClient) {
if (this.internalClientBuilder != null) {
throw new IllegalStateException(
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 25513a74dd4..63fc96f2b77 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -170,11 +170,7 @@ public class Http2SolrClient extends SolrClient {
this.serverBaseUrl = null;
}
- if (builder.idleTimeoutMillis != null && builder.idleTimeoutMillis > 0) {
- this.idleTimeoutMillis = builder.idleTimeoutMillis;
- } else {
- this.idleTimeoutMillis = HttpClientUtil.DEFAULT_SO_TIMEOUT;
- }
+ this.idleTimeoutMillis = builder.idleTimeoutMillis;
if (builder.httpClient != null) {
this.httpClient = builder.httpClient;
@@ -283,8 +279,7 @@ public class Http2SolrClient extends SolrClient {
this.authenticationStore = new AuthenticationStoreHolder();
httpClient.setAuthenticationStore(this.authenticationStore);
- if (builder.connectionTimeoutMillis != null)
- httpClient.setConnectTimeout(builder.connectionTimeoutMillis);
+ httpClient.setConnectTimeout(builder.connectionTimeoutMillis);
setupProxy(builder, httpClient);
@@ -859,6 +854,7 @@ public class Http2SolrClient extends SolrClient {
// no processor specified, return raw stream
NamedList<Object> rsp = new NamedList<>();
rsp.add("stream", is);
+ rsp.add("responseStatus", httpStatus);
// Only case where stream should not be closed
shouldClose = false;
return rsp;
@@ -1060,6 +1056,13 @@ public class Http2SolrClient extends SolrClient {
}
public Http2SolrClient build() {
+ if (idleTimeoutMillis == null || idleTimeoutMillis <= 0) {
+ idleTimeoutMillis = (long) HttpClientUtil.DEFAULT_SO_TIMEOUT;
+ }
+ if (connectionTimeoutMillis == null) {
+ connectionTimeoutMillis = (long) HttpClientUtil.DEFAULT_CONNECT_TIMEOUT;
+ }
+
Http2SolrClient client = new Http2SolrClient(baseSolrUrl, this);
try {
httpClientBuilderSetup(client);
@@ -1222,6 +1225,10 @@ public class Http2SolrClient extends SolrClient {
return this;
}
+ public Long getIdleTimeoutMillis() {
+ return idleTimeoutMillis;
+ }
+
public Builder useHttp1_1(boolean useHttp1_1) {
this.useHttp1_1 = useHttp1_1;
return this;
@@ -1241,6 +1248,10 @@ public class Http2SolrClient extends SolrClient {
return this;
}
+ public Long getConnectionTimeout() {
+ return connectionTimeoutMillis;
+ }
+
/**
* Set a timeout in milliseconds for requests issued by this client.
*
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
index 5f7b8a17ee8..c03fbc9f3f9 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
@@ -643,6 +643,7 @@ public class HttpSolrClient extends BaseHttpSolrClient {
NamedList<Object> rsp = new NamedList<>();
rsp.add("stream", respBody);
rsp.add("closeableResponse", response);
+ rsp.add("responseStatus", response.getStatusLine().getStatusCode());
// Only case where stream should not be closed
shouldClose = false;
return rsp;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientBuilder.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientBuilder.java
index 645a19040de..d0c97f8ed22 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientBuilder.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientBuilder.java
@@ -127,6 +127,10 @@ public abstract class SolrClientBuilder<B extends SolrClientBuilder<B>> {
return getThis();
}
+ public int getConnectionTimeoutMillis() {
+ return this.connectionTimeoutMillis;
+ }
+
/**
* Tells {@link Builder} that created clients should set the following read timeout on all
* sockets.
@@ -155,4 +159,8 @@ public abstract class SolrClientBuilder<B extends SolrClientBuilder<B>> {
socketTimeoutMillisUpdate = true;
return getThis();
}
+
+ public int getSocketTimeoutMillis() {
+ return this.socketTimeoutMillis;
+ }
}