Posted to commits@solr.apache.org by no...@apache.org on 2023/01/09 09:59:46 UTC

[solr] branch main updated: A testcase for Solr coordinator role (#1277)

This is an automated email from the ASF dual-hosted git repository.

noble pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new c40ee9182b6 A testcase for Solr coordinator role (#1277)
c40ee9182b6 is described below

commit c40ee9182b6dba33b4eda7392fc0a616ccf0e50c
Author: Noble Paul <no...@users.noreply.github.com>
AuthorDate: Mon Jan 9 20:59:41 2023 +1100

    A testcase for Solr coordinator role (#1277)
---
 .../apache/solr/search/TestCoordinatorRole.java    | 423 +++++++++++++++++++--
 1 file changed, 388 insertions(+), 35 deletions(-)

diff --git a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
index 5e2dcfb70a8..237e87f51d0 100644
--- a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
+++ b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
@@ -17,69 +17,422 @@
 
 package org.apache.solr.search;
 
+import static org.apache.solr.common.params.CommonParams.OMIT_HEADER;
+import static org.apache.solr.common.params.CommonParams.TRUE;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Date;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.NodeRoles;
 import org.apache.solr.embedded.JettySolrRunner;
 import org.apache.solr.servlet.CoordinatorHttpSolrCall;
-import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestCoordinatorRole extends SolrCloudTestCase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  @BeforeClass
-  public static void setupCluster() throws Exception {
-    configureCluster(4).addConfig("conf", configset("cloud-minimal")).configure();
-  }
+  private static final long DEFAULT_TOLERANCE = 500;
 
   public void testSimple() throws Exception {
-    CloudSolrClient client = cluster.getSolrClient();
-    String COLLECTION_NAME = "test_coll";
-    String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
-    CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2)
-        .process(cluster.getSolrClient());
-    cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
-    UpdateRequest ur = new UpdateRequest();
-    for (int i = 0; i < 10; i++) {
-      SolrInputDocument doc2 = new SolrInputDocument();
-      doc2.addField("id", "" + i);
-      ur.add(doc2);
+    MiniSolrCloudCluster cluster =
+        configureCluster(4).addConfig("conf", configset("cloud-minimal")).configure();
+    try {
+      CloudSolrClient client = cluster.getSolrClient();
+      String COLLECTION_NAME = "test_coll";
+      String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
+      CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2)
+          .process(cluster.getSolrClient());
+      cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
+      UpdateRequest ur = new UpdateRequest();
+      for (int i = 0; i < 10; i++) {
+        SolrInputDocument doc2 = new SolrInputDocument();
+        doc2.addField("id", "" + i);
+        ur.add(doc2);
+      }
+
+      ur.commit(client, COLLECTION_NAME);
+      QueryResponse rsp = client.query(COLLECTION_NAME, new SolrQuery("*:*"));
+      assertEquals(10, rsp.getResults().getNumFound());
+
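+      // start an extra node that carries only the coordinator role, then route the same query
+      // through it via the preferred-nodes hint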
+      System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on");
+      JettySolrRunner coordinatorJetty = null;
+      try {
+        coordinatorJetty = cluster.startJettySolrRunner();
+      } finally {
+        System.clearProperty(NodeRoles.NODE_ROLES_PROP);
+      }
+      QueryResponse rslt =
+          new QueryRequest(new SolrQuery("*:*"))
+              .setPreferredNodes(List.of(coordinatorJetty.getNodeName()))
+              .process(client, COLLECTION_NAME);
+
+      assertEquals(10, rslt.getResults().size());
+
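+      // routing the query through the coordinator should have created the synthetic collection
+      // for the "conf" configset, with a replica hosted on the coordinator node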
+      DocCollection collection =
+          cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION);
+      assertNotNull(collection);
+
+      Set<String> expectedNodes = new HashSet<>();
+      expectedNodes.add(coordinatorJetty.getNodeName());
+      collection.forEachReplica((s, replica) -> expectedNodes.remove(replica.getNodeName()));
+      assertTrue(expectedNodes.isEmpty());
+    } finally {
+      cluster.shutdown();
     }
+  }
 
-    ur.commit(client, COLLECTION_NAME);
-    QueryResponse rsp = client.query(COLLECTION_NAME, new SolrQuery("*:*"));
-    assertEquals(10, rsp.getResults().getNumFound());
+  private static void assertWithinTolerance(long expected, long actual) {
+    assertWithinTolerance(expected, actual, DEFAULT_TOLERANCE);
+  }
 
+  private static void assertWithinTolerance(long expected, long actual, long tolerance) {
+    assertTrue(
+        "expected=" + expected + ", actual=" + actual + ", tolerance=" + tolerance,
+        Math.abs(expected - actual) <= tolerance);
+  }
+
+  public void testNRTRestart() throws Exception {
+    // we restart jetty and expect to find the on-disk data, so we need a local fs directory
+    useFactory(null);
+    String COLL = "coordinator_test_coll";
+    MiniSolrCloudCluster cluster =
+        configureCluster(3)
+            .withJettyConfig(jetty -> jetty.enableV2(true))
+            .addConfig("conf", configset("conf2"))
+            .configure();
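+    // start a dedicated "QA" node that carries only the coordinator role; all queries in this
+    // test are routed through it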
     System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on");
-    JettySolrRunner coordinatorJetty = null;
+    JettySolrRunner qaJetty = cluster.startJettySolrRunner();
+    String qaJettyBase = qaJetty.getBaseUrl().toString();
+    System.clearProperty(NodeRoles.NODE_ROLES_PROP);
+    ExecutorService executor =
+        ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrNamedThreadFactory("manipulateJetty"));
     try {
-      coordinatorJetty = cluster.startJettySolrRunner();
+      CollectionAdminRequest.createCollection(COLL, "conf", 1, 1, 0, 1)
+          .process(cluster.getSolrClient());
+      cluster.waitForActiveCollection(COLL, 1, 2);
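+      // locate the single NRT replica and the single PULL replica, and the jettys hosting them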
+      DocCollection docColl =
+          cluster.getSolrClient().getClusterStateProvider().getClusterState().getCollection(COLL);
+      Replica nrtReplica = docColl.getReplicas(EnumSet.of(Replica.Type.NRT)).get(0);
+      assertNotNull(nrtReplica);
+      String nrtCore = nrtReplica.getCoreName();
+      Replica pullReplica = docColl.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0);
+      assertNotNull(pullReplica);
+      String pullCore = pullReplica.getCoreName();
+
+      SolrInputDocument sid = new SolrInputDocument();
+      sid.addField("id", "123");
+      sid.addField("desc_s", "A Document");
+      JettySolrRunner nrtJetty = null;
+      JettySolrRunner pullJetty = null;
+      for (JettySolrRunner j : cluster.getJettySolrRunners()) {
+        String nodeName = j.getNodeName();
+        if (nodeName.equals(nrtReplica.getNodeName())) {
+          nrtJetty = j;
+        } else if (nodeName.equals(pullReplica.getNodeName())) {
+          pullJetty = j;
+        }
+      }
+      assertNotNull(nrtJetty);
+      assertNotNull(pullJetty);
+      try (HttpSolrClient client = (HttpSolrClient) pullJetty.newClient()) {
+        client.add(COLL, sid);
+        client.commit(COLL);
+        assertEquals(
+            nrtCore,
+            getHostCoreName(
+                COLL, qaJettyBase, client, p -> p.add("shards.preference", "replica.type:NRT")));
+        assertEquals(
+            pullCore,
+            getHostCoreName(
+                COLL, qaJettyBase, client, p -> p.add("shards.preference", "replica.type:PULL")));
+        // Now, kill the NRT jetty
+        JettySolrRunner nrtJettyF = nrtJetty;
+        JettySolrRunner pullJettyF = pullJetty;
+        Random r = random();
+        final long establishBaselineMs = r.nextInt(1000);
+        final long nrtDowntimeMs = r.nextInt(10000);
+        // NOTE: `pullServiceTimeMs` can't be super-short. This is just to simplify our indexing
+        // code, which is based on a PULL-node client.
+        final long pullServiceTimeMs = 1000 + (long) r.nextInt(9000);
+        Future<?> jettyManipulationFuture =
+            executor.submit(
+                () -> {
+                  // we manipulate the jetty instances in a separate thread to more closely
+                  // mimic the behavior we'd see in real life.
+                  try {
+                    Thread.sleep(establishBaselineMs);
+                    log.info("stopping NRT jetty ...");
+                    nrtJettyF.stop();
+                    log.info("NRT jetty stopped.");
+                    Thread.sleep(nrtDowntimeMs); // let NRT be down for a while
+                    log.info("restarting NRT jetty ...");
+                    nrtJettyF.start(true);
+                    log.info("NRT jetty restarted.");
+                    // once NRT is back up, we expect PULL to continue serving until the TTL on
+                    // ZK state used for query request routing has expired (60s). But here we
+                    // force a return to NRT by stopping the PULL replica after a brief
+                    // delay ...
+                    Thread.sleep(pullServiceTimeMs);
+                    log.info("stopping PULL jetty ...");
+                    pullJettyF.stop();
+                    log.info("PULL jetty stopped.");
+                  } catch (Exception e) {
+                    throw new RuntimeException(e);
+                  }
+                });
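+        // phase 1: query with an NRT preference until the serving core changes, i.e. until the
+        // QA node fails over to the PULL replica once the NRT jetty goes down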
+        String hostCore;
+        long start = new Date().getTime();
+        long individualRequestStart = start;
+        int count = 0;
+        while (nrtCore.equals(
+            hostCore =
+                getHostCoreName(
+                    COLL,
+                    qaJettyBase,
+                    client,
+                    p -> p.add("shards.preference", "replica.type:NRT")))) {
+          count++;
+          individualRequestStart = new Date().getTime();
+        }
+        long now = new Date().getTime();
+        log.info(
+            "phase1 NRT queries count={}, overall_duration={}, baseline_expected_overall_duration={}, switch-to-pull_duration={}",
+            count,
+            now - start,
+            establishBaselineMs,
+            now - individualRequestStart);
+        // The default tolerance of 500ms below should suffice. Failover to PULL for this case
+        // should be very fast: because our QA-based client already knows both replicas are
+        // active and the index is stable, the moment the client finds NRT is down it should be
+        // able to fail over immediately and transparently to PULL.
+        assertWithinTolerance(establishBaselineMs, now - start);
+        assertEquals(
+            "when we break out of the NRT query loop, should be b/c routed to PULL",
+            pullCore,
+            hostCore);
+        SolrInputDocument d = new SolrInputDocument();
+        d.addField("id", "345");
+        d.addField("desc_s", "Another Document");
+        // attempts to add another doc while NRT is down should fail, then eventually succeed when
+        // NRT comes back up
+        count = 0;
+        start = new Date().getTime();
+        individualRequestStart = start;
+        for (; ; ) {
+          try {
+            client.add(COLL, d);
+            client.commit(COLL);
+            break;
+          } catch (SolrException ex) {
+            // we expect these until nrtJetty is back up.
+            count++;
+            Thread.sleep(100);
+          }
+          individualRequestStart = new Date().getTime();
+        }
+        now = new Date().getTime();
+        log.info(
+            "successfully added another doc; duration: {}, overall_duration={}, baseline_expected_overall_duration={}, exception_count={}",
+            now - individualRequestStart,
+            now - start,
+            nrtDowntimeMs,
+            count);
+        // The NRT replica is back up, registered as available with ZK, availability info has
+        // been pulled down by our PULL-replica-based `client`, the indexing command has been
+        // forwarded to NRT, and the index/commit has completed. All of this accounts for the
+        // 3000ms tolerance allowed below. This is not a strict value, and if it causes failures
+        // regularly we should feel free to increase the tolerance; but it's meant to provide a
+        // stable baseline from which to detect regressions.
+        assertWithinTolerance(nrtDowntimeMs, now - start, 3000);
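+        // keep querying with an NRT preference until requests are once again served by the NRT
+        // core; the manipulation thread stops the PULL jetty after pullServiceTimeMs, forcing
+        // the switch back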
+        count = 0;
+        start = new Date().getTime();
+        individualRequestStart = start;
+        while (pullCore.equals(
+            hostCore =
+                getHostCoreName(
+                    COLL,
+                    qaJettyBase,
+                    client,
+                    p -> {
+                      p.set(CommonParams.Q, "id:345");
+                      p.add("shards.preference", "replica.type:NRT");
+                    }))) {
+          count++;
+          Thread.sleep(100);
+          individualRequestStart = new Date().getTime();
+        }
+        now = new Date().getTime();
+        log.info(
+            "query retries between NRT index-ready and query-ready: {}; overall_duration={}; baseline_expected_overall_duration={}; failover-request_duration={}",
+            count,
+            now - start,
+            pullServiceTimeMs,
+            now - individualRequestStart);
+        assertWithinTolerance(pullServiceTimeMs, now - start, 1000);
+        assertEquals(nrtCore, hostCore);
+        // allow any exceptions to propagate
+        jettyManipulationFuture.get();
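+        // NOTE: the early return below skips the replica-toggling phase that follows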
+        if (true) return;
+
+        // next phase: just toggle a bunch
+        // TODO: could separate this out into a different test method, but this should suffice for
+        // now
+        pullJetty.start(true);
+        AtomicBoolean done = new AtomicBoolean();
+        long runMinutes = 1;
+        long finishTimeMs =
+            new Date().getTime() + TimeUnit.MILLISECONDS.convert(runMinutes, TimeUnit.MINUTES);
+        JettySolrRunner[] jettys = new JettySolrRunner[] {nrtJettyF, pullJettyF};
+        Random threadRandom = new Random(r.nextInt());
+        Future<Integer> f =
+            executor.submit(
+                () -> {
+                  int iteration = 0;
+                  while (new Date().getTime() < finishTimeMs && !done.get()) {
+                    int idx = iteration++ % jettys.length;
+                    JettySolrRunner toManipulate = jettys[idx];
+                    try {
+                      int serveTogetherTime = threadRandom.nextInt(7000);
+                      int downTime = threadRandom.nextInt(7000);
+                      log.info("serving together for {}ms", serveTogetherTime);
+                      Thread.sleep(serveTogetherTime);
+                      log.info("stopping {} ...", idx);
+                      toManipulate.stop();
+                      log.info("stopped {}.", idx);
+                      Thread.sleep(downTime);
+                      log.info("restarting {} ...", idx);
+                      toManipulate.start(true);
+                      log.info("restarted {}.", idx);
+                    } catch (Exception e) {
+                      throw new RuntimeException(e);
+                    }
+                  }
+                  done.set(true);
+                  return iteration;
+                });
+        count = 0;
+        start = new Date().getTime();
+        try {
+          do {
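+            // the query result is intentionally ignored; this loop just exercises query routing
+            // through the QA node while the replicas are toggled up and down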
+            pullCore.equals(
+                hostCore =
+                    getHostCoreName(
+                        COLL,
+                        qaJettyBase,
+                        client,
+                        p -> {
+                          p.set(CommonParams.Q, "id:345");
+                          p.add("shards.preference", "replica.type:NRT");
+                        }));
+            count++;
+            Thread.sleep(100);
+          } while (!done.get());
+        } finally {
+          final String result;
+          if (done.getAndSet(true)) {
+            result = "Success";
+          } else {
+            // not yet set to done: the loop completed abnormally (an exception will be thrown
+            // beyond this `finally` block)
+            result = "Failure";
+          }
+          Integer toggleCount = f.get();
+          long secondsDuration =
+              TimeUnit.SECONDS.convert(new Date().getTime() - start, TimeUnit.MILLISECONDS);
+          log.info(
+              "{}! {} seconds, {} toggles, {} requests served",
+              result,
+              secondsDuration,
+              toggleCount,
+              count);
+        }
+      }
     } finally {
-      System.clearProperty(NodeRoles.NODE_ROLES_PROP);
+      try {
+        ExecutorUtil.shutdownAndAwaitTermination(executor);
+      } finally {
+        cluster.shutdown();
+      }
     }
-    QueryResponse rslt =
-        new QueryRequest(new SolrQuery("*:*"))
-            .setPreferredNodes(List.of(coordinatorJetty.getNodeName()))
-            .process(client, COLLECTION_NAME);
-
-    assertEquals(10, rslt.getResults().size());
+  }
 
-    DocCollection collection =
-        cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION);
-    assertNotNull(collection);
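+  /**
+   * Issues a query for {@code COLL} through the QA node ({@code qaNode}) and returns the
+   * {@code _core_:[core]} pseudo-field value, i.e. the name of the core that ultimately served
+   * the request. Transient errors are retried for a limited window.
+   */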
+  @SuppressWarnings("rawtypes")
+  private String getHostCoreName(
+      String COLL, String qaNode, HttpSolrClient solrClient, Consumer<SolrQuery> p)
+      throws Exception {
 
-    Set<String> expectedNodes = new HashSet<>();
-    expectedNodes.add(coordinatorJetty.getNodeName());
-    collection.forEachReplica((s, replica) -> expectedNodes.remove(replica.getNodeName()));
-    assertTrue(expectedNodes.isEmpty());
+    boolean found = false;
+    SolrQuery q = new SolrQuery("*:*");
+    q.add("fl", "id,desc_s,_core_:[core]").add(OMIT_HEADER, TRUE);
+    p.accept(q);
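+    // build the request URL by hand and execute it via the raw HttpClient so the request goes
+    // directly to the QA node; the passed-in HttpSolrClient is used only for its HttpClient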
+    StringBuilder sb =
+        new StringBuilder(qaNode).append("/").append(COLL).append("/select?wt=javabin");
+    q.forEach(e -> sb.append("&").append(e.getKey()).append("=").append(e.getValue()[0]));
+    SolrDocumentList docs = null;
+    for (int i = 0; i < 100; i++) {
+      try {
+        SimpleOrderedMap rsp =
+            (SimpleOrderedMap)
+                Utils.executeGET(solrClient.getHttpClient(), sb.toString(), Utils.JAVABINCONSUMER);
+        docs = (SolrDocumentList) rsp.get("response");
+        if (docs.size() > 0) {
+          found = true;
+          break;
+        }
+      } catch (SolrException ex) {
+        // we know we're doing tricky things that might cause transient errors
+        // TODO: all these query requests go to the QA node -- should QA propagate internal
+        // request errors to the external client (and the external client retry?), or should QA
+        // attempt to failover transparently in the event of an error?
+        if (i < 5) {
+          log.info("swallowing transient error", ex);
+        } else {
+          log.error("only expect actual _errors_ within a small window (e.g. 500ms)", ex);
+          fail("initial error time threshold exceeded");
+        }
+      }
+      Thread.sleep(100);
+    }
+    assertTrue(found);
+    return (String) docs.get(0).getFieldValue("_core_");
   }
 }