You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ep...@apache.org on 2024/02/23 18:52:17 UTC
(solr) branch branch_9x updated: SOLR-16138: Throw when streaming and all cores are down (#784)
This is an automated email from the ASF dual-hosted git repository.
epugh pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/branch_9x by this push:
new 8aaa68fb4d4 SOLR-16138: Throw when streaming and all cores are down (#784)
8aaa68fb4d4 is described below
commit 8aaa68fb4d4ffc46611c489faf044cdd1931cf30
Author: Antoine Bursaux <bu...@gmail.com>
AuthorDate: Fri Feb 23 19:50:56 2024 +0100
SOLR-16138: Throw when streaming and all cores are down (#784)
Co-authored-by: Eric Pugh <ep...@opensourceconnections.com>
---
solr/CHANGES.txt | 2 +
.../client/solrj/io/stream/CloudSolrStream.java | 16 ++-
.../client/solrj/io/stream/BadClusterTest.java | 141 +++++++++++++++++++++
3 files changed, 154 insertions(+), 5 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 3cea0081583..5792622c3d1 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -31,6 +31,8 @@ Improvements
* SOLR-17058: Add 'distrib.statsCache' parameter to disable distributed stats requests at query time. (Wei Wang, Mikhail Khludnev)
+* SOLR-16138: Throw an exception when issuing a streaming expression and all cores are down instead of returning 0 documents. (Antoine Bursaux via Eric Pugh)
+
Optimizations
---------------------
* SOLR-17144: Close searcherExecutor thread per core after 1 minute (Pierre Salagnac, Christine Poerschke)
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
index f75dec91c38..0f87df7bcf3 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
@@ -50,6 +50,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParamete
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
@@ -381,15 +382,20 @@ public class CloudSolrStream extends TupleStream implements Expressible {
final Stream<SolrStream> streamOfSolrStream;
if (streamContext != null && streamContext.get("shards") != null) {
// stream of shard url with core
- streamOfSolrStream =
- getShards(this.zkHost, this.collection, this.streamContext, mParams).stream()
- .map(s -> new SolrStream(s, mParams));
+ final List<String> shards =
+ getShards(this.zkHost, this.collection, this.streamContext, mParams);
+ if (shards.isEmpty())
+ throw new IOException("No shards available from ZooKeeper: " + this.zkHost);
+ streamOfSolrStream = shards.stream().map(s -> new SolrStream(s, mParams));
} else {
// stream of replicas to reuse the same SolrHttpClient per baseUrl
// avoids re-parsing data we already have in the replicas
+ final List<Replica> replicas =
+ getReplicas(this.zkHost, this.collection, this.streamContext, mParams);
+ if (replicas.isEmpty())
+ throw new IOException("No replicas available from ZooKeeper: " + this.zkHost);
streamOfSolrStream =
- getReplicas(this.zkHost, this.collection, this.streamContext, mParams).stream()
- .map(r -> new SolrStream(r.getBaseUrl(), mParams, r.getCoreName()));
+ replicas.stream().map(r -> new SolrStream(r.getBaseUrl(), mParams, r.getCoreName()));
}
streamOfSolrStream.forEach(
diff --git a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/BadClusterTest.java b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/BadClusterTest.java
new file mode 100644
index 00000000000..b7c8b8a836c
--- /dev/null
+++ b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/BadClusterTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.MultiMapSolrParams;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/** Tests behaviors of CloudSolrStream when the cluster is behaving badly. */
+@SolrTestCaseJ4.SuppressSSL
+public class BadClusterTest extends SolrCloudTestCase {
+
+ // Name of the collection created in configureCluster() and targeted by every search expression.
+ private static final String collection = "streams";
+ // Field name used both as the returned field list (fl) and as the sort key of the expression.
+ private static final String id = "id";
+
+ // Factory that maps the "search" function name to CloudSolrStream; configureCluster() also
+ // registers the collection's ZooKeeper host on it.
+ private static final StreamFactory streamFactory =
+ new StreamFactory().withFunctionName("search", CloudSolrStream.class);
+
+ // ZooKeeper address of the test cluster; assigned once in configureCluster().
+ private static String zkHost;
+
+ /**
+ * One-time setup shared by every phase of the test: configures the cluster with a config named
+ * "conf" read from solrj/solr/configsets/streaming/conf, creates the test collection from it,
+ * waits for that collection, and records the cluster's ZooKeeper address.
+ *
+ * <p>NOTE(review): the numeric arguments (1 for configureCluster, 1 and 1 for createCollection
+ * and waitForActiveCollection) are presumably node, shard and replica counts -- confirm against
+ * the test-framework signatures.
+ */
+ @BeforeClass
+ public static void configureCluster() throws Exception {
+ configureCluster(1)
+ .addConfig(
+ "conf",
+ getFile("solrj")
+ .toPath()
+ .resolve("solr")
+ .resolve("configsets")
+ .resolve("streaming")
+ .resolve("conf"))
+ .configure();
+
+ // The collection is created but never indexed into, so it stays empty for the whole test.
+ CollectionAdminRequest.createCollection(collection, "conf", 1, 1)
+ .process(cluster.getSolrClient());
+ cluster.waitForActiveCollection(collection, 1, 1);
+
+ // Remember the ZooKeeper address and register it for the collection on the shared factory so
+ // that expressions built later can be resolved against this cluster.
+ zkHost = cluster.getZkServer().getZkAddress();
+ streamFactory.withCollectionZkHost(collection, zkHost);
+ }
+
+ // The phases below must run in exactly this order: each one leaves the shared cluster in a
+ // worse state than the previous one, and the cluster is only created once in BeforeClass (as in
+ // other tests).
+ // Ordering cannot be strictly enforced with JUnit annotations because of parallel executions,
+ // so the phases are aggregated into this single test instead.
+ @Test
+ public void testBadCluster() throws Exception {
+ // Phase 1: cluster untouched since setup, collection empty.
+ testEmptyCollection();
+ // Phase 2: ZooKeeper session of the Jetty hosting the replica is expired.
+ testAllNodesDown();
+ // Phase 3: the whole cluster is shut down; nothing may run after this.
+ testClusterShutdown();
+ }
+
+ private void testEmptyCollection() throws Exception {
+ CloudSolrStream stream = new CloudSolrStream(buildSearchExpression(), streamFactory);
+ assertEquals(0, getTuples(stream).size());
+ }
+
+ private void testAllNodesDown() throws Exception {
+
+ CloudSolrStream stream = new CloudSolrStream(buildSearchExpression(), streamFactory);
+ cluster.expireZkSession(cluster.getReplicaJetty(getReplicas().get(0)));
+
+ try {
+ getTuples(stream);
+ fail("Expected IOException");
+ } catch (IOException ioe) {
+ }
+ }
+
+ private void testClusterShutdown() throws Exception {
+
+ CloudSolrStream stream = new CloudSolrStream(buildSearchExpression(), streamFactory);
+ cluster.shutdown();
+
+ try {
+ getTuples(stream);
+ fail("Expected IOException: SolrException: TimeoutException");
+ } catch (IOException ioe) {
+ SolrException se = (SolrException) ioe.getCause();
+ TimeoutException te = (TimeoutException) se.getCause();
+ assertNotNull(te);
+ }
+ }
+
+ private StreamExpression buildSearchExpression() {
+ StreamExpression expression = new StreamExpression("search");
+ expression.addParameter(collection);
+ expression.addParameter(new StreamExpressionNamedParameter(CommonParams.Q, "*:*"));
+ expression.addParameter(new StreamExpressionNamedParameter(CommonParams.FL, id));
+ expression.addParameter(new StreamExpressionNamedParameter(CommonParams.SORT, id + " asc"));
+ return expression;
+ }
+
+ private List<Replica> getReplicas() throws IOException {
+ return TupleStream.getReplicas(zkHost, collection, null, new MultiMapSolrParams(Map.of()));
+ }
+
+ private List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
+ tupleStream.open();
+ List<Tuple> tuples = new ArrayList<>();
+ for (; ; ) {
+ Tuple t = tupleStream.read();
+ if (t.EOF) {
+ break;
+ } else {
+ tuples.add(t);
+ }
+ }
+ tupleStream.close();
+ return tuples;
+ }
+}