You are viewing a plain text version of this content; the canonical (HTML) version is available in the mailing-list archive.
Posted to commits@solr.apache.org by no...@apache.org on 2023/01/09 09:59:46 UTC
[solr] branch main updated: A testcase for Solr coordinator role (#1277)
This is an automated email from the ASF dual-hosted git repository.
noble pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new c40ee9182b6 A testcase for Solr coordinator role (#1277)
c40ee9182b6 is described below
commit c40ee9182b6dba33b4eda7392fc0a616ccf0e50c
Author: Noble Paul <no...@users.noreply.github.com>
AuthorDate: Mon Jan 9 20:59:41 2023 +1100
A testcase for Solr coordinator role (#1277)
---
.../apache/solr/search/TestCoordinatorRole.java | 423 +++++++++++++++++++--
1 file changed, 388 insertions(+), 35 deletions(-)
diff --git a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
index 5e2dcfb70a8..237e87f51d0 100644
--- a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
+++ b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
@@ -17,69 +17,422 @@
package org.apache.solr.search;
+import static org.apache.solr.common.params.CommonParams.OMIT_HEADER;
+import static org.apache.solr.common.params.CommonParams.TRUE;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Date;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
+import java.util.Random;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.apache.solr.common.util.Utils;
import org.apache.solr.core.NodeRoles;
import org.apache.solr.embedded.JettySolrRunner;
import org.apache.solr.servlet.CoordinatorHttpSolrCall;
-import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TestCoordinatorRole extends SolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- @BeforeClass
- public static void setupCluster() throws Exception {
- configureCluster(4).addConfig("conf", configset("cloud-minimal")).configure();
- }
+ private static final long DEFAULT_TOLERANCE = 500;
public void testSimple() throws Exception {
- CloudSolrClient client = cluster.getSolrClient();
- String COLLECTION_NAME = "test_coll";
- String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
- CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2)
- .process(cluster.getSolrClient());
- cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
- UpdateRequest ur = new UpdateRequest();
- for (int i = 0; i < 10; i++) {
- SolrInputDocument doc2 = new SolrInputDocument();
- doc2.addField("id", "" + i);
- ur.add(doc2);
+ MiniSolrCloudCluster cluster =
+ configureCluster(4).addConfig("conf", configset("cloud-minimal")).configure();
+ try {
+ CloudSolrClient client = cluster.getSolrClient();
+ String COLLECTION_NAME = "test_coll";
+ String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
+ CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2)
+ .process(cluster.getSolrClient());
+ cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
+ UpdateRequest ur = new UpdateRequest();
+ for (int i = 0; i < 10; i++) {
+ SolrInputDocument doc2 = new SolrInputDocument();
+ doc2.addField("id", "" + i);
+ ur.add(doc2);
+ }
+
+ ur.commit(client, COLLECTION_NAME);
+ QueryResponse rsp = client.query(COLLECTION_NAME, new SolrQuery("*:*"));
+ assertEquals(10, rsp.getResults().getNumFound());
+
+ System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on");
+ JettySolrRunner coordinatorJetty = null;
+ try {
+ coordinatorJetty = cluster.startJettySolrRunner();
+ } finally {
+ System.clearProperty(NodeRoles.NODE_ROLES_PROP);
+ }
+ QueryResponse rslt =
+ new QueryRequest(new SolrQuery("*:*"))
+ .setPreferredNodes(List.of(coordinatorJetty.getNodeName()))
+ .process(client, COLLECTION_NAME);
+
+ assertEquals(10, rslt.getResults().size());
+
+ DocCollection collection =
+ cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION);
+ assertNotNull(collection);
+
+ Set<String> expectedNodes = new HashSet<>();
+ expectedNodes.add(coordinatorJetty.getNodeName());
+ collection.forEachReplica((s, replica) -> expectedNodes.remove(replica.getNodeName()));
+ assertTrue(expectedNodes.isEmpty());
+ } finally {
+ cluster.shutdown();
}
+ }
- ur.commit(client, COLLECTION_NAME);
- QueryResponse rsp = client.query(COLLECTION_NAME, new SolrQuery("*:*"));
- assertEquals(10, rsp.getResults().getNumFound());
+ private static void assertWithinTolerance(long expected, long actual) {
+ assertWithinTolerance(expected, actual, DEFAULT_TOLERANCE);
+ }
+ private static void assertWithinTolerance(long expected, long actual, long tolerance) {
+ assertTrue(
+ "expected=" + expected + ", actual=" + actual + ", tolerance=" + tolerance,
+ Math.abs(expected - actual) <= tolerance);
+ }
+
+ public void testNRTRestart() throws Exception {
+ // we restart jetty and expect to find on disk data - need a local fs directory
+ useFactory(null);
+ String COLL = "coordinator_test_coll";
+ MiniSolrCloudCluster cluster =
+ configureCluster(3)
+ .withJettyConfig(jetty -> jetty.enableV2(true))
+ .addConfig("conf", configset("conf2"))
+ .configure();
System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on");
- JettySolrRunner coordinatorJetty = null;
+ JettySolrRunner qaJetty = cluster.startJettySolrRunner();
+ String qaJettyBase = qaJetty.getBaseUrl().toString();
+ System.clearProperty(NodeRoles.NODE_ROLES_PROP);
+ ExecutorService executor =
+ ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrNamedThreadFactory("manipulateJetty"));
try {
- coordinatorJetty = cluster.startJettySolrRunner();
+ CollectionAdminRequest.createCollection(COLL, "conf", 1, 1, 0, 1)
+ .process(cluster.getSolrClient());
+ cluster.waitForActiveCollection(COLL, 1, 2);
+ DocCollection docColl =
+ cluster.getSolrClient().getClusterStateProvider().getClusterState().getCollection(COLL);
+ Replica nrtReplica = docColl.getReplicas(EnumSet.of(Replica.Type.NRT)).get(0);
+ assertNotNull(nrtReplica);
+ String nrtCore = nrtReplica.getCoreName();
+ Replica pullReplica = docColl.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0);
+ assertNotNull(pullReplica);
+ String pullCore = pullReplica.getCoreName();
+
+ SolrInputDocument sid = new SolrInputDocument();
+ sid.addField("id", "123");
+ sid.addField("desc_s", "A Document");
+ JettySolrRunner nrtJetty = null;
+ JettySolrRunner pullJetty = null;
+ for (JettySolrRunner j : cluster.getJettySolrRunners()) {
+ String nodeName = j.getNodeName();
+ if (nodeName.equals(nrtReplica.getNodeName())) {
+ nrtJetty = j;
+ } else if (nodeName.equals(pullReplica.getNodeName())) {
+ pullJetty = j;
+ }
+ }
+ assertNotNull(nrtJetty);
+ assertNotNull(pullJetty);
+ try (HttpSolrClient client = (HttpSolrClient) pullJetty.newClient()) {
+ client.add(COLL, sid);
+ client.commit(COLL);
+ assertEquals(
+ nrtCore,
+ getHostCoreName(
+ COLL, qaJettyBase, client, p -> p.add("shards.preference", "replica.type:NRT")));
+ assertEquals(
+ pullCore,
+ getHostCoreName(
+ COLL, qaJettyBase, client, p -> p.add("shards.preference", "replica.type:PULL")));
+ // Now , kill NRT jetty
+ JettySolrRunner nrtJettyF = nrtJetty;
+ JettySolrRunner pullJettyF = pullJetty;
+ Random r = random();
+ final long establishBaselineMs = r.nextInt(1000);
+ final long nrtDowntimeMs = r.nextInt(10000);
+ // NOTE: for `pullServiceTimeMs`, it can't be super-short. This is just to simplify our
+ // indexing code,
+ // based on the fact that our indexing is based on a PULL-node client.
+ final long pullServiceTimeMs = 1000 + (long) r.nextInt(9000);
+ Future<?> jettyManipulationFuture =
+ executor.submit(
+ () -> {
+ // we manipulate the jetty instances in a separate thread to more closely mimic
+ // the behavior we'd
+ // see irl.
+ try {
+ Thread.sleep(establishBaselineMs);
+ log.info("stopping NRT jetty ...");
+ nrtJettyF.stop();
+ log.info("NRT jetty stopped.");
+ Thread.sleep(nrtDowntimeMs); // let NRT be down for a while
+ log.info("restarting NRT jetty ...");
+ nrtJettyF.start(true);
+ log.info("NRT jetty restarted.");
+ // once NRT is back up, we expect PULL to continue serving until the TTL on ZK
+ // state
+ // used for query request routing has expired (60s). But here we force a return
+ // to NRT
+ // by stopping the PULL replica after a brief delay ...
+ Thread.sleep(pullServiceTimeMs);
+ log.info("stopping PULL jetty ...");
+ pullJettyF.stop();
+ log.info("PULL jetty stopped.");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ String hostCore;
+ long start = new Date().getTime();
+ long individualRequestStart = start;
+ int count = 0;
+ while (nrtCore.equals(
+ hostCore =
+ getHostCoreName(
+ COLL,
+ qaJettyBase,
+ client,
+ p -> p.add("shards.preference", "replica.type:NRT")))) {
+ count++;
+ individualRequestStart = new Date().getTime();
+ }
+ long now = new Date().getTime();
+ log.info(
+ "phase1 NRT queries count={}, overall_duration={}, baseline_expected_overall_duration={}, switch-to-pull_duration={}",
+ count,
+ now - start,
+ establishBaselineMs,
+ now - individualRequestStart);
+ // default tolerance of 500ms below should suffice. Failover to PULL for this case should be
+ // very fast,
+ // because our QA-based client already knows both replicas are active, the index is stable,
+ // so the moment
+ // the client finds NRT is down it should be able to failover immediately and transparently
+ // to PULL.
+ assertWithinTolerance(establishBaselineMs, now - start);
+ assertEquals(
+ "when we break out of the NRT query loop, should be b/c routed to PULL",
+ pullCore,
+ hostCore);
+ SolrInputDocument d = new SolrInputDocument();
+ d.addField("id", "345");
+ d.addField("desc_s", "Another Document");
+ // attempts to add another doc while NRT is down should fail, then eventually succeed when
+ // NRT comes back up
+ count = 0;
+ start = new Date().getTime();
+ individualRequestStart = start;
+ for (; ; ) {
+ try {
+ client.add(COLL, d);
+ client.commit(COLL);
+ break;
+ } catch (SolrException ex) {
+ // we expect these until nrtJetty is back up.
+ count++;
+ Thread.sleep(100);
+ }
+ individualRequestStart = new Date().getTime();
+ }
+ now = new Date().getTime();
+ log.info(
+ "successfully added another doc; duration: {}, overall_duration={}, baseline_expected_overall_duration={}, exception_count={}",
+ now - individualRequestStart,
+ now - start,
+ nrtDowntimeMs,
+ count);
+ // NRT replica is back up, registered as available with Zk, and availability info has been
+ // pulled down by
+ // our PULL-replica-based `client`, forwarded indexing command to NRT, index/commit
+ // completed. All of this
+ // accounts for the 3000ms tolerance allowed for below. This is not a strict value, and if
+ // it causes failures
+ // regularly we should feel free to increase the tolerance; but it's meant to provide a
+ // stable baseline from
+ // which to detect regressions.
+ assertWithinTolerance(nrtDowntimeMs, now - start, 3000);
+ count = 0;
+ start = new Date().getTime();
+ individualRequestStart = start;
+ while (pullCore.equals(
+ hostCore =
+ getHostCoreName(
+ COLL,
+ qaJettyBase,
+ client,
+ p -> {
+ p.set(CommonParams.Q, "id:345");
+ p.add("shards.preference", "replica.type:NRT");
+ }))) {
+ count++;
+ Thread.sleep(100);
+ individualRequestStart = new Date().getTime();
+ }
+ now = new Date().getTime();
+ log.info(
+ "query retries between NRT index-ready and query-ready: {}; overall_duration={}; baseline_expected_overall_duration={}; failover-request_duration={}",
+ count,
+ now - start,
+ pullServiceTimeMs,
+ now - individualRequestStart);
+ assertWithinTolerance(pullServiceTimeMs, now - start, 1000);
+ assertEquals(nrtCore, hostCore);
+ // allow any exceptions to propagate
+ jettyManipulationFuture.get();
+ if (true) return;
+
+ // next phase: just toggle a bunch
+ // TODO: could separate this out into a different test method, but this should suffice for
+ // now
+ pullJetty.start(true);
+ AtomicBoolean done = new AtomicBoolean();
+ long runMinutes = 1;
+ long finishTimeMs =
+ new Date().getTime() + TimeUnit.MILLISECONDS.convert(runMinutes, TimeUnit.MINUTES);
+ JettySolrRunner[] jettys = new JettySolrRunner[] {nrtJettyF, pullJettyF};
+ Random threadRandom = new Random(r.nextInt());
+ Future<Integer> f =
+ executor.submit(
+ () -> {
+ int iteration = 0;
+ while (new Date().getTime() < finishTimeMs && !done.get()) {
+ int idx = iteration++ % jettys.length;
+ JettySolrRunner toManipulate = jettys[idx];
+ try {
+ int serveTogetherTime = threadRandom.nextInt(7000);
+ int downTime = threadRandom.nextInt(7000);
+ log.info("serving together for {}ms", serveTogetherTime);
+ Thread.sleep(serveTogetherTime);
+ log.info("stopping {} ...", idx);
+ toManipulate.stop();
+ log.info("stopped {}.", idx);
+ Thread.sleep(downTime);
+ log.info("restarting {} ...", idx);
+ toManipulate.start(true);
+ log.info("restarted {}.", idx);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ done.set(true);
+ return iteration;
+ });
+ count = 0;
+ start = new Date().getTime();
+ try {
+ do {
+ pullCore.equals(
+ hostCore =
+ getHostCoreName(
+ COLL,
+ qaJettyBase,
+ client,
+ p -> {
+ p.set(CommonParams.Q, "id:345");
+ p.add("shards.preference", "replica.type:NRT");
+ }));
+ count++;
+ Thread.sleep(100);
+ } while (!done.get());
+ } finally {
+ final String result;
+ if (done.getAndSet(true)) {
+ result = "Success";
+ } else {
+ // not yet set to done, completed abnormally (exception will be thrown beyond `finally`
+ // block)
+ result = "Failure";
+ }
+ Integer toggleCount = f.get();
+ long secondsDuration =
+ TimeUnit.SECONDS.convert(new Date().getTime() - start, TimeUnit.MILLISECONDS);
+ log.info(
+ "{}! {} seconds, {} toggles, {} requests served",
+ result,
+ secondsDuration,
+ toggleCount,
+ count);
+ }
+ }
} finally {
- System.clearProperty(NodeRoles.NODE_ROLES_PROP);
+ try {
+ ExecutorUtil.shutdownAndAwaitTermination(executor);
+ } finally {
+ cluster.shutdown();
+ }
}
- QueryResponse rslt =
- new QueryRequest(new SolrQuery("*:*"))
- .setPreferredNodes(List.of(coordinatorJetty.getNodeName()))
- .process(client, COLLECTION_NAME);
-
- assertEquals(10, rslt.getResults().size());
+ }
- DocCollection collection =
- cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION);
- assertNotNull(collection);
+ @SuppressWarnings("rawtypes")
+ private String getHostCoreName(
+ String COLL, String qaNode, HttpSolrClient solrClient, Consumer<SolrQuery> p)
+ throws Exception {
- Set<String> expectedNodes = new HashSet<>();
- expectedNodes.add(coordinatorJetty.getNodeName());
- collection.forEachReplica((s, replica) -> expectedNodes.remove(replica.getNodeName()));
- assertTrue(expectedNodes.isEmpty());
+ boolean found = false;
+ SolrQuery q = new SolrQuery("*:*");
+ q.add("fl", "id,desc_s,_core_:[core]").add(OMIT_HEADER, TRUE);
+ p.accept(q);
+ StringBuilder sb =
+ new StringBuilder(qaNode).append("/").append(COLL).append("/select?wt=javabin");
+ q.forEach(e -> sb.append("&").append(e.getKey()).append("=").append(e.getValue()[0]));
+ SolrDocumentList docs = null;
+ for (int i = 0; i < 100; i++) {
+ try {
+ SimpleOrderedMap rsp =
+ (SimpleOrderedMap)
+ Utils.executeGET(solrClient.getHttpClient(), sb.toString(), Utils.JAVABINCONSUMER);
+ docs = (SolrDocumentList) rsp.get("response");
+ if (docs.size() > 0) {
+ found = true;
+ break;
+ }
+ } catch (SolrException ex) {
+ // we know we're doing tricky things that might cause transient errors
+ // TODO: all these query requests go to the QA node -- should QA propagate internal request
+ // errors
+ // to the external client (and the external client retry?) or should QA attempt to failover
+ // transparently
+ // in the event of an error?
+ if (i < 5) {
+ log.info("swallowing transient error", ex);
+ } else {
+ log.error("only expect actual _errors_ within a small window (e.g. 500ms)", ex);
+ fail("initial error time threshold exceeded");
+ }
+ }
+ Thread.sleep(100);
+ }
+ assertTrue(found);
+ return (String) docs.get(0).getFieldValue("_core_");
}
}