You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ep...@apache.org on 2024/02/23 18:52:17 UTC

(solr) branch branch_9x updated: SOLR-16138: Throw when streaming and all cores are down (#784)

This is an automated email from the ASF dual-hosted git repository.

epugh pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_9x by this push:
     new 8aaa68fb4d4 SOLR-16138: Throw when streaming and all cores are down (#784)
8aaa68fb4d4 is described below

commit 8aaa68fb4d4ffc46611c489faf044cdd1931cf30
Author: Antoine Bursaux <bu...@gmail.com>
AuthorDate: Fri Feb 23 19:50:56 2024 +0100

    SOLR-16138: Throw when streaming and all cores are down (#784)
    
    Co-authored-by: Eric Pugh <ep...@opensourceconnections.com>
---
 solr/CHANGES.txt                                   |   2 +
 .../client/solrj/io/stream/CloudSolrStream.java    |  16 ++-
 .../client/solrj/io/stream/BadClusterTest.java     | 141 +++++++++++++++++++++
 3 files changed, 154 insertions(+), 5 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 3cea0081583..5792622c3d1 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -31,6 +31,8 @@ Improvements
 
 * SOLR-17058: Add 'distrib.statsCache' parameter to disable distributed stats requests at query time. (Wei Wang, Mikhail Khludnev)
 
+* SOLR-16138: Throw an exception when issuing a streaming expression and all cores are down instead of returning 0 documents. (Antoine Bursaux via Eric Pugh)
+
 Optimizations
 ---------------------
 * SOLR-17144: Close searcherExecutor thread per core after 1 minute (Pierre Salagnac, Christine Poerschke)
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
index f75dec91c38..0f87df7bcf3 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
@@ -50,6 +50,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParamete
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
@@ -381,15 +382,20 @@ public class CloudSolrStream extends TupleStream implements Expressible {
       final Stream<SolrStream> streamOfSolrStream;
       if (streamContext != null && streamContext.get("shards") != null) {
         // stream of shard url with core
-        streamOfSolrStream =
-            getShards(this.zkHost, this.collection, this.streamContext, mParams).stream()
-                .map(s -> new SolrStream(s, mParams));
+        final List<String> shards =
+            getShards(this.zkHost, this.collection, this.streamContext, mParams);
+        if (shards.isEmpty())
+          throw new IOException("No shards available from ZooKeeper: " + this.zkHost);
+        streamOfSolrStream = shards.stream().map(s -> new SolrStream(s, mParams));
       } else {
         // stream of replicas to reuse the same SolrHttpClient per baseUrl
         // avoids re-parsing data we already have in the replicas
+        final List<Replica> replicas =
+            getReplicas(this.zkHost, this.collection, this.streamContext, mParams);
+        if (replicas.isEmpty())
+          throw new IOException("No replicas available from ZooKeeper: " + this.zkHost);
         streamOfSolrStream =
-            getReplicas(this.zkHost, this.collection, this.streamContext, mParams).stream()
-                .map(r -> new SolrStream(r.getBaseUrl(), mParams, r.getCoreName()));
+            replicas.stream().map(r -> new SolrStream(r.getBaseUrl(), mParams, r.getCoreName()));
       }
 
       streamOfSolrStream.forEach(
diff --git a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/BadClusterTest.java b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/BadClusterTest.java
new file mode 100644
index 00000000000..b7c8b8a836c
--- /dev/null
+++ b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/BadClusterTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.MultiMapSolrParams;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/** Tests behaviors of CloudSolrStream when the cluster is behaving badly. */
+@SolrTestCaseJ4.SuppressSSL
+public class BadClusterTest extends SolrCloudTestCase {
+
+  private static final String collection = "streams";
+  private static final String id = "id";
+
+  private static final StreamFactory streamFactory =
+      new StreamFactory().withFunctionName("search", CloudSolrStream.class);
+
+  private static String zkHost;
+
+  @BeforeClass
+  public static void configureCluster() throws Exception {
+    configureCluster(1)
+        .addConfig(
+            "conf",
+            getFile("solrj")
+                .toPath()
+                .resolve("solr")
+                .resolve("configsets")
+                .resolve("streaming")
+                .resolve("conf"))
+        .configure();
+
+    CollectionAdminRequest.createCollection(collection, "conf", 1, 1)
+        .process(cluster.getSolrClient());
+    cluster.waitForActiveCollection(collection, 1, 1);
+
+    zkHost = cluster.getZkServer().getZkAddress();
+    streamFactory.withCollectionZkHost(collection, zkHost);
+  }
+
+  // test order is important because the cluster progressively gets worse, but it is only created
+  // once in BeforeClass as in other tests
+  // ordering can not be strictly enforced with JUnit annotations because of parallel executions, so
+  // we have this aggregated test instead
+  @Test
+  public void testBadCluster() throws Exception {
+    testEmptyCollection();
+    testAllNodesDown();
+    testClusterShutdown();
+  }
+
+  private void testEmptyCollection() throws Exception {
+    CloudSolrStream stream = new CloudSolrStream(buildSearchExpression(), streamFactory);
+    assertEquals(0, getTuples(stream).size());
+  }
+
+  private void testAllNodesDown() throws Exception {
+
+    CloudSolrStream stream = new CloudSolrStream(buildSearchExpression(), streamFactory);
+    cluster.expireZkSession(cluster.getReplicaJetty(getReplicas().get(0)));
+
+    try {
+      getTuples(stream);
+      fail("Expected IOException");
+    } catch (IOException ioe) {
+    }
+  }
+
+  private void testClusterShutdown() throws Exception {
+
+    CloudSolrStream stream = new CloudSolrStream(buildSearchExpression(), streamFactory);
+    cluster.shutdown();
+
+    try {
+      getTuples(stream);
+      fail("Expected IOException: SolrException: TimeoutException");
+    } catch (IOException ioe) {
+      SolrException se = (SolrException) ioe.getCause();
+      TimeoutException te = (TimeoutException) se.getCause();
+      assertNotNull(te);
+    }
+  }
+
+  private StreamExpression buildSearchExpression() {
+    StreamExpression expression = new StreamExpression("search");
+    expression.addParameter(collection);
+    expression.addParameter(new StreamExpressionNamedParameter(CommonParams.Q, "*:*"));
+    expression.addParameter(new StreamExpressionNamedParameter(CommonParams.FL, id));
+    expression.addParameter(new StreamExpressionNamedParameter(CommonParams.SORT, id + " asc"));
+    return expression;
+  }
+
+  private List<Replica> getReplicas() throws IOException {
+    return TupleStream.getReplicas(zkHost, collection, null, new MultiMapSolrParams(Map.of()));
+  }
+
+  private List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
+    tupleStream.open();
+    List<Tuple> tuples = new ArrayList<>();
+    for (; ; ) {
+      Tuple t = tupleStream.read();
+      if (t.EOF) {
+        break;
+      } else {
+        tuples.add(t);
+      }
+    }
+    tupleStream.close();
+    return tuples;
+  }
+}