You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by no...@apache.org on 2023/01/12 07:54:33 UTC
[solr] branch branch_9x updated: test case added for coordinator role
This is an automated email from the ASF dual-hosted git repository.
noble pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/branch_9x by this push:
new ec9b152c31f test case added for coordinator role
ec9b152c31f is described below
commit ec9b152c31fac99fe190ccc98e754c1200bd9fd2
Author: Noble Paul <no...@gmail.com>
AuthorDate: Thu Jan 12 18:54:15 2023 +1100
test case added for coordinator role
---
.../apache/solr/search/TestCoordinatorRole.java | 412 +++++++++++++++++++--
1 file changed, 375 insertions(+), 37 deletions(-)
diff --git a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
index 5e2dcfb70a8..6c4e845cf5a 100644
--- a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
+++ b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
@@ -17,69 +17,407 @@
package org.apache.solr.search;
+import static org.apache.solr.common.params.CommonParams.OMIT_HEADER;
+import static org.apache.solr.common.params.CommonParams.TRUE;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Date;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
+import java.util.Random;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.apache.solr.common.util.Utils;
import org.apache.solr.core.NodeRoles;
import org.apache.solr.embedded.JettySolrRunner;
import org.apache.solr.servlet.CoordinatorHttpSolrCall;
-import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TestCoordinatorRole extends SolrCloudTestCase {
-
- @BeforeClass
- public static void setupCluster() throws Exception {
- configureCluster(4).addConfig("conf", configset("cloud-minimal")).configure();
- }
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public void testSimple() throws Exception {
- CloudSolrClient client = cluster.getSolrClient();
- String COLLECTION_NAME = "test_coll";
- String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
- CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2)
- .process(cluster.getSolrClient());
- cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
- UpdateRequest ur = new UpdateRequest();
- for (int i = 0; i < 10; i++) {
- SolrInputDocument doc2 = new SolrInputDocument();
- doc2.addField("id", "" + i);
- ur.add(doc2);
- }
+ MiniSolrCloudCluster cluster =
+ configureCluster(4).addConfig("conf", configset("cloud-minimal")).configure();
+ try {
+ CloudSolrClient client = cluster.getSolrClient();
+ String COLLECTION_NAME = "test_coll";
+ String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
+ CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2)
+ .process(cluster.getSolrClient());
+ cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
+ UpdateRequest ur = new UpdateRequest();
+ for (int i = 0; i < 10; i++) {
+ SolrInputDocument doc2 = new SolrInputDocument();
+ doc2.addField("id", "" + i);
+ ur.add(doc2);
+ }
- ur.commit(client, COLLECTION_NAME);
- QueryResponse rsp = client.query(COLLECTION_NAME, new SolrQuery("*:*"));
- assertEquals(10, rsp.getResults().getNumFound());
+ ur.commit(client, COLLECTION_NAME);
+ QueryResponse rsp = client.query(COLLECTION_NAME, new SolrQuery("*:*"));
+ assertEquals(10, rsp.getResults().getNumFound());
+ System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on");
+ JettySolrRunner coordinatorJetty = null;
+ try {
+ coordinatorJetty = cluster.startJettySolrRunner();
+ } finally {
+ System.clearProperty(NodeRoles.NODE_ROLES_PROP);
+ }
+ QueryResponse rslt =
+ new QueryRequest(new SolrQuery("*:*"))
+ .setPreferredNodes(List.of(coordinatorJetty.getNodeName()))
+ .process(client, COLLECTION_NAME);
+
+ assertEquals(10, rslt.getResults().size());
+
+ DocCollection collection =
+ cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION);
+ assertNotNull(collection);
+
+ Set<String> expectedNodes = new HashSet<>();
+ expectedNodes.add(coordinatorJetty.getNodeName());
+ collection.forEachReplica((s, replica) -> expectedNodes.remove(replica.getNodeName()));
+ assertTrue(expectedNodes.isEmpty());
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ public void testNRTRestart() throws Exception {
+ // we restart jetty and expect to find on disk data - need a local fs directory
+ useFactory(null);
+ String COLL = "coordinator_test_coll";
+ MiniSolrCloudCluster cluster =
+ configureCluster(3)
+ .withJettyConfig(jetty -> jetty.enableV2(true))
+ .addConfig("conf", configset("conf2"))
+ .configure();
System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on");
- JettySolrRunner coordinatorJetty = null;
+ JettySolrRunner qaJetty = cluster.startJettySolrRunner();
+ String qaJettyBase = qaJetty.getBaseUrl().toString();
+ System.clearProperty(NodeRoles.NODE_ROLES_PROP);
+ ExecutorService executor =
+ ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrNamedThreadFactory("manipulateJetty"));
try {
- coordinatorJetty = cluster.startJettySolrRunner();
+ CollectionAdminRequest.createCollection(COLL, "conf", 1, 1, 0, 1)
+ .process(cluster.getSolrClient());
+ cluster.waitForActiveCollection(COLL, 1, 2);
+ DocCollection docColl =
+ cluster.getSolrClient().getClusterStateProvider().getClusterState().getCollection(COLL);
+ Replica nrtReplica = docColl.getReplicas(EnumSet.of(Replica.Type.NRT)).get(0);
+ assertNotNull(nrtReplica);
+ String nrtCore = nrtReplica.getCoreName();
+ Replica pullReplica = docColl.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0);
+ assertNotNull(pullReplica);
+ String pullCore = pullReplica.getCoreName();
+
+ SolrInputDocument sid = new SolrInputDocument();
+ sid.addField("id", "123");
+ sid.addField("desc_s", "A Document");
+ JettySolrRunner nrtJetty = null;
+ JettySolrRunner pullJetty = null;
+ for (JettySolrRunner j : cluster.getJettySolrRunners()) {
+ String nodeName = j.getNodeName();
+ if (nodeName.equals(nrtReplica.getNodeName())) {
+ nrtJetty = j;
+ } else if (nodeName.equals(pullReplica.getNodeName())) {
+ pullJetty = j;
+ }
+ }
+ assertNotNull(nrtJetty);
+ assertNotNull(pullJetty);
+ try (HttpSolrClient client = (HttpSolrClient) pullJetty.newClient()) {
+ client.add(COLL, sid);
+ client.commit(COLL);
+ assertEquals(
+ nrtCore,
+ getHostCoreName(
+ COLL, qaJettyBase, client, p -> p.add("shards.preference", "replica.type:NRT")));
+ assertEquals(
+ pullCore,
+ getHostCoreName(
+ COLL, qaJettyBase, client, p -> p.add("shards.preference", "replica.type:PULL")));
+ // Now , kill NRT jetty
+ JettySolrRunner nrtJettyF = nrtJetty;
+ JettySolrRunner pullJettyF = pullJetty;
+ Random r = random();
+ final long establishBaselineMs = r.nextInt(1000);
+ final long nrtDowntimeMs = r.nextInt(10000);
+ // NOTE: for `pullServiceTimeMs`, it can't be super-short. This is just to simplify our
+ // indexing code,
+ // based on the fact that our indexing is based on a PULL-node client.
+ final long pullServiceTimeMs = 1000 + (long) r.nextInt(9000);
+ Future<?> jettyManipulationFuture =
+ executor.submit(
+ () -> {
+ // we manipulate the jetty instances in a separate thread to more closely mimic
+ // the behavior we'd
+ // see irl.
+ try {
+ Thread.sleep(establishBaselineMs);
+ log.info("stopping NRT jetty ...");
+ nrtJettyF.stop();
+ log.info("NRT jetty stopped.");
+ Thread.sleep(nrtDowntimeMs); // let NRT be down for a while
+ log.info("restarting NRT jetty ...");
+ nrtJettyF.start(true);
+ log.info("NRT jetty restarted.");
+ // once NRT is back up, we expect PULL to continue serving until the TTL on ZK
+ // state
+ // used for query request routing has expired (60s). But here we force a return
+ // to NRT
+ // by stopping the PULL replica after a brief delay ...
+ Thread.sleep(pullServiceTimeMs);
+ log.info("stopping PULL jetty ...");
+ pullJettyF.stop();
+ log.info("PULL jetty stopped.");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ String hostCore;
+ long start = new Date().getTime();
+ long individualRequestStart = start;
+ int count = 0;
+ while (nrtCore.equals(
+ hostCore =
+ getHostCoreName(
+ COLL,
+ qaJettyBase,
+ client,
+ p -> p.add("shards.preference", "replica.type:NRT")))) {
+ count++;
+ individualRequestStart = new Date().getTime();
+ }
+ long now = new Date().getTime();
+ log.info(
+ "phase1 NRT queries count={}, overall_duration={}, baseline_expected_overall_duration={}, switch-to-pull_duration={}",
+ count,
+ now - start,
+ establishBaselineMs,
+ now - individualRequestStart);
+ // default tolerance of 500ms below should suffice. Failover to PULL for this case should be
+ // very fast,
+ // because our QA-based client already knows both replicas are active, the index is stable,
+ // so the moment
+ // the client finds NRT is down it should be able to failover immediately and transparently
+ // to PULL.
+ assertEquals(
+ "when we break out of the NRT query loop, should be b/c routed to PULL",
+ pullCore,
+ hostCore);
+ SolrInputDocument d = new SolrInputDocument();
+ d.addField("id", "345");
+ d.addField("desc_s", "Another Document");
+ // attempts to add another doc while NRT is down should fail, then eventually succeed when
+ // NRT comes back up
+ count = 0;
+ start = new Date().getTime();
+ individualRequestStart = start;
+ for (; ; ) {
+ try {
+ client.add(COLL, d);
+ client.commit(COLL);
+ break;
+ } catch (SolrException ex) {
+ // we expect these until nrtJetty is back up.
+ count++;
+ Thread.sleep(100);
+ }
+ individualRequestStart = new Date().getTime();
+ }
+ now = new Date().getTime();
+ log.info(
+ "successfully added another doc; duration: {}, overall_duration={}, baseline_expected_overall_duration={}, exception_count={}",
+ now - individualRequestStart,
+ now - start,
+ nrtDowntimeMs,
+ count);
+ // NRT replica is back up, registered as available with Zk, and availability info has been
+ // pulled down by
+ // our PULL-replica-based `client`, forwarded indexing command to NRT, index/commit
+ // completed. All of this
+ // accounts for the 3000ms tolerance allowed for below. This is not a strict value, and if
+ // it causes failures
+ // regularly we should feel free to increase the tolerance; but it's meant to provide a
+ // stable baseline from
+ // which to detect regressions.
+ count = 0;
+ start = new Date().getTime();
+ individualRequestStart = start;
+ while (pullCore.equals(
+ hostCore =
+ getHostCoreName(
+ COLL,
+ qaJettyBase,
+ client,
+ p -> {
+ p.set(CommonParams.Q, "id:345");
+ p.add("shards.preference", "replica.type:NRT");
+ }))) {
+ count++;
+ Thread.sleep(100);
+ individualRequestStart = new Date().getTime();
+ }
+ now = new Date().getTime();
+ log.info(
+ "query retries between NRT index-ready and query-ready: {}; overall_duration={}; baseline_expected_overall_duration={}; failover-request_duration={}",
+ count,
+ now - start,
+ pullServiceTimeMs,
+ now - individualRequestStart);
+ assertEquals(nrtCore, hostCore);
+ // allow any exceptions to propagate
+ jettyManipulationFuture.get();
+ if (true) return;
+
+ // next phase: just toggle a bunch
+ // TODO: could separate this out into a different test method, but this should suffice for
+ // now
+ pullJetty.start(true);
+ AtomicBoolean done = new AtomicBoolean();
+ long runMinutes = 1;
+ long finishTimeMs =
+ new Date().getTime() + TimeUnit.MILLISECONDS.convert(runMinutes, TimeUnit.MINUTES);
+ JettySolrRunner[] jettys = new JettySolrRunner[] {nrtJettyF, pullJettyF};
+ Random threadRandom = new Random(r.nextInt());
+ Future<Integer> f =
+ executor.submit(
+ () -> {
+ int iteration = 0;
+ while (new Date().getTime() < finishTimeMs && !done.get()) {
+ int idx = iteration++ % jettys.length;
+ JettySolrRunner toManipulate = jettys[idx];
+ try {
+ int serveTogetherTime = threadRandom.nextInt(7000);
+ int downTime = threadRandom.nextInt(7000);
+ log.info("serving together for {}ms", serveTogetherTime);
+ Thread.sleep(serveTogetherTime);
+ log.info("stopping {} ...", idx);
+ toManipulate.stop();
+ log.info("stopped {}.", idx);
+ Thread.sleep(downTime);
+ log.info("restarting {} ...", idx);
+ toManipulate.start(true);
+ log.info("restarted {}.", idx);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ done.set(true);
+ return iteration;
+ });
+ count = 0;
+ start = new Date().getTime();
+ try {
+ do {
+ pullCore.equals(
+ hostCore =
+ getHostCoreName(
+ COLL,
+ qaJettyBase,
+ client,
+ p -> {
+ p.set(CommonParams.Q, "id:345");
+ p.add("shards.preference", "replica.type:NRT");
+ }));
+ count++;
+ Thread.sleep(100);
+ } while (!done.get());
+ } finally {
+ final String result;
+ if (done.getAndSet(true)) {
+ result = "Success";
+ } else {
+ // not yet set to done, completed abnormally (exception will be thrown beyond `finally`
+ // block)
+ result = "Failure";
+ }
+ Integer toggleCount = f.get();
+ long secondsDuration =
+ TimeUnit.SECONDS.convert(new Date().getTime() - start, TimeUnit.MILLISECONDS);
+ log.info(
+ "{}! {} seconds, {} toggles, {} requests served",
+ result,
+ secondsDuration,
+ toggleCount,
+ count);
+ }
+ }
} finally {
- System.clearProperty(NodeRoles.NODE_ROLES_PROP);
+ try {
+ ExecutorUtil.shutdownAndAwaitTermination(executor);
+ } finally {
+ cluster.shutdown();
+ }
}
- QueryResponse rslt =
- new QueryRequest(new SolrQuery("*:*"))
- .setPreferredNodes(List.of(coordinatorJetty.getNodeName()))
- .process(client, COLLECTION_NAME);
-
- assertEquals(10, rslt.getResults().size());
+ }
- DocCollection collection =
- cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION);
- assertNotNull(collection);
+ @SuppressWarnings("rawtypes")
+ private String getHostCoreName(
+ String COLL, String qaNode, HttpSolrClient solrClient, Consumer<SolrQuery> p)
+ throws Exception {
- Set<String> expectedNodes = new HashSet<>();
- expectedNodes.add(coordinatorJetty.getNodeName());
- collection.forEachReplica((s, replica) -> expectedNodes.remove(replica.getNodeName()));
- assertTrue(expectedNodes.isEmpty());
+ boolean found = false;
+ SolrQuery q = new SolrQuery("*:*");
+ q.add("fl", "id,desc_s,_core_:[core]").add(OMIT_HEADER, TRUE);
+ p.accept(q);
+ StringBuilder sb =
+ new StringBuilder(qaNode).append("/").append(COLL).append("/select?wt=javabin");
+ q.forEach(e -> sb.append("&").append(e.getKey()).append("=").append(e.getValue()[0]));
+ SolrDocumentList docs = null;
+ for (int i = 0; i < 100; i++) {
+ try {
+ SimpleOrderedMap rsp =
+ (SimpleOrderedMap)
+ Utils.executeGET(solrClient.getHttpClient(), sb.toString(), Utils.JAVABINCONSUMER);
+ docs = (SolrDocumentList) rsp.get("response");
+ if (docs.size() > 0) {
+ found = true;
+ break;
+ }
+ } catch (SolrException ex) {
+ // we know we're doing tricky things that might cause transient errors
+ // TODO: all these query requests go to the QA node -- should QA propagate internal request
+ // errors
+ // to the external client (and the external client retry?) or should QA attempt to failover
+ // transparently
+ // in the event of an error?
+ if (i < 5) {
+ log.info("swallowing transient error", ex);
+ } else {
+ log.error("only expect actual _errors_ within a small window (e.g. 500ms)", ex);
+ fail("initial error time threshold exceeded");
+ }
+ }
+ Thread.sleep(100);
+ }
+ assertTrue(found);
+ return (String) docs.get(0).getFieldValue("_core_");
}
}
Re: [solr] branch branch_9x updated: test case added for coordinator role
Posted by Ishan Chattopadhyaya <ic...@gmail.com>.
Sure, I'll take a look.
On Sun, 22 Jan, 2023, 2:21 am Chris Hostetter, <ho...@fucit.org>
wrote:
> :
> : Done, fixed. Thanks!
>
> If you understand what this test is doing, can you please also look into
> the sporadic failures on main ?
>
>
>
> :
> : On Sat, Jan 21, 2023 at 12:21 PM Ishan Chattopadhyaya
> : <ic...@gmail.com> wrote:
> : >
> : > I'll take a look, Hoss.
> : >
> : > On Sat, 21 Jan, 2023, 2:37 am Chris Hostetter, <
> hossman_lucene@fucit.org> wrote:
> : >>
> : >>
> : >> Created jia & AwaitsFix'ed the test ...
> : >>
> : >> https://issues.apache.org/jira/browse/SOLR-16630
> : >>
> : >>
> : >>
> : >> : Date: Fri, 20 Jan 2023 13:36:38 -0700 (MST)
> : >> : From: Chris Hostetter <ho...@fucit.org>
> : >> : To: dev@solr.apache.org
> : >> : Cc: "commits@solr.apache.org" <co...@solr.apache.org>
> : >> : Subject: Re: [solr] branch branch_9x updated: test case added for
> coordinator
> : >> : role
> : >> :
> : >> :
> : >> : Noble: TestCoordinatorRole.testNRTRestart is breaking on jenkins on
> 9x a
> : >> : ridiculous number of times since you added it a week ago.
> : >> :
> : >> : IIUC this test has *NEVER* passed on a jenkins 9x build (only on
> the main
> : >> : builds)
> : >> :
> : >> : -Hoss
> : >> :
> : >> :
> : >> : : Date: Thu, 12 Jan 2023 07:54:33 +0000
> : >> : : From: noble@apache.org
> : >> : : Reply-To: dev@solr.apache.org
> : >> : : To: "commits@solr.apache.org" <co...@solr.apache.org>
> : >> : : Subject: [solr] branch branch_9x updated: test case added for
> coordinator role
> : >> : :
> : >> : : This is an automated email from the ASF dual-hosted git
> repository.
> : >> : :
> : >> : : noble pushed a commit to branch branch_9x
> : >> : : in repository https://gitbox.apache.org/repos/asf/solr.git
> : >> : :
> : >> : :
> : >> : : The following commit(s) were added to refs/heads/branch_9x by
> this push:
> : >> : : new ec9b152c31f test case added for coordinator role
> : >> : : ec9b152c31f is described below
> : >> : :
> : >> : : commit ec9b152c31fac99fe190ccc98e754c1200bd9fd2
> : >> : : Author: Noble Paul <no...@gmail.com>
> : >> : : AuthorDate: Thu Jan 12 18:54:15 2023 +1100
> : >> : :
> : >> : : test case added for coordinator role
> : >> : : ---
> : >> : : .../apache/solr/search/TestCoordinatorRole.java | 412
> +++++++++++++++++++--
> : >> : : 1 file changed, 375 insertions(+), 37 deletions(-)
> : >> : :
> : >> : : diff --git
> a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
> b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
> : >> : : index 5e2dcfb70a8..6c4e845cf5a 100644
> : >> : : ---
> a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
> : >> : : +++
> b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
> : >> : : @@ -17,69 +17,407 @@
> : >> : :
> : >> : : package org.apache.solr.search;
> : >> : :
> : >> : : +import static
> org.apache.solr.common.params.CommonParams.OMIT_HEADER;
> : >> : : +import static org.apache.solr.common.params.CommonParams.TRUE;
> : >> : : +
> : >> : : +import java.lang.invoke.MethodHandles;
> : >> : : +import java.util.Date;
> : >> : : +import java.util.EnumSet;
> : >> : : import java.util.HashSet;
> : >> : : import java.util.List;
> : >> : : +import java.util.Random;
> : >> : : import java.util.Set;
> : >> : : +import java.util.concurrent.ExecutorService;
> : >> : : +import java.util.concurrent.Future;
> : >> : : +import java.util.concurrent.TimeUnit;
> : >> : : +import java.util.concurrent.atomic.AtomicBoolean;
> : >> : : +import java.util.function.Consumer;
> : >> : : import org.apache.solr.client.solrj.SolrQuery;
> : >> : : import org.apache.solr.client.solrj.impl.CloudSolrClient;
> : >> : : +import org.apache.solr.client.solrj.impl.HttpSolrClient;
> : >> : : import
> org.apache.solr.client.solrj.request.CollectionAdminRequest;
> : >> : : import org.apache.solr.client.solrj.request.QueryRequest;
> : >> : : import org.apache.solr.client.solrj.request.UpdateRequest;
> : >> : : import org.apache.solr.client.solrj.response.QueryResponse;
> : >> : : +import org.apache.solr.cloud.MiniSolrCloudCluster;
> : >> : : import org.apache.solr.cloud.SolrCloudTestCase;
> : >> : : +import org.apache.solr.common.SolrDocumentList;
> : >> : : +import org.apache.solr.common.SolrException;
> : >> : : import org.apache.solr.common.SolrInputDocument;
> : >> : : import org.apache.solr.common.cloud.DocCollection;
> : >> : : +import org.apache.solr.common.cloud.Replica;
> : >> : : +import org.apache.solr.common.params.CommonParams;
> : >> : : +import org.apache.solr.common.util.ExecutorUtil;
> : >> : : +import org.apache.solr.common.util.SimpleOrderedMap;
> : >> : : +import org.apache.solr.common.util.SolrNamedThreadFactory;
> : >> : : +import org.apache.solr.common.util.Utils;
> : >> : : import org.apache.solr.core.NodeRoles;
> : >> : : import org.apache.solr.embedded.JettySolrRunner;
> : >> : : import org.apache.solr.servlet.CoordinatorHttpSolrCall;
> : >> : : -import org.junit.BeforeClass;
> : >> : : +import org.slf4j.Logger;
> : >> : : +import org.slf4j.LoggerFactory;
> : >> : :
> : >> : : public class TestCoordinatorRole extends SolrCloudTestCase {
> : >> : : -
> : >> : : - @BeforeClass
> : >> : : - public static void setupCluster() throws Exception {
> : >> : : - configureCluster(4).addConfig("conf",
> configset("cloud-minimal")).configure();
> : >> : : - }
> : >> : : + private static final Logger log =
> LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
> : >> : :
> : >> : : public void testSimple() throws Exception {
> : >> : : - CloudSolrClient client = cluster.getSolrClient();
> : >> : : - String COLLECTION_NAME = "test_coll";
> : >> : : - String SYNTHETIC_COLLECTION =
> CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
> : >> : : - CollectionAdminRequest.createCollection(COLLECTION_NAME,
> "conf", 2, 2)
> : >> : : - .process(cluster.getSolrClient());
> : >> : : - cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
> : >> : : - UpdateRequest ur = new UpdateRequest();
> : >> : : - for (int i = 0; i < 10; i++) {
> : >> : : - SolrInputDocument doc2 = new SolrInputDocument();
> : >> : : - doc2.addField("id", "" + i);
> : >> : : - ur.add(doc2);
> : >> : : - }
> : >> : : + MiniSolrCloudCluster cluster =
> : >> : : + configureCluster(4).addConfig("conf",
> configset("cloud-minimal")).configure();
> : >> : : + try {
> : >> : : + CloudSolrClient client = cluster.getSolrClient();
> : >> : : + String COLLECTION_NAME = "test_coll";
> : >> : : + String SYNTHETIC_COLLECTION =
> CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
> : >> : : + CollectionAdminRequest.createCollection(COLLECTION_NAME,
> "conf", 2, 2)
> : >> : : + .process(cluster.getSolrClient());
> : >> : : + cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
> : >> : : + UpdateRequest ur = new UpdateRequest();
> : >> : : + for (int i = 0; i < 10; i++) {
> : >> : : + SolrInputDocument doc2 = new SolrInputDocument();
> : >> : : + doc2.addField("id", "" + i);
> : >> : : + ur.add(doc2);
> : >> : : + }
> : >> : :
> : >> : : - ur.commit(client, COLLECTION_NAME);
> : >> : : - QueryResponse rsp = client.query(COLLECTION_NAME, new
> SolrQuery("*:*"));
> : >> : : - assertEquals(10, rsp.getResults().getNumFound());
> : >> : : + ur.commit(client, COLLECTION_NAME);
> : >> : : + QueryResponse rsp = client.query(COLLECTION_NAME, new
> SolrQuery("*:*"));
> : >> : : + assertEquals(10, rsp.getResults().getNumFound());
> : >> : :
> : >> : : + System.setProperty(NodeRoles.NODE_ROLES_PROP,
> "coordinator:on");
> : >> : : + JettySolrRunner coordinatorJetty = null;
> : >> : : + try {
> : >> : : + coordinatorJetty = cluster.startJettySolrRunner();
> : >> : : + } finally {
> : >> : : + System.clearProperty(NodeRoles.NODE_ROLES_PROP);
> : >> : : + }
> : >> : : + QueryResponse rslt =
> : >> : : + new QueryRequest(new SolrQuery("*:*"))
> : >> : : +
> .setPreferredNodes(List.of(coordinatorJetty.getNodeName()))
> : >> : : + .process(client, COLLECTION_NAME);
> : >> : : +
> : >> : : + assertEquals(10, rslt.getResults().size());
> : >> : : +
> : >> : : + DocCollection collection =
> : >> : : +
> cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION);
> : >> : : + assertNotNull(collection);
> : >> : : +
> : >> : : + Set<String> expectedNodes = new HashSet<>();
> : >> : : + expectedNodes.add(coordinatorJetty.getNodeName());
> : >> : : + collection.forEachReplica((s, replica) ->
> expectedNodes.remove(replica.getNodeName()));
> : >> : : + assertTrue(expectedNodes.isEmpty());
> : >> : : + } finally {
> : >> : : + cluster.shutdown();
> : >> : : + }
> : >> : : + }
> : >> : : +
> : >> : : + public void testNRTRestart() throws Exception {
> : >> : : + // we restart jetty and expect to find on disk data - need a
> local fs directory
> : >> : : + useFactory(null);
> : >> : : + String COLL = "coordinator_test_coll";
> : >> : : + MiniSolrCloudCluster cluster =
> : >> : : + configureCluster(3)
> : >> : : + .withJettyConfig(jetty -> jetty.enableV2(true))
> : >> : : + .addConfig("conf", configset("conf2"))
> : >> : : + .configure();
> : >> : : System.setProperty(NodeRoles.NODE_ROLES_PROP,
> "coordinator:on");
> : >> : : - JettySolrRunner coordinatorJetty = null;
> : >> : : + JettySolrRunner qaJetty = cluster.startJettySolrRunner();
> : >> : : + String qaJettyBase = qaJetty.getBaseUrl().toString();
> : >> : : + System.clearProperty(NodeRoles.NODE_ROLES_PROP);
> : >> : : + ExecutorService executor =
> : >> : : + ExecutorUtil.newMDCAwareSingleThreadExecutor(new
> SolrNamedThreadFactory("manipulateJetty"));
> : >> : : try {
> : >> : : - coordinatorJetty = cluster.startJettySolrRunner();
> : >> : : + CollectionAdminRequest.createCollection(COLL, "conf", 1,
> 1, 0, 1)
> : >> : : + .process(cluster.getSolrClient());
> : >> : : + cluster.waitForActiveCollection(COLL, 1, 2);
> : >> : : + DocCollection docColl =
> : >> : : +
> cluster.getSolrClient().getClusterStateProvider().getClusterState().getCollection(COLL);
> : >> : : + Replica nrtReplica =
> docColl.getReplicas(EnumSet.of(Replica.Type.NRT)).get(0);
> : >> : : + assertNotNull(nrtReplica);
> : >> : : + String nrtCore = nrtReplica.getCoreName();
> : >> : : + Replica pullReplica =
> docColl.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0);
> : >> : : + assertNotNull(pullReplica);
> : >> : : + String pullCore = pullReplica.getCoreName();
> : >> : : +
> : >> : : + SolrInputDocument sid = new SolrInputDocument();
> : >> : : + sid.addField("id", "123");
> : >> : : + sid.addField("desc_s", "A Document");
> : >> : : + JettySolrRunner nrtJetty = null;
> : >> : : + JettySolrRunner pullJetty = null;
> : >> : : + for (JettySolrRunner j : cluster.getJettySolrRunners()) {
> : >> : : + String nodeName = j.getNodeName();
> : >> : : + if (nodeName.equals(nrtReplica.getNodeName())) {
> : >> : : + nrtJetty = j;
> : >> : : + } else if (nodeName.equals(pullReplica.getNodeName())) {
> : >> : : + pullJetty = j;
> : >> : : + }
> : >> : : + }
> : >> : : + assertNotNull(nrtJetty);
> : >> : : + assertNotNull(pullJetty);
> : >> : : + try (HttpSolrClient client = (HttpSolrClient)
> pullJetty.newClient()) {
> : >> : : + client.add(COLL, sid);
> : >> : : + client.commit(COLL);
> : >> : : + assertEquals(
> : >> : : + nrtCore,
> : >> : : + getHostCoreName(
> : >> : : + COLL, qaJettyBase, client, p ->
> p.add("shards.preference", "replica.type:NRT")));
> : >> : : + assertEquals(
> : >> : : + pullCore,
> : >> : : + getHostCoreName(
> : >> : : + COLL, qaJettyBase, client, p ->
> p.add("shards.preference", "replica.type:PULL")));
> : >> : : + // Now , kill NRT jetty
> : >> : : + JettySolrRunner nrtJettyF = nrtJetty;
> : >> : : + JettySolrRunner pullJettyF = pullJetty;
> : >> : : + Random r = random();
> : >> : : + final long establishBaselineMs = r.nextInt(1000);
> : >> : : + final long nrtDowntimeMs = r.nextInt(10000);
> : >> : : + // NOTE: for `pullServiceTimeMs`, it can't be
> super-short. This is just to simplify our
> : >> : : + // indexing code,
> : >> : : + // based on the fact that our indexing is based on a
> PULL-node client.
> : >> : : + final long pullServiceTimeMs = 1000 + (long)
> r.nextInt(9000);
> : >> : : + Future<?> jettyManipulationFuture =
> : >> : : + executor.submit(
> : >> : : + () -> {
> : >> : : + // we manipulate the jetty instances in a
> separate thread to more closely mimic
> : >> : : + // the behavior we'd
> : >> : : + // see irl.
> : >> : : + try {
> : >> : : + Thread.sleep(establishBaselineMs);
> : >> : : + log.info("stopping NRT jetty ...");
> : >> : : + nrtJettyF.stop();
> : >> : : + log.info("NRT jetty stopped.");
> : >> : : + Thread.sleep(nrtDowntimeMs); // let NRT be
> down for a while
> : >> : : + log.info("restarting NRT jetty ...");
> : >> : : + nrtJettyF.start(true);
> : >> : : + log.info("NRT jetty restarted.");
> : >> : : + // once NRT is back up, we expect PULL to
> continue serving until the TTL on ZK
> : >> : : + // state
> : >> : : + // used for query request routing has
> expired (60s). But here we force a return
> : >> : : + // to NRT
> : >> : : + // by stopping the PULL replica after a
> brief delay ...
> : >> : : + Thread.sleep(pullServiceTimeMs);
> : >> : : + log.info("stopping PULL jetty ...");
> : >> : : + pullJettyF.stop();
> : >> : : + log.info("PULL jetty stopped.");
> : >> : : + } catch (Exception e) {
> : >> : : + throw new RuntimeException(e);
> : >> : : + }
> : >> : : + });
> : >> : : + String hostCore;
> : >> : : + long start = new Date().getTime();
> : >> : : + long individualRequestStart = start;
> : >> : : + int count = 0;
> : >> : : + while (nrtCore.equals(
> : >> : : + hostCore =
> : >> : : + getHostCoreName(
> : >> : : + COLL,
> : >> : : + qaJettyBase,
> : >> : : + client,
> : >> : : + p -> p.add("shards.preference",
> "replica.type:NRT")))) {
> : >> : : + count++;
> : >> : : + individualRequestStart = new Date().getTime();
> : >> : : + }
> : >> : : + long now = new Date().getTime();
> : >> : : + log.info(
> : >> : : + "phase1 NRT queries count={}, overall_duration={},
> baseline_expected_overall_duration={}, switch-to-pull_duration={}",
> : >> : : + count,
> : >> : : + now - start,
> : >> : : + establishBaselineMs,
> : >> : : + now - individualRequestStart);
> : >> : : + // default tolerance of 500ms below should suffice.
> Failover to PULL for this case should be
> : >> : : + // very fast,
> : >> : : + // because our QA-based client already knows both
> replicas are active, the index is stable,
> : >> : : + // so the moment
> : >> : : + // the client finds NRT is down it should be able to
> failover immediately and transparently
> : >> : : + // to PULL.
> : >> : : + assertEquals(
> : >> : : + "when we break out of the NRT query loop, should be
> b/c routed to PULL",
> : >> : : + pullCore,
> : >> : : + hostCore);
> : >> : : + SolrInputDocument d = new SolrInputDocument();
> : >> : : + d.addField("id", "345");
> : >> : : + d.addField("desc_s", "Another Document");
> : >> : : + // attempts to add another doc while NRT is down should
> fail, then eventually succeed when
> : >> : : + // NRT comes back up
> : >> : : + count = 0;
> : >> : : + start = new Date().getTime();
> : >> : : + individualRequestStart = start;
> : >> : : + for (; ; ) {
> : >> : : + try {
> : >> : : + client.add(COLL, d);
> : >> : : + client.commit(COLL);
> : >> : : + break;
> : >> : : + } catch (SolrException ex) {
> : >> : : + // we expect these until nrtJetty is back up.
> : >> : : + count++;
> : >> : : + Thread.sleep(100);
> : >> : : + }
> : >> : : + individualRequestStart = new Date().getTime();
> : >> : : + }
> : >> : : + now = new Date().getTime();
> : >> : : + log.info(
> : >> : : + "successfully added another doc; duration: {},
> overall_duration={}, baseline_expected_overall_duration={},
> exception_count={}",
> : >> : : + now - individualRequestStart,
> : >> : : + now - start,
> : >> : : + nrtDowntimeMs,
> : >> : : + count);
> : >> : : + // NRT replica is back up, registered as available with
> Zk, and availability info has been
> : >> : : + // pulled down by
> : >> : : + // our PULL-replica-based `client`, forwarded indexing
> command to NRT, index/commit
> : >> : : + // completed. All of this
> : >> : : + // accounts for the 3000ms tolerance allowed for below.
> This is not a strict value, and if
> : >> : : + // it causes failures
> : >> : : + // regularly we should feel free to increase the
> tolerance; but it's meant to provide a
> : >> : : + // stable baseline from
> : >> : : + // which to detect regressions.
> : >> : : + count = 0;
> : >> : : + start = new Date().getTime();
> : >> : : + individualRequestStart = start;
> : >> : : + while (pullCore.equals(
> : >> : : + hostCore =
> : >> : : + getHostCoreName(
> : >> : : + COLL,
> : >> : : + qaJettyBase,
> : >> : : + client,
> : >> : : + p -> {
> : >> : : + p.set(CommonParams.Q, "id:345");
> : >> : : + p.add("shards.preference",
> "replica.type:NRT");
> : >> : : + }))) {
> : >> : : + count++;
> : >> : : + Thread.sleep(100);
> : >> : : + individualRequestStart = new Date().getTime();
> : >> : : + }
> : >> : : + now = new Date().getTime();
> : >> : : + log.info(
> : >> : : + "query retries between NRT index-ready and
> query-ready: {}; overall_duration={};
> baseline_expected_overall_duration={}; failover-request_duration={}",
> : >> : : + count,
> : >> : : + now - start,
> : >> : : + pullServiceTimeMs,
> : >> : : + now - individualRequestStart);
> : >> : : + assertEquals(nrtCore, hostCore);
> : >> : : + // allow any exceptions to propagate
> : >> : : + jettyManipulationFuture.get();
> : >> : : + if (true) return;
> : >> : : +
> : >> : : + // next phase: just toggle a bunch
> : >> : : + // TODO: could separate this out into a different test
> method, but this should suffice for
> : >> : : + // now
> : >> : : + pullJetty.start(true);
> : >> : : + AtomicBoolean done = new AtomicBoolean();
> : >> : : + long runMinutes = 1;
> : >> : : + long finishTimeMs =
> : >> : : + new Date().getTime() +
> TimeUnit.MILLISECONDS.convert(runMinutes, TimeUnit.MINUTES);
> : >> : : + JettySolrRunner[] jettys = new JettySolrRunner[]
> {nrtJettyF, pullJettyF};
> : >> : : + Random threadRandom = new Random(r.nextInt());
> : >> : : + Future<Integer> f =
> : >> : : + executor.submit(
> : >> : : + () -> {
> : >> : : + int iteration = 0;
> : >> : : + while (new Date().getTime() < finishTimeMs &&
> !done.get()) {
> : >> : : + int idx = iteration++ % jettys.length;
> : >> : : + JettySolrRunner toManipulate = jettys[idx];
> : >> : : + try {
> : >> : : + int serveTogetherTime =
> threadRandom.nextInt(7000);
> : >> : : + int downTime = threadRandom.nextInt(7000);
> : >> : : + log.info("serving together for {}ms",
> serveTogetherTime);
> : >> : : + Thread.sleep(serveTogetherTime);
> : >> : : + log.info("stopping {} ...", idx);
> : >> : : + toManipulate.stop();
> : >> : : + log.info("stopped {}.", idx);
> : >> : : + Thread.sleep(downTime);
> : >> : : + log.info("restarting {} ...", idx);
> : >> : : + toManipulate.start(true);
> : >> : : + log.info("restarted {}.", idx);
> : >> : : + } catch (Exception e) {
> : >> : : + throw new RuntimeException(e);
> : >> : : + }
> : >> : : + }
> : >> : : + done.set(true);
> : >> : : + return iteration;
> : >> : : + });
> : >> : : + count = 0;
> : >> : : + start = new Date().getTime();
> : >> : : + try {
> : >> : : + do {
> : >> : : + pullCore.equals(
> : >> : : + hostCore =
> : >> : : + getHostCoreName(
> : >> : : + COLL,
> : >> : : + qaJettyBase,
> : >> : : + client,
> : >> : : + p -> {
> : >> : : + p.set(CommonParams.Q, "id:345");
> : >> : : + p.add("shards.preference",
> "replica.type:NRT");
> : >> : : + }));
> : >> : : + count++;
> : >> : : + Thread.sleep(100);
> : >> : : + } while (!done.get());
> : >> : : + } finally {
> : >> : : + final String result;
> : >> : : + if (done.getAndSet(true)) {
> : >> : : + result = "Success";
> : >> : : + } else {
> : >> : : + // not yet set to done, completed abnormally
> (exception will be thrown beyond `finally`
> : >> : : + // block)
> : >> : : + result = "Failure";
> : >> : : + }
> : >> : : + Integer toggleCount = f.get();
> : >> : : + long secondsDuration =
> : >> : : + TimeUnit.SECONDS.convert(new Date().getTime() -
> start, TimeUnit.MILLISECONDS);
> : >> : : + log.info(
> : >> : : + "{}! {} seconds, {} toggles, {} requests served",
> : >> : : + result,
> : >> : : + secondsDuration,
> : >> : : + toggleCount,
> : >> : : + count);
> : >> : : + }
> : >> : : + }
> : >> : : } finally {
> : >> : : - System.clearProperty(NodeRoles.NODE_ROLES_PROP);
> : >> : : + try {
> : >> : : + ExecutorUtil.shutdownAndAwaitTermination(executor);
> : >> : : + } finally {
> : >> : : + cluster.shutdown();
> : >> : : + }
> : >> : : }
> : >> : : - QueryResponse rslt =
> : >> : : - new QueryRequest(new SolrQuery("*:*"))
> : >> : : -
> .setPreferredNodes(List.of(coordinatorJetty.getNodeName()))
> : >> : : - .process(client, COLLECTION_NAME);
> : >> : : -
> : >> : : - assertEquals(10, rslt.getResults().size());
> : >> : : + }
> : >> : :
> : >> : : - DocCollection collection =
> : >> : : -
> cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION);
> : >> : : - assertNotNull(collection);
> : >> : : + @SuppressWarnings("rawtypes")
> : >> : : + private String getHostCoreName(
> : >> : : + String COLL, String qaNode, HttpSolrClient solrClient,
> Consumer<SolrQuery> p)
> : >> : : + throws Exception {
> : >> : :
> : >> : : - Set<String> expectedNodes = new HashSet<>();
> : >> : : - expectedNodes.add(coordinatorJetty.getNodeName());
> : >> : : - collection.forEachReplica((s, replica) ->
> expectedNodes.remove(replica.getNodeName()));
> : >> : : - assertTrue(expectedNodes.isEmpty());
> : >> : : + boolean found = false;
> : >> : : + SolrQuery q = new SolrQuery("*:*");
> : >> : : + q.add("fl", "id,desc_s,_core_:[core]").add(OMIT_HEADER,
> TRUE);
> : >> : : + p.accept(q);
> : >> : : + StringBuilder sb =
> : >> : : + new
> StringBuilder(qaNode).append("/").append(COLL).append("/select?wt=javabin");
> : >> : : + q.forEach(e ->
> sb.append("&").append(e.getKey()).append("=").append(e.getValue()[0]));
> : >> : : + SolrDocumentList docs = null;
> : >> : : + for (int i = 0; i < 100; i++) {
> : >> : : + try {
> : >> : : + SimpleOrderedMap rsp =
> : >> : : + (SimpleOrderedMap)
> : >> : : + Utils.executeGET(solrClient.getHttpClient(),
> sb.toString(), Utils.JAVABINCONSUMER);
> : >> : : + docs = (SolrDocumentList) rsp.get("response");
> : >> : : + if (docs.size() > 0) {
> : >> : : + found = true;
> : >> : : + break;
> : >> : : + }
> : >> : : + } catch (SolrException ex) {
> : >> : : + // we know we're doing tricky things that might cause
> transient errors
> : >> : : + // TODO: all these query requests go to the QA node --
> should QA propagate internal request
> : >> : : + // errors
> : >> : : + // to the external client (and the external client
> retry?) or should QA attempt to failover
> : >> : : + // transparently
> : >> : : + // in the event of an error?
> : >> : : + if (i < 5) {
> : >> : : + log.info("swallowing transient error", ex);
> : >> : : + } else {
> : >> : : + log.error("only expect actual _errors_ within a small
> window (e.g. 500ms)", ex);
> : >> : : + fail("initial error time threshold exceeded");
> : >> : : + }
> : >> : : + }
> : >> : : + Thread.sleep(100);
> : >> : : + }
> : >> : : + assertTrue(found);
> : >> : : + return (String) docs.get(0).getFieldValue("_core_");
> : >> : : }
> : >> : : }
> : >> : :
> : >> : :
> : >> :
> : >> : -Hoss
> : >> : http://www.lucidworks.com/
> : >> :
> : >>
> : >> -Hoss
> : >> http://www.lucidworks.com/
> : >>
> : >> ---------------------------------------------------------------------
> : >> To unsubscribe, e-mail: dev-unsubscribe@solr.apache.org
> : >> For additional commands, e-mail: dev-help@solr.apache.org
> : >>
> :
> : ---------------------------------------------------------------------
> : To unsubscribe, e-mail: dev-unsubscribe@solr.apache.org
> : For additional commands, e-mail: dev-help@solr.apache.org
> :
> :
>
> -Hoss
> http://www.lucidworks.com/
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@solr.apache.org
> For additional commands, e-mail: dev-help@solr.apache.org
>
>
Re: [solr] branch branch_9x updated: test case added for coordinator role
Posted by Chris Hostetter <ho...@fucit.org>.
:
: Done, fixed. Thanks!
If you understand what this test is doing, can you please also look into
the sporadic failures on main ?
:
: On Sat, Jan 21, 2023 at 12:21 PM Ishan Chattopadhyaya
: <ic...@gmail.com> wrote:
: >
: > I'll take a look, Hoss.
: >
: > On Sat, 21 Jan, 2023, 2:37 am Chris Hostetter, <ho...@fucit.org> wrote:
: >>
: >>
: >> Created jia & AwaitsFix'ed the test ...
: >>
: >> https://issues.apache.org/jira/browse/SOLR-16630
: >>
: >>
: >>
: >> : Date: Fri, 20 Jan 2023 13:36:38 -0700 (MST)
: >> : From: Chris Hostetter <ho...@fucit.org>
: >> : To: dev@solr.apache.org
: >> : Cc: "commits@solr.apache.org" <co...@solr.apache.org>
: >> : Subject: Re: [solr] branch branch_9x updated: test case added for coordinator
: >> : role
: >> :
: >> :
: >> : Noble: TestCoordinatorRole.testNRTRestart is breaking on jenkins on 9x a
: >> : ridiculous number of times since you added it a week ago.
: >> :
: >> : IIUC this test has *NEVER* passed on a jenkins 9x build (only on the main
: >> : builds)
: >> :
: >> : -Hoss
: >> :
: >> :
: >> : : Date: Thu, 12 Jan 2023 07:54:33 +0000
: >> : : From: noble@apache.org
: >> : : Reply-To: dev@solr.apache.org
: >> : : To: "commits@solr.apache.org" <co...@solr.apache.org>
: >> : : Subject: [solr] branch branch_9x updated: test case added for coordinator role
: >> : :
: >> : : This is an automated email from the ASF dual-hosted git repository.
: >> : :
: >> : : noble pushed a commit to branch branch_9x
: >> : : in repository https://gitbox.apache.org/repos/asf/solr.git
: >> : :
: >> : :
: >> : : The following commit(s) were added to refs/heads/branch_9x by this push:
: >> : : new ec9b152c31f test case added for coordinator role
: >> : : ec9b152c31f is described below
: >> : :
: >> : : commit ec9b152c31fac99fe190ccc98e754c1200bd9fd2
: >> : : Author: Noble Paul <no...@gmail.com>
: >> : : AuthorDate: Thu Jan 12 18:54:15 2023 +1100
: >> : :
: >> : : test case added for coordinator role
: >> : : ---
: >> : : .../apache/solr/search/TestCoordinatorRole.java | 412 +++++++++++++++++++--
: >> : : 1 file changed, 375 insertions(+), 37 deletions(-)
: >> : :
: >> : : diff --git a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
: >> : : index 5e2dcfb70a8..6c4e845cf5a 100644
: >> : : --- a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
: >> : : +++ b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
: >> : : @@ -17,69 +17,407 @@
: >> : :
: >> : : package org.apache.solr.search;
: >> : :
: >> : : +import static org.apache.solr.common.params.CommonParams.OMIT_HEADER;
: >> : : +import static org.apache.solr.common.params.CommonParams.TRUE;
: >> : : +
: >> : : +import java.lang.invoke.MethodHandles;
: >> : : +import java.util.Date;
: >> : : +import java.util.EnumSet;
: >> : : import java.util.HashSet;
: >> : : import java.util.List;
: >> : : +import java.util.Random;
: >> : : import java.util.Set;
: >> : : +import java.util.concurrent.ExecutorService;
: >> : : +import java.util.concurrent.Future;
: >> : : +import java.util.concurrent.TimeUnit;
: >> : : +import java.util.concurrent.atomic.AtomicBoolean;
: >> : : +import java.util.function.Consumer;
: >> : : import org.apache.solr.client.solrj.SolrQuery;
: >> : : import org.apache.solr.client.solrj.impl.CloudSolrClient;
: >> : : +import org.apache.solr.client.solrj.impl.HttpSolrClient;
: >> : : import org.apache.solr.client.solrj.request.CollectionAdminRequest;
: >> : : import org.apache.solr.client.solrj.request.QueryRequest;
: >> : : import org.apache.solr.client.solrj.request.UpdateRequest;
: >> : : import org.apache.solr.client.solrj.response.QueryResponse;
: >> : : +import org.apache.solr.cloud.MiniSolrCloudCluster;
: >> : : import org.apache.solr.cloud.SolrCloudTestCase;
: >> : : +import org.apache.solr.common.SolrDocumentList;
: >> : : +import org.apache.solr.common.SolrException;
: >> : : import org.apache.solr.common.SolrInputDocument;
: >> : : import org.apache.solr.common.cloud.DocCollection;
: >> : : +import org.apache.solr.common.cloud.Replica;
: >> : : +import org.apache.solr.common.params.CommonParams;
: >> : : +import org.apache.solr.common.util.ExecutorUtil;
: >> : : +import org.apache.solr.common.util.SimpleOrderedMap;
: >> : : +import org.apache.solr.common.util.SolrNamedThreadFactory;
: >> : : +import org.apache.solr.common.util.Utils;
: >> : : import org.apache.solr.core.NodeRoles;
: >> : : import org.apache.solr.embedded.JettySolrRunner;
: >> : : import org.apache.solr.servlet.CoordinatorHttpSolrCall;
: >> : : -import org.junit.BeforeClass;
: >> : : +import org.slf4j.Logger;
: >> : : +import org.slf4j.LoggerFactory;
: >> : :
: >> : : public class TestCoordinatorRole extends SolrCloudTestCase {
: >> : : -
: >> : : - @BeforeClass
: >> : : - public static void setupCluster() throws Exception {
: >> : : - configureCluster(4).addConfig("conf", configset("cloud-minimal")).configure();
: >> : : - }
: >> : : + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
: >> : :
: >> : : public void testSimple() throws Exception {
: >> : : - CloudSolrClient client = cluster.getSolrClient();
: >> : : - String COLLECTION_NAME = "test_coll";
: >> : : - String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
: >> : : - CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2)
: >> : : - .process(cluster.getSolrClient());
: >> : : - cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
: >> : : - UpdateRequest ur = new UpdateRequest();
: >> : : - for (int i = 0; i < 10; i++) {
: >> : : - SolrInputDocument doc2 = new SolrInputDocument();
: >> : : - doc2.addField("id", "" + i);
: >> : : - ur.add(doc2);
: >> : : - }
: >> : : + MiniSolrCloudCluster cluster =
: >> : : + configureCluster(4).addConfig("conf", configset("cloud-minimal")).configure();
: >> : : + try {
: >> : : + CloudSolrClient client = cluster.getSolrClient();
: >> : : + String COLLECTION_NAME = "test_coll";
: >> : : + String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
: >> : : + CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2)
: >> : : + .process(cluster.getSolrClient());
: >> : : + cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
: >> : : + UpdateRequest ur = new UpdateRequest();
: >> : : + for (int i = 0; i < 10; i++) {
: >> : : + SolrInputDocument doc2 = new SolrInputDocument();
: >> : : + doc2.addField("id", "" + i);
: >> : : + ur.add(doc2);
: >> : : + }
: >> : :
: >> : : - ur.commit(client, COLLECTION_NAME);
: >> : : - QueryResponse rsp = client.query(COLLECTION_NAME, new SolrQuery("*:*"));
: >> : : - assertEquals(10, rsp.getResults().getNumFound());
: >> : : + ur.commit(client, COLLECTION_NAME);
: >> : : + QueryResponse rsp = client.query(COLLECTION_NAME, new SolrQuery("*:*"));
: >> : : + assertEquals(10, rsp.getResults().getNumFound());
: >> : :
: >> : : + System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on");
: >> : : + JettySolrRunner coordinatorJetty = null;
: >> : : + try {
: >> : : + coordinatorJetty = cluster.startJettySolrRunner();
: >> : : + } finally {
: >> : : + System.clearProperty(NodeRoles.NODE_ROLES_PROP);
: >> : : + }
: >> : : + QueryResponse rslt =
: >> : : + new QueryRequest(new SolrQuery("*:*"))
: >> : : + .setPreferredNodes(List.of(coordinatorJetty.getNodeName()))
: >> : : + .process(client, COLLECTION_NAME);
: >> : : +
: >> : : + assertEquals(10, rslt.getResults().size());
: >> : : +
: >> : : + DocCollection collection =
: >> : : + cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION);
: >> : : + assertNotNull(collection);
: >> : : +
: >> : : + Set<String> expectedNodes = new HashSet<>();
: >> : : + expectedNodes.add(coordinatorJetty.getNodeName());
: >> : : + collection.forEachReplica((s, replica) -> expectedNodes.remove(replica.getNodeName()));
: >> : : + assertTrue(expectedNodes.isEmpty());
: >> : : + } finally {
: >> : : + cluster.shutdown();
: >> : : + }
: >> : : + }
: >> : : +
: >> : : + public void testNRTRestart() throws Exception {
: >> : : + // we restart jetty and expect to find on disk data - need a local fs directory
: >> : : + useFactory(null);
: >> : : + String COLL = "coordinator_test_coll";
: >> : : + MiniSolrCloudCluster cluster =
: >> : : + configureCluster(3)
: >> : : + .withJettyConfig(jetty -> jetty.enableV2(true))
: >> : : + .addConfig("conf", configset("conf2"))
: >> : : + .configure();
: >> : : System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on");
: >> : : - JettySolrRunner coordinatorJetty = null;
: >> : : + JettySolrRunner qaJetty = cluster.startJettySolrRunner();
: >> : : + String qaJettyBase = qaJetty.getBaseUrl().toString();
: >> : : + System.clearProperty(NodeRoles.NODE_ROLES_PROP);
: >> : : + ExecutorService executor =
: >> : : + ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrNamedThreadFactory("manipulateJetty"));
: >> : : try {
: >> : : - coordinatorJetty = cluster.startJettySolrRunner();
: >> : : + CollectionAdminRequest.createCollection(COLL, "conf", 1, 1, 0, 1)
: >> : : + .process(cluster.getSolrClient());
: >> : : + cluster.waitForActiveCollection(COLL, 1, 2);
: >> : : + DocCollection docColl =
: >> : : + cluster.getSolrClient().getClusterStateProvider().getClusterState().getCollection(COLL);
: >> : : + Replica nrtReplica = docColl.getReplicas(EnumSet.of(Replica.Type.NRT)).get(0);
: >> : : + assertNotNull(nrtReplica);
: >> : : + String nrtCore = nrtReplica.getCoreName();
: >> : : + Replica pullReplica = docColl.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0);
: >> : : + assertNotNull(pullReplica);
: >> : : + String pullCore = pullReplica.getCoreName();
: >> : : +
: >> : : + SolrInputDocument sid = new SolrInputDocument();
: >> : : + sid.addField("id", "123");
: >> : : + sid.addField("desc_s", "A Document");
: >> : : + JettySolrRunner nrtJetty = null;
: >> : : + JettySolrRunner pullJetty = null;
: >> : : + for (JettySolrRunner j : cluster.getJettySolrRunners()) {
: >> : : + String nodeName = j.getNodeName();
: >> : : + if (nodeName.equals(nrtReplica.getNodeName())) {
: >> : : + nrtJetty = j;
: >> : : + } else if (nodeName.equals(pullReplica.getNodeName())) {
: >> : : + pullJetty = j;
: >> : : + }
: >> : : + }
: >> : : + assertNotNull(nrtJetty);
: >> : : + assertNotNull(pullJetty);
: >> : : + try (HttpSolrClient client = (HttpSolrClient) pullJetty.newClient()) {
: >> : : + client.add(COLL, sid);
: >> : : + client.commit(COLL);
: >> : : + assertEquals(
: >> : : + nrtCore,
: >> : : + getHostCoreName(
: >> : : + COLL, qaJettyBase, client, p -> p.add("shards.preference", "replica.type:NRT")));
: >> : : + assertEquals(
: >> : : + pullCore,
: >> : : + getHostCoreName(
: >> : : + COLL, qaJettyBase, client, p -> p.add("shards.preference", "replica.type:PULL")));
: >> : : + // Now , kill NRT jetty
: >> : : + JettySolrRunner nrtJettyF = nrtJetty;
: >> : : + JettySolrRunner pullJettyF = pullJetty;
: >> : : + Random r = random();
: >> : : + final long establishBaselineMs = r.nextInt(1000);
: >> : : + final long nrtDowntimeMs = r.nextInt(10000);
: >> : : + // NOTE: for `pullServiceTimeMs`, it can't be super-short. This is just to simplify our
: >> : : + // indexing code,
: >> : : + // based on the fact that our indexing is based on a PULL-node client.
: >> : : + final long pullServiceTimeMs = 1000 + (long) r.nextInt(9000);
: >> : : + Future<?> jettyManipulationFuture =
: >> : : + executor.submit(
: >> : : + () -> {
: >> : : + // we manipulate the jetty instances in a separate thread to more closely mimic
: >> : : + // the behavior we'd
: >> : : + // see irl.
: >> : : + try {
: >> : : + Thread.sleep(establishBaselineMs);
: >> : : + log.info("stopping NRT jetty ...");
: >> : : + nrtJettyF.stop();
: >> : : + log.info("NRT jetty stopped.");
: >> : : + Thread.sleep(nrtDowntimeMs); // let NRT be down for a while
: >> : : + log.info("restarting NRT jetty ...");
: >> : : + nrtJettyF.start(true);
: >> : : + log.info("NRT jetty restarted.");
: >> : : + // once NRT is back up, we expect PULL to continue serving until the TTL on ZK
: >> : : + // state
: >> : : + // used for query request routing has expired (60s). But here we force a return
: >> : : + // to NRT
: >> : : + // by stopping the PULL replica after a brief delay ...
: >> : : + Thread.sleep(pullServiceTimeMs);
: >> : : + log.info("stopping PULL jetty ...");
: >> : : + pullJettyF.stop();
: >> : : + log.info("PULL jetty stopped.");
: >> : : + } catch (Exception e) {
: >> : : + throw new RuntimeException(e);
: >> : : + }
: >> : : + });
: >> : : + String hostCore;
: >> : : + long start = new Date().getTime();
: >> : : + long individualRequestStart = start;
: >> : : + int count = 0;
: >> : : + while (nrtCore.equals(
: >> : : + hostCore =
: >> : : + getHostCoreName(
: >> : : + COLL,
: >> : : + qaJettyBase,
: >> : : + client,
: >> : : + p -> p.add("shards.preference", "replica.type:NRT")))) {
: >> : : + count++;
: >> : : + individualRequestStart = new Date().getTime();
: >> : : + }
: >> : : + long now = new Date().getTime();
: >> : : + log.info(
: >> : : + "phase1 NRT queries count={}, overall_duration={}, baseline_expected_overall_duration={}, switch-to-pull_duration={}",
: >> : : + count,
: >> : : + now - start,
: >> : : + establishBaselineMs,
: >> : : + now - individualRequestStart);
: >> : : + // default tolerance of 500ms below should suffice. Failover to PULL for this case should be
: >> : : + // very fast,
: >> : : + // because our QA-based client already knows both replicas are active, the index is stable,
: >> : : + // so the moment
: >> : : + // the client finds NRT is down it should be able to failover immediately and transparently
: >> : : + // to PULL.
: >> : : + assertEquals(
: >> : : + "when we break out of the NRT query loop, should be b/c routed to PULL",
: >> : : + pullCore,
: >> : : + hostCore);
: >> : : + SolrInputDocument d = new SolrInputDocument();
: >> : : + d.addField("id", "345");
: >> : : + d.addField("desc_s", "Another Document");
: >> : : + // attempts to add another doc while NRT is down should fail, then eventually succeed when
: >> : : + // NRT comes back up
: >> : : + count = 0;
: >> : : + start = new Date().getTime();
: >> : : + individualRequestStart = start;
: >> : : + for (; ; ) {
: >> : : + try {
: >> : : + client.add(COLL, d);
: >> : : + client.commit(COLL);
: >> : : + break;
: >> : : + } catch (SolrException ex) {
: >> : : + // we expect these until nrtJetty is back up.
: >> : : + count++;
: >> : : + Thread.sleep(100);
: >> : : + }
: >> : : + individualRequestStart = new Date().getTime();
: >> : : + }
: >> : : + now = new Date().getTime();
: >> : : + log.info(
: >> : : + "successfully added another doc; duration: {}, overall_duration={}, baseline_expected_overall_duration={}, exception_count={}",
: >> : : + now - individualRequestStart,
: >> : : + now - start,
: >> : : + nrtDowntimeMs,
: >> : : + count);
: >> : : + // NRT replica is back up, registered as available with Zk, and availability info has been
: >> : : + // pulled down by
: >> : : + // our PULL-replica-based `client`, forwarded indexing command to NRT, index/commit
: >> : : + // completed. All of this
: >> : : + // accounts for the 3000ms tolerance allowed for below. This is not a strict value, and if
: >> : : + // it causes failures
: >> : : + // regularly we should feel free to increase the tolerance; but it's meant to provide a
: >> : : + // stable baseline from
: >> : : + // which to detect regressions.
: >> : : + count = 0;
: >> : : + start = new Date().getTime();
: >> : : + individualRequestStart = start;
: >> : : + while (pullCore.equals(
: >> : : + hostCore =
: >> : : + getHostCoreName(
: >> : : + COLL,
: >> : : + qaJettyBase,
: >> : : + client,
: >> : : + p -> {
: >> : : + p.set(CommonParams.Q, "id:345");
: >> : : + p.add("shards.preference", "replica.type:NRT");
: >> : : + }))) {
: >> : : + count++;
: >> : : + Thread.sleep(100);
: >> : : + individualRequestStart = new Date().getTime();
: >> : : + }
: >> : : + now = new Date().getTime();
: >> : : + log.info(
: >> : : + "query retries between NRT index-ready and query-ready: {}; overall_duration={}; baseline_expected_overall_duration={}; failover-request_duration={}",
: >> : : + count,
: >> : : + now - start,
: >> : : + pullServiceTimeMs,
: >> : : + now - individualRequestStart);
: >> : : + assertEquals(nrtCore, hostCore);
: >> : : + // allow any exceptions to propagate
: >> : : + jettyManipulationFuture.get();
: >> : : + if (true) return;
: >> : : +
: >> : : + // next phase: just toggle a bunch
: >> : : + // TODO: could separate this out into a different test method, but this should suffice for
: >> : : + // now
: >> : : + pullJetty.start(true);
: >> : : + AtomicBoolean done = new AtomicBoolean();
: >> : : + long runMinutes = 1;
: >> : : + long finishTimeMs =
: >> : : + new Date().getTime() + TimeUnit.MILLISECONDS.convert(runMinutes, TimeUnit.MINUTES);
: >> : : + JettySolrRunner[] jettys = new JettySolrRunner[] {nrtJettyF, pullJettyF};
: >> : : + Random threadRandom = new Random(r.nextInt());
: >> : : + Future<Integer> f =
: >> : : + executor.submit(
: >> : : + () -> {
: >> : : + int iteration = 0;
: >> : : + while (new Date().getTime() < finishTimeMs && !done.get()) {
: >> : : + int idx = iteration++ % jettys.length;
: >> : : + JettySolrRunner toManipulate = jettys[idx];
: >> : : + try {
: >> : : + int serveTogetherTime = threadRandom.nextInt(7000);
: >> : : + int downTime = threadRandom.nextInt(7000);
: >> : : + log.info("serving together for {}ms", serveTogetherTime);
: >> : : + Thread.sleep(serveTogetherTime);
: >> : : + log.info("stopping {} ...", idx);
: >> : : + toManipulate.stop();
: >> : : + log.info("stopped {}.", idx);
: >> : : + Thread.sleep(downTime);
: >> : : + log.info("restarting {} ...", idx);
: >> : : + toManipulate.start(true);
: >> : : + log.info("restarted {}.", idx);
: >> : : + } catch (Exception e) {
: >> : : + throw new RuntimeException(e);
: >> : : + }
: >> : : + }
: >> : : + done.set(true);
: >> : : + return iteration;
: >> : : + });
: >> : : + count = 0;
: >> : : + start = new Date().getTime();
: >> : : + try {
: >> : : + do {
: >> : : + pullCore.equals(
: >> : : + hostCore =
: >> : : + getHostCoreName(
: >> : : + COLL,
: >> : : + qaJettyBase,
: >> : : + client,
: >> : : + p -> {
: >> : : + p.set(CommonParams.Q, "id:345");
: >> : : + p.add("shards.preference", "replica.type:NRT");
: >> : : + }));
: >> : : + count++;
: >> : : + Thread.sleep(100);
: >> : : + } while (!done.get());
: >> : : + } finally {
: >> : : + final String result;
: >> : : + if (done.getAndSet(true)) {
: >> : : + result = "Success";
: >> : : + } else {
: >> : : + // not yet set to done, completed abnormally (exception will be thrown beyond `finally`
: >> : : + // block)
: >> : : + result = "Failure";
: >> : : + }
: >> : : + Integer toggleCount = f.get();
: >> : : + long secondsDuration =
: >> : : + TimeUnit.SECONDS.convert(new Date().getTime() - start, TimeUnit.MILLISECONDS);
: >> : : + log.info(
: >> : : + "{}! {} seconds, {} toggles, {} requests served",
: >> : : + result,
: >> : : + secondsDuration,
: >> : : + toggleCount,
: >> : : + count);
: >> : : + }
: >> : : + }
: >> : : } finally {
: >> : : - System.clearProperty(NodeRoles.NODE_ROLES_PROP);
: >> : : + try {
: >> : : + ExecutorUtil.shutdownAndAwaitTermination(executor);
: >> : : + } finally {
: >> : : + cluster.shutdown();
: >> : : + }
: >> : : }
: >> : : - QueryResponse rslt =
: >> : : - new QueryRequest(new SolrQuery("*:*"))
: >> : : - .setPreferredNodes(List.of(coordinatorJetty.getNodeName()))
: >> : : - .process(client, COLLECTION_NAME);
: >> : : -
: >> : : - assertEquals(10, rslt.getResults().size());
: >> : : + }
: >> : :
: >> : : - DocCollection collection =
: >> : : - cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION);
: >> : : - assertNotNull(collection);
: >> : : + @SuppressWarnings("rawtypes")
: >> : : + private String getHostCoreName(
: >> : : + String COLL, String qaNode, HttpSolrClient solrClient, Consumer<SolrQuery> p)
: >> : : + throws Exception {
: >> : :
: >> : : - Set<String> expectedNodes = new HashSet<>();
: >> : : - expectedNodes.add(coordinatorJetty.getNodeName());
: >> : : - collection.forEachReplica((s, replica) -> expectedNodes.remove(replica.getNodeName()));
: >> : : - assertTrue(expectedNodes.isEmpty());
: >> : : + boolean found = false;
: >> : : + SolrQuery q = new SolrQuery("*:*");
: >> : : + q.add("fl", "id,desc_s,_core_:[core]").add(OMIT_HEADER, TRUE);
: >> : : + p.accept(q);
: >> : : + StringBuilder sb =
: >> : : + new StringBuilder(qaNode).append("/").append(COLL).append("/select?wt=javabin");
: >> : : + q.forEach(e -> sb.append("&").append(e.getKey()).append("=").append(e.getValue()[0]));
: >> : : + SolrDocumentList docs = null;
: >> : : + for (int i = 0; i < 100; i++) {
: >> : : + try {
: >> : : + SimpleOrderedMap rsp =
: >> : : + (SimpleOrderedMap)
: >> : : + Utils.executeGET(solrClient.getHttpClient(), sb.toString(), Utils.JAVABINCONSUMER);
: >> : : + docs = (SolrDocumentList) rsp.get("response");
: >> : : + if (docs.size() > 0) {
: >> : : + found = true;
: >> : : + break;
: >> : : + }
: >> : : + } catch (SolrException ex) {
: >> : : + // we know we're doing tricky things that might cause transient errors
: >> : : + // TODO: all these query requests go to the QA node -- should QA propagate internal request
: >> : : + // errors
: >> : : + // to the external client (and the external client retry?) or should QA attempt to failover
: >> : : + // transparently
: >> : : + // in the event of an error?
: >> : : + if (i < 5) {
: >> : : + log.info("swallowing transient error", ex);
: >> : : + } else {
: >> : : + log.error("only expect actual _errors_ within a small window (e.g. 500ms)", ex);
: >> : : + fail("initial error time threshold exceeded");
: >> : : + }
: >> : : + }
: >> : : + Thread.sleep(100);
: >> : : + }
: >> : : + assertTrue(found);
: >> : : + return (String) docs.get(0).getFieldValue("_core_");
: >> : : }
: >> : : }
: >> : :
: >> : :
: >> :
: >> : -Hoss
: >> : http://www.lucidworks.com/
: >> :
: >>
: >> -Hoss
: >> http://www.lucidworks.com/
: >>
: >> ---------------------------------------------------------------------
: >> To unsubscribe, e-mail: dev-unsubscribe@solr.apache.org
: >> For additional commands, e-mail: dev-help@solr.apache.org
: >>
:
: ---------------------------------------------------------------------
: To unsubscribe, e-mail: dev-unsubscribe@solr.apache.org
: For additional commands, e-mail: dev-help@solr.apache.org
:
:
-Hoss
http://www.lucidworks.com/
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@solr.apache.org
For additional commands, e-mail: dev-help@solr.apache.org
Re: [solr] branch branch_9x updated: test case added for coordinator role
Posted by Ishan Chattopadhyaya <ic...@gmail.com>.
Done, fixed. Thanks!
On Sat, Jan 21, 2023 at 12:21 PM Ishan Chattopadhyaya
<ic...@gmail.com> wrote:
>
> I'll take a look, Hoss.
>
> On Sat, 21 Jan, 2023, 2:37 am Chris Hostetter, <ho...@fucit.org> wrote:
>>
>>
>> Created jia & AwaitsFix'ed the test ...
>>
>> https://issues.apache.org/jira/browse/SOLR-16630
>>
>>
>>
>> : Date: Fri, 20 Jan 2023 13:36:38 -0700 (MST)
>> : From: Chris Hostetter <ho...@fucit.org>
>> : To: dev@solr.apache.org
>> : Cc: "commits@solr.apache.org" <co...@solr.apache.org>
>> : Subject: Re: [solr] branch branch_9x updated: test case added for coordinator
>> : role
>> :
>> :
>> : Noble: TestCoordinatorRole.testNRTRestart is breaking on jenkins on 9x a
>> : ridiculous number of times since you added it a week ago.
>> :
>> : IIUC this test has *NEVER* passed on a jenkins 9x build (only on the main
>> : builds)
>> :
>> : -Hoss
>> :
>> :
>> : : Date: Thu, 12 Jan 2023 07:54:33 +0000
>> : : From: noble@apache.org
>> : : Reply-To: dev@solr.apache.org
>> : : To: "commits@solr.apache.org" <co...@solr.apache.org>
>> : : Subject: [solr] branch branch_9x updated: test case added for coordinator role
>> : :
>> : : This is an automated email from the ASF dual-hosted git repository.
>> : :
>> : : noble pushed a commit to branch branch_9x
>> : : in repository https://gitbox.apache.org/repos/asf/solr.git
>> : :
>> : :
>> : : The following commit(s) were added to refs/heads/branch_9x by this push:
>> : : new ec9b152c31f test case added for coordinator role
>> : : ec9b152c31f is described below
>> : :
>> : : commit ec9b152c31fac99fe190ccc98e754c1200bd9fd2
>> : : Author: Noble Paul <no...@gmail.com>
>> : : AuthorDate: Thu Jan 12 18:54:15 2023 +1100
>> : :
>> : : test case added for coordinator role
>> : : ---
>> : : .../apache/solr/search/TestCoordinatorRole.java | 412 +++++++++++++++++++--
>> : : 1 file changed, 375 insertions(+), 37 deletions(-)
>> : :
>> : : diff --git a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
>> : : index 5e2dcfb70a8..6c4e845cf5a 100644
>> : : --- a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
>> : : +++ b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
>> : : @@ -17,69 +17,407 @@
>> : :
>> : : package org.apache.solr.search;
>> : :
>> : : +import static org.apache.solr.common.params.CommonParams.OMIT_HEADER;
>> : : +import static org.apache.solr.common.params.CommonParams.TRUE;
>> : : +
>> : : +import java.lang.invoke.MethodHandles;
>> : : +import java.util.Date;
>> : : +import java.util.EnumSet;
>> : : import java.util.HashSet;
>> : : import java.util.List;
>> : : +import java.util.Random;
>> : : import java.util.Set;
>> : : +import java.util.concurrent.ExecutorService;
>> : : +import java.util.concurrent.Future;
>> : : +import java.util.concurrent.TimeUnit;
>> : : +import java.util.concurrent.atomic.AtomicBoolean;
>> : : +import java.util.function.Consumer;
>> : : import org.apache.solr.client.solrj.SolrQuery;
>> : : import org.apache.solr.client.solrj.impl.CloudSolrClient;
>> : : +import org.apache.solr.client.solrj.impl.HttpSolrClient;
>> : : import org.apache.solr.client.solrj.request.CollectionAdminRequest;
>> : : import org.apache.solr.client.solrj.request.QueryRequest;
>> : : import org.apache.solr.client.solrj.request.UpdateRequest;
>> : : import org.apache.solr.client.solrj.response.QueryResponse;
>> : : +import org.apache.solr.cloud.MiniSolrCloudCluster;
>> : : import org.apache.solr.cloud.SolrCloudTestCase;
>> : : +import org.apache.solr.common.SolrDocumentList;
>> : : +import org.apache.solr.common.SolrException;
>> : : import org.apache.solr.common.SolrInputDocument;
>> : : import org.apache.solr.common.cloud.DocCollection;
>> : : +import org.apache.solr.common.cloud.Replica;
>> : : +import org.apache.solr.common.params.CommonParams;
>> : : +import org.apache.solr.common.util.ExecutorUtil;
>> : : +import org.apache.solr.common.util.SimpleOrderedMap;
>> : : +import org.apache.solr.common.util.SolrNamedThreadFactory;
>> : : +import org.apache.solr.common.util.Utils;
>> : : import org.apache.solr.core.NodeRoles;
>> : : import org.apache.solr.embedded.JettySolrRunner;
>> : : import org.apache.solr.servlet.CoordinatorHttpSolrCall;
>> : : -import org.junit.BeforeClass;
>> : : +import org.slf4j.Logger;
>> : : +import org.slf4j.LoggerFactory;
>> : :
>> : : public class TestCoordinatorRole extends SolrCloudTestCase {
>> : : -
>> : : - @BeforeClass
>> : : - public static void setupCluster() throws Exception {
>> : : - configureCluster(4).addConfig("conf", configset("cloud-minimal")).configure();
>> : : - }
>> : : + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
>> : :
>> : : public void testSimple() throws Exception {
>> : : - CloudSolrClient client = cluster.getSolrClient();
>> : : - String COLLECTION_NAME = "test_coll";
>> : : - String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
>> : : - CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2)
>> : : - .process(cluster.getSolrClient());
>> : : - cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
>> : : - UpdateRequest ur = new UpdateRequest();
>> : : - for (int i = 0; i < 10; i++) {
>> : : - SolrInputDocument doc2 = new SolrInputDocument();
>> : : - doc2.addField("id", "" + i);
>> : : - ur.add(doc2);
>> : : - }
>> : : + MiniSolrCloudCluster cluster =
>> : : + configureCluster(4).addConfig("conf", configset("cloud-minimal")).configure();
>> : : + try {
>> : : + CloudSolrClient client = cluster.getSolrClient();
>> : : + String COLLECTION_NAME = "test_coll";
>> : : + String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
>> : : + CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2)
>> : : + .process(cluster.getSolrClient());
>> : : + cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
>> : : + UpdateRequest ur = new UpdateRequest();
>> : : + for (int i = 0; i < 10; i++) {
>> : : + SolrInputDocument doc2 = new SolrInputDocument();
>> : : + doc2.addField("id", "" + i);
>> : : + ur.add(doc2);
>> : : + }
>> : :
>> : : - ur.commit(client, COLLECTION_NAME);
>> : : - QueryResponse rsp = client.query(COLLECTION_NAME, new SolrQuery("*:*"));
>> : : - assertEquals(10, rsp.getResults().getNumFound());
>> : : + ur.commit(client, COLLECTION_NAME);
>> : : + QueryResponse rsp = client.query(COLLECTION_NAME, new SolrQuery("*:*"));
>> : : + assertEquals(10, rsp.getResults().getNumFound());
>> : :
>> : : + System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on");
>> : : + JettySolrRunner coordinatorJetty = null;
>> : : + try {
>> : : + coordinatorJetty = cluster.startJettySolrRunner();
>> : : + } finally {
>> : : + System.clearProperty(NodeRoles.NODE_ROLES_PROP);
>> : : + }
>> : : + QueryResponse rslt =
>> : : + new QueryRequest(new SolrQuery("*:*"))
>> : : + .setPreferredNodes(List.of(coordinatorJetty.getNodeName()))
>> : : + .process(client, COLLECTION_NAME);
>> : : +
>> : : + assertEquals(10, rslt.getResults().size());
>> : : +
>> : : + DocCollection collection =
>> : : + cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION);
>> : : + assertNotNull(collection);
>> : : +
>> : : + Set<String> expectedNodes = new HashSet<>();
>> : : + expectedNodes.add(coordinatorJetty.getNodeName());
>> : : + collection.forEachReplica((s, replica) -> expectedNodes.remove(replica.getNodeName()));
>> : : + assertTrue(expectedNodes.isEmpty());
>> : : + } finally {
>> : : + cluster.shutdown();
>> : : + }
>> : : + }
>> : : +
>> : : + public void testNRTRestart() throws Exception {
>> : : + // we restart jetty and expect to find on disk data - need a local fs directory
>> : : + useFactory(null);
>> : : + String COLL = "coordinator_test_coll";
>> : : + MiniSolrCloudCluster cluster =
>> : : + configureCluster(3)
>> : : + .withJettyConfig(jetty -> jetty.enableV2(true))
>> : : + .addConfig("conf", configset("conf2"))
>> : : + .configure();
>> : : System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on");
>> : : - JettySolrRunner coordinatorJetty = null;
>> : : + JettySolrRunner qaJetty = cluster.startJettySolrRunner();
>> : : + String qaJettyBase = qaJetty.getBaseUrl().toString();
>> : : + System.clearProperty(NodeRoles.NODE_ROLES_PROP);
>> : : + ExecutorService executor =
>> : : + ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrNamedThreadFactory("manipulateJetty"));
>> : : try {
>> : : - coordinatorJetty = cluster.startJettySolrRunner();
>> : : + CollectionAdminRequest.createCollection(COLL, "conf", 1, 1, 0, 1)
>> : : + .process(cluster.getSolrClient());
>> : : + cluster.waitForActiveCollection(COLL, 1, 2);
>> : : + DocCollection docColl =
>> : : + cluster.getSolrClient().getClusterStateProvider().getClusterState().getCollection(COLL);
>> : : + Replica nrtReplica = docColl.getReplicas(EnumSet.of(Replica.Type.NRT)).get(0);
>> : : + assertNotNull(nrtReplica);
>> : : + String nrtCore = nrtReplica.getCoreName();
>> : : + Replica pullReplica = docColl.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0);
>> : : + assertNotNull(pullReplica);
>> : : + String pullCore = pullReplica.getCoreName();
>> : : +
>> : : + SolrInputDocument sid = new SolrInputDocument();
>> : : + sid.addField("id", "123");
>> : : + sid.addField("desc_s", "A Document");
>> : : + JettySolrRunner nrtJetty = null;
>> : : + JettySolrRunner pullJetty = null;
>> : : + for (JettySolrRunner j : cluster.getJettySolrRunners()) {
>> : : + String nodeName = j.getNodeName();
>> : : + if (nodeName.equals(nrtReplica.getNodeName())) {
>> : : + nrtJetty = j;
>> : : + } else if (nodeName.equals(pullReplica.getNodeName())) {
>> : : + pullJetty = j;
>> : : + }
>> : : + }
>> : : + assertNotNull(nrtJetty);
>> : : + assertNotNull(pullJetty);
>> : : + try (HttpSolrClient client = (HttpSolrClient) pullJetty.newClient()) {
>> : : + client.add(COLL, sid);
>> : : + client.commit(COLL);
>> : : + assertEquals(
>> : : + nrtCore,
>> : : + getHostCoreName(
>> : : + COLL, qaJettyBase, client, p -> p.add("shards.preference", "replica.type:NRT")));
>> : : + assertEquals(
>> : : + pullCore,
>> : : + getHostCoreName(
>> : : + COLL, qaJettyBase, client, p -> p.add("shards.preference", "replica.type:PULL")));
>> : : + // Now , kill NRT jetty
>> : : + JettySolrRunner nrtJettyF = nrtJetty;
>> : : + JettySolrRunner pullJettyF = pullJetty;
>> : : + Random r = random();
>> : : + final long establishBaselineMs = r.nextInt(1000);
>> : : + final long nrtDowntimeMs = r.nextInt(10000);
>> : : + // NOTE: for `pullServiceTimeMs`, it can't be super-short. This is just to simplify our
>> : : + // indexing code,
>> : : + // based on the fact that our indexing is based on a PULL-node client.
>> : : + final long pullServiceTimeMs = 1000 + (long) r.nextInt(9000);
>> : : + Future<?> jettyManipulationFuture =
>> : : + executor.submit(
>> : : + () -> {
>> : : + // we manipulate the jetty instances in a separate thread to more closely mimic
>> : : + // the behavior we'd
>> : : + // see irl.
>> : : + try {
>> : : + Thread.sleep(establishBaselineMs);
>> : : + log.info("stopping NRT jetty ...");
>> : : + nrtJettyF.stop();
>> : : + log.info("NRT jetty stopped.");
>> : : + Thread.sleep(nrtDowntimeMs); // let NRT be down for a while
>> : : + log.info("restarting NRT jetty ...");
>> : : + nrtJettyF.start(true);
>> : : + log.info("NRT jetty restarted.");
>> : : + // once NRT is back up, we expect PULL to continue serving until the TTL on ZK
>> : : + // state
>> : : + // used for query request routing has expired (60s). But here we force a return
>> : : + // to NRT
>> : : + // by stopping the PULL replica after a brief delay ...
>> : : + Thread.sleep(pullServiceTimeMs);
>> : : + log.info("stopping PULL jetty ...");
>> : : + pullJettyF.stop();
>> : : + log.info("PULL jetty stopped.");
>> : : + } catch (Exception e) {
>> : : + throw new RuntimeException(e);
>> : : + }
>> : : + });
>> : : + String hostCore;
>> : : + long start = new Date().getTime();
>> : : + long individualRequestStart = start;
>> : : + int count = 0;
>> : : + while (nrtCore.equals(
>> : : + hostCore =
>> : : + getHostCoreName(
>> : : + COLL,
>> : : + qaJettyBase,
>> : : + client,
>> : : + p -> p.add("shards.preference", "replica.type:NRT")))) {
>> : : + count++;
>> : : + individualRequestStart = new Date().getTime();
>> : : + }
>> : : + long now = new Date().getTime();
>> : : + log.info(
>> : : + "phase1 NRT queries count={}, overall_duration={}, baseline_expected_overall_duration={}, switch-to-pull_duration={}",
>> : : + count,
>> : : + now - start,
>> : : + establishBaselineMs,
>> : : + now - individualRequestStart);
>> : : + // default tolerance of 500ms below should suffice. Failover to PULL for this case should be
>> : : + // very fast,
>> : : + // because our QA-based client already knows both replicas are active, the index is stable,
>> : : + // so the moment
>> : : + // the client finds NRT is down it should be able to failover immediately and transparently
>> : : + // to PULL.
>> : : + assertEquals(
>> : : + "when we break out of the NRT query loop, should be b/c routed to PULL",
>> : : + pullCore,
>> : : + hostCore);
>> : : + SolrInputDocument d = new SolrInputDocument();
>> : : + d.addField("id", "345");
>> : : + d.addField("desc_s", "Another Document");
>> : : + // attempts to add another doc while NRT is down should fail, then eventually succeed when
>> : : + // NRT comes back up
>> : : + count = 0;
>> : : + start = new Date().getTime();
>> : : + individualRequestStart = start;
>> : : + for (; ; ) {
>> : : + try {
>> : : + client.add(COLL, d);
>> : : + client.commit(COLL);
>> : : + break;
>> : : + } catch (SolrException ex) {
>> : : + // we expect these until nrtJetty is back up.
>> : : + count++;
>> : : + Thread.sleep(100);
>> : : + }
>> : : + individualRequestStart = new Date().getTime();
>> : : + }
>> : : + now = new Date().getTime();
>> : : + log.info(
>> : : + "successfully added another doc; duration: {}, overall_duration={}, baseline_expected_overall_duration={}, exception_count={}",
>> : : + now - individualRequestStart,
>> : : + now - start,
>> : : + nrtDowntimeMs,
>> : : + count);
>> : : + // NRT replica is back up, registered as available with Zk, and availability info has been
>> : : + // pulled down by
>> : : + // our PULL-replica-based `client`, forwarded indexing command to NRT, index/commit
>> : : + // completed. All of this
>> : : + // accounts for the 3000ms tolerance allowed for below. This is not a strict value, and if
>> : : + // it causes failures
>> : : + // regularly we should feel free to increase the tolerance; but it's meant to provide a
>> : : + // stable baseline from
>> : : + // which to detect regressions.
>> : : + count = 0;
>> : : + start = new Date().getTime();
>> : : + individualRequestStart = start;
>> : : + while (pullCore.equals(
>> : : + hostCore =
>> : : + getHostCoreName(
>> : : + COLL,
>> : : + qaJettyBase,
>> : : + client,
>> : : + p -> {
>> : : + p.set(CommonParams.Q, "id:345");
>> : : + p.add("shards.preference", "replica.type:NRT");
>> : : + }))) {
>> : : + count++;
>> : : + Thread.sleep(100);
>> : : + individualRequestStart = new Date().getTime();
>> : : + }
>> : : + now = new Date().getTime();
>> : : + log.info(
>> : : + "query retries between NRT index-ready and query-ready: {}; overall_duration={}; baseline_expected_overall_duration={}; failover-request_duration={}",
>> : : + count,
>> : : + now - start,
>> : : + pullServiceTimeMs,
>> : : + now - individualRequestStart);
>> : : + assertEquals(nrtCore, hostCore);
>> : : + // allow any exceptions to propagate
>> : : + jettyManipulationFuture.get();
>> : : + if (true) return;
>> : : +
>> : : + // next phase: just toggle a bunch
>> : : + // TODO: could separate this out into a different test method, but this should suffice for
>> : : + // now
>> : : + pullJetty.start(true);
>> : : + AtomicBoolean done = new AtomicBoolean();
>> : : + long runMinutes = 1;
>> : : + long finishTimeMs =
>> : : + new Date().getTime() + TimeUnit.MILLISECONDS.convert(runMinutes, TimeUnit.MINUTES);
>> : : + JettySolrRunner[] jettys = new JettySolrRunner[] {nrtJettyF, pullJettyF};
>> : : + Random threadRandom = new Random(r.nextInt());
>> : : + Future<Integer> f =
>> : : + executor.submit(
>> : : + () -> {
>> : : + int iteration = 0;
>> : : + while (new Date().getTime() < finishTimeMs && !done.get()) {
>> : : + int idx = iteration++ % jettys.length;
>> : : + JettySolrRunner toManipulate = jettys[idx];
>> : : + try {
>> : : + int serveTogetherTime = threadRandom.nextInt(7000);
>> : : + int downTime = threadRandom.nextInt(7000);
>> : : + log.info("serving together for {}ms", serveTogetherTime);
>> : : + Thread.sleep(serveTogetherTime);
>> : : + log.info("stopping {} ...", idx);
>> : : + toManipulate.stop();
>> : : + log.info("stopped {}.", idx);
>> : : + Thread.sleep(downTime);
>> : : + log.info("restarting {} ...", idx);
>> : : + toManipulate.start(true);
>> : : + log.info("restarted {}.", idx);
>> : : + } catch (Exception e) {
>> : : + throw new RuntimeException(e);
>> : : + }
>> : : + }
>> : : + done.set(true);
>> : : + return iteration;
>> : : + });
>> : : + count = 0;
>> : : + start = new Date().getTime();
>> : : + try {
>> : : + do {
>> : : + pullCore.equals(
>> : : + hostCore =
>> : : + getHostCoreName(
>> : : + COLL,
>> : : + qaJettyBase,
>> : : + client,
>> : : + p -> {
>> : : + p.set(CommonParams.Q, "id:345");
>> : : + p.add("shards.preference", "replica.type:NRT");
>> : : + }));
>> : : + count++;
>> : : + Thread.sleep(100);
>> : : + } while (!done.get());
>> : : + } finally {
>> : : + final String result;
>> : : + if (done.getAndSet(true)) {
>> : : + result = "Success";
>> : : + } else {
>> : : + // not yet set to done, completed abnormally (exception will be thrown beyond `finally`
>> : : + // block)
>> : : + result = "Failure";
>> : : + }
>> : : + Integer toggleCount = f.get();
>> : : + long secondsDuration =
>> : : + TimeUnit.SECONDS.convert(new Date().getTime() - start, TimeUnit.MILLISECONDS);
>> : : + log.info(
>> : : + "{}! {} seconds, {} toggles, {} requests served",
>> : : + result,
>> : : + secondsDuration,
>> : : + toggleCount,
>> : : + count);
>> : : + }
>> : : + }
>> : : } finally {
>> : : - System.clearProperty(NodeRoles.NODE_ROLES_PROP);
>> : : + try {
>> : : + ExecutorUtil.shutdownAndAwaitTermination(executor);
>> : : + } finally {
>> : : + cluster.shutdown();
>> : : + }
>> : : }
>> : : - QueryResponse rslt =
>> : : - new QueryRequest(new SolrQuery("*:*"))
>> : : - .setPreferredNodes(List.of(coordinatorJetty.getNodeName()))
>> : : - .process(client, COLLECTION_NAME);
>> : : -
>> : : - assertEquals(10, rslt.getResults().size());
>> : : + }
>> : :
>> : : - DocCollection collection =
>> : : - cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION);
>> : : - assertNotNull(collection);
>> : : + @SuppressWarnings("rawtypes")
>> : : + private String getHostCoreName(
>> : : + String COLL, String qaNode, HttpSolrClient solrClient, Consumer<SolrQuery> p)
>> : : + throws Exception {
>> : :
>> : : - Set<String> expectedNodes = new HashSet<>();
>> : : - expectedNodes.add(coordinatorJetty.getNodeName());
>> : : - collection.forEachReplica((s, replica) -> expectedNodes.remove(replica.getNodeName()));
>> : : - assertTrue(expectedNodes.isEmpty());
>> : : + boolean found = false;
>> : : + SolrQuery q = new SolrQuery("*:*");
>> : : + q.add("fl", "id,desc_s,_core_:[core]").add(OMIT_HEADER, TRUE);
>> : : + p.accept(q);
>> : : + StringBuilder sb =
>> : : + new StringBuilder(qaNode).append("/").append(COLL).append("/select?wt=javabin");
>> : : + q.forEach(e -> sb.append("&").append(e.getKey()).append("=").append(e.getValue()[0]));
>> : : + SolrDocumentList docs = null;
>> : : + for (int i = 0; i < 100; i++) {
>> : : + try {
>> : : + SimpleOrderedMap rsp =
>> : : + (SimpleOrderedMap)
>> : : + Utils.executeGET(solrClient.getHttpClient(), sb.toString(), Utils.JAVABINCONSUMER);
>> : : + docs = (SolrDocumentList) rsp.get("response");
>> : : + if (docs.size() > 0) {
>> : : + found = true;
>> : : + break;
>> : : + }
>> : : + } catch (SolrException ex) {
>> : : + // we know we're doing tricky things that might cause transient errors
>> : : + // TODO: all these query requests go to the QA node -- should QA propagate internal request
>> : : + // errors
>> : : + // to the external client (and the external client retry?) or should QA attempt to failover
>> : : + // transparently
>> : : + // in the event of an error?
>> : : + if (i < 5) {
>> : : + log.info("swallowing transient error", ex);
>> : : + } else {
>> : : + log.error("only expect actual _errors_ within a small window (e.g. 500ms)", ex);
>> : : + fail("initial error time threshold exceeded");
>> : : + }
>> : : + }
>> : : + Thread.sleep(100);
>> : : + }
>> : : + assertTrue(found);
>> : : + return (String) docs.get(0).getFieldValue("_core_");
>> : : }
>> : : }
>> : :
>> : :
>> :
>> : -Hoss
>> : http://www.lucidworks.com/
>> :
>>
>> -Hoss
>> http://www.lucidworks.com/
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: dev-unsubscribe@solr.apache.org
>> For additional commands, e-mail: dev-help@solr.apache.org
>>
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@solr.apache.org
For additional commands, e-mail: dev-help@solr.apache.org
Re: [solr] branch branch_9x updated: test case added for coordinator role
Posted by Ishan Chattopadhyaya <ic...@gmail.com>.
I'll take a look, Hoss.
On Sat, 21 Jan, 2023, 2:37 am Chris Hostetter, <ho...@fucit.org>
wrote:
>
> Created jia & AwaitsFix'ed the test ...
>
> https://issues.apache.org/jira/browse/SOLR-16630
>
>
>
> : Date: Fri, 20 Jan 2023 13:36:38 -0700 (MST)
> : From: Chris Hostetter <ho...@fucit.org>
> : To: dev@solr.apache.org
> : Cc: "commits@solr.apache.org" <co...@solr.apache.org>
> : Subject: Re: [solr] branch branch_9x updated: test case added for
> coordinator
> : role
> :
> :
> : Noble: TestCoordinatorRole.testNRTRestart is breaking on jenkins on 9x a
> : ridiculous number of times since you added it a week ago.
> :
> : IIUC this test has *NEVER* passed on a jenkins 9x build (only on the
> main
> : builds)
> :
> : -Hoss
> :
> :
> : : Date: Thu, 12 Jan 2023 07:54:33 +0000
> : : From: noble@apache.org
> : : Reply-To: dev@solr.apache.org
> : : To: "commits@solr.apache.org" <co...@solr.apache.org>
> : : Subject: [solr] branch branch_9x updated: test case added for
> coordinator role
> : :
> : : This is an automated email from the ASF dual-hosted git repository.
> : :
> : : noble pushed a commit to branch branch_9x
> : : in repository https://gitbox.apache.org/repos/asf/solr.git
> : :
> : :
> : : The following commit(s) were added to refs/heads/branch_9x by this
> push:
> : : new ec9b152c31f test case added for coordinator role
> : : ec9b152c31f is described below
> : :
> : : commit ec9b152c31fac99fe190ccc98e754c1200bd9fd2
> : : Author: Noble Paul <no...@gmail.com>
> : : AuthorDate: Thu Jan 12 18:54:15 2023 +1100
> : :
> : : test case added for coordinator role
> : : ---
> : : .../apache/solr/search/TestCoordinatorRole.java | 412
> +++++++++++++++++++--
> : : 1 file changed, 375 insertions(+), 37 deletions(-)
> : :
> : : diff --git
> a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
> b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
> : : index 5e2dcfb70a8..6c4e845cf5a 100644
> : : ---
> a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
> : : +++
> b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
> : : @@ -17,69 +17,407 @@
> : :
> : : package org.apache.solr.search;
> : :
> : : +import static org.apache.solr.common.params.CommonParams.OMIT_HEADER;
> : : +import static org.apache.solr.common.params.CommonParams.TRUE;
> : : +
> : : +import java.lang.invoke.MethodHandles;
> : : +import java.util.Date;
> : : +import java.util.EnumSet;
> : : import java.util.HashSet;
> : : import java.util.List;
> : : +import java.util.Random;
> : : import java.util.Set;
> : : +import java.util.concurrent.ExecutorService;
> : : +import java.util.concurrent.Future;
> : : +import java.util.concurrent.TimeUnit;
> : : +import java.util.concurrent.atomic.AtomicBoolean;
> : : +import java.util.function.Consumer;
> : : import org.apache.solr.client.solrj.SolrQuery;
> : : import org.apache.solr.client.solrj.impl.CloudSolrClient;
> : : +import org.apache.solr.client.solrj.impl.HttpSolrClient;
> : : import org.apache.solr.client.solrj.request.CollectionAdminRequest;
> : : import org.apache.solr.client.solrj.request.QueryRequest;
> : : import org.apache.solr.client.solrj.request.UpdateRequest;
> : : import org.apache.solr.client.solrj.response.QueryResponse;
> : : +import org.apache.solr.cloud.MiniSolrCloudCluster;
> : : import org.apache.solr.cloud.SolrCloudTestCase;
> : : +import org.apache.solr.common.SolrDocumentList;
> : : +import org.apache.solr.common.SolrException;
> : : import org.apache.solr.common.SolrInputDocument;
> : : import org.apache.solr.common.cloud.DocCollection;
> : : +import org.apache.solr.common.cloud.Replica;
> : : +import org.apache.solr.common.params.CommonParams;
> : : +import org.apache.solr.common.util.ExecutorUtil;
> : : +import org.apache.solr.common.util.SimpleOrderedMap;
> : : +import org.apache.solr.common.util.SolrNamedThreadFactory;
> : : +import org.apache.solr.common.util.Utils;
> : : import org.apache.solr.core.NodeRoles;
> : : import org.apache.solr.embedded.JettySolrRunner;
> : : import org.apache.solr.servlet.CoordinatorHttpSolrCall;
> : : -import org.junit.BeforeClass;
> : : +import org.slf4j.Logger;
> : : +import org.slf4j.LoggerFactory;
> : :
> : : public class TestCoordinatorRole extends SolrCloudTestCase {
> : : -
> : : - @BeforeClass
> : : - public static void setupCluster() throws Exception {
> : : - configureCluster(4).addConfig("conf",
> configset("cloud-minimal")).configure();
> : : - }
> : : + private static final Logger log =
> LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
> : :
> : : public void testSimple() throws Exception {
> : : - CloudSolrClient client = cluster.getSolrClient();
> : : - String COLLECTION_NAME = "test_coll";
> : : - String SYNTHETIC_COLLECTION =
> CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
> : : - CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf",
> 2, 2)
> : : - .process(cluster.getSolrClient());
> : : - cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
> : : - UpdateRequest ur = new UpdateRequest();
> : : - for (int i = 0; i < 10; i++) {
> : : - SolrInputDocument doc2 = new SolrInputDocument();
> : : - doc2.addField("id", "" + i);
> : : - ur.add(doc2);
> : : - }
> : : + MiniSolrCloudCluster cluster =
> : : + configureCluster(4).addConfig("conf",
> configset("cloud-minimal")).configure();
> : : + try {
> : : + CloudSolrClient client = cluster.getSolrClient();
> : : + String COLLECTION_NAME = "test_coll";
> : : + String SYNTHETIC_COLLECTION =
> CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
> : : + CollectionAdminRequest.createCollection(COLLECTION_NAME,
> "conf", 2, 2)
> : : + .process(cluster.getSolrClient());
> : : + cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
> : : + UpdateRequest ur = new UpdateRequest();
> : : + for (int i = 0; i < 10; i++) {
> : : + SolrInputDocument doc2 = new SolrInputDocument();
> : : + doc2.addField("id", "" + i);
> : : + ur.add(doc2);
> : : + }
> : :
> : : - ur.commit(client, COLLECTION_NAME);
> : : - QueryResponse rsp = client.query(COLLECTION_NAME, new
> SolrQuery("*:*"));
> : : - assertEquals(10, rsp.getResults().getNumFound());
> : : + ur.commit(client, COLLECTION_NAME);
> : : + QueryResponse rsp = client.query(COLLECTION_NAME, new
> SolrQuery("*:*"));
> : : + assertEquals(10, rsp.getResults().getNumFound());
> : :
> : : + System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on");
> : : + JettySolrRunner coordinatorJetty = null;
> : : + try {
> : : + coordinatorJetty = cluster.startJettySolrRunner();
> : : + } finally {
> : : + System.clearProperty(NodeRoles.NODE_ROLES_PROP);
> : : + }
> : : + QueryResponse rslt =
> : : + new QueryRequest(new SolrQuery("*:*"))
> : : +
> .setPreferredNodes(List.of(coordinatorJetty.getNodeName()))
> : : + .process(client, COLLECTION_NAME);
> : : +
> : : + assertEquals(10, rslt.getResults().size());
> : : +
> : : + DocCollection collection =
> : : +
> cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION);
> : : + assertNotNull(collection);
> : : +
> : : + Set<String> expectedNodes = new HashSet<>();
> : : + expectedNodes.add(coordinatorJetty.getNodeName());
> : : + collection.forEachReplica((s, replica) ->
> expectedNodes.remove(replica.getNodeName()));
> : : + assertTrue(expectedNodes.isEmpty());
> : : + } finally {
> : : + cluster.shutdown();
> : : + }
> : : + }
> : : +
> : : + public void testNRTRestart() throws Exception {
> : : + // we restart jetty and expect to find on disk data - need a
> local fs directory
> : : + useFactory(null);
> : : + String COLL = "coordinator_test_coll";
> : : + MiniSolrCloudCluster cluster =
> : : + configureCluster(3)
> : : + .withJettyConfig(jetty -> jetty.enableV2(true))
> : : + .addConfig("conf", configset("conf2"))
> : : + .configure();
> : : System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on");
> : : - JettySolrRunner coordinatorJetty = null;
> : : + JettySolrRunner qaJetty = cluster.startJettySolrRunner();
> : : + String qaJettyBase = qaJetty.getBaseUrl().toString();
> : : + System.clearProperty(NodeRoles.NODE_ROLES_PROP);
> : : + ExecutorService executor =
> : : + ExecutorUtil.newMDCAwareSingleThreadExecutor(new
> SolrNamedThreadFactory("manipulateJetty"));
> : : try {
> : : - coordinatorJetty = cluster.startJettySolrRunner();
> : : + CollectionAdminRequest.createCollection(COLL, "conf", 1, 1, 0,
> 1)
> : : + .process(cluster.getSolrClient());
> : : + cluster.waitForActiveCollection(COLL, 1, 2);
> : : + DocCollection docColl =
> : : +
> cluster.getSolrClient().getClusterStateProvider().getClusterState().getCollection(COLL);
> : : + Replica nrtReplica =
> docColl.getReplicas(EnumSet.of(Replica.Type.NRT)).get(0);
> : : + assertNotNull(nrtReplica);
> : : + String nrtCore = nrtReplica.getCoreName();
> : : + Replica pullReplica =
> docColl.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0);
> : : + assertNotNull(pullReplica);
> : : + String pullCore = pullReplica.getCoreName();
> : : +
> : : + SolrInputDocument sid = new SolrInputDocument();
> : : + sid.addField("id", "123");
> : : + sid.addField("desc_s", "A Document");
> : : + JettySolrRunner nrtJetty = null;
> : : + JettySolrRunner pullJetty = null;
> : : + for (JettySolrRunner j : cluster.getJettySolrRunners()) {
> : : + String nodeName = j.getNodeName();
> : : + if (nodeName.equals(nrtReplica.getNodeName())) {
> : : + nrtJetty = j;
> : : + } else if (nodeName.equals(pullReplica.getNodeName())) {
> : : + pullJetty = j;
> : : + }
> : : + }
> : : + assertNotNull(nrtJetty);
> : : + assertNotNull(pullJetty);
> : : + try (HttpSolrClient client = (HttpSolrClient)
> pullJetty.newClient()) {
> : : + client.add(COLL, sid);
> : : + client.commit(COLL);
> : : + assertEquals(
> : : + nrtCore,
> : : + getHostCoreName(
> : : + COLL, qaJettyBase, client, p ->
> p.add("shards.preference", "replica.type:NRT")));
> : : + assertEquals(
> : : + pullCore,
> : : + getHostCoreName(
> : : + COLL, qaJettyBase, client, p ->
> p.add("shards.preference", "replica.type:PULL")));
> : : + // Now , kill NRT jetty
> : : + JettySolrRunner nrtJettyF = nrtJetty;
> : : + JettySolrRunner pullJettyF = pullJetty;
> : : + Random r = random();
> : : + final long establishBaselineMs = r.nextInt(1000);
> : : + final long nrtDowntimeMs = r.nextInt(10000);
> : : + // NOTE: for `pullServiceTimeMs`, it can't be super-short.
> This is just to simplify our
> : : + // indexing code,
> : : + // based on the fact that our indexing is based on a
> PULL-node client.
> : : + final long pullServiceTimeMs = 1000 + (long) r.nextInt(9000);
> : : + Future<?> jettyManipulationFuture =
> : : + executor.submit(
> : : + () -> {
> : : + // we manipulate the jetty instances in a separate
> thread to more closely mimic
> : : + // the behavior we'd
> : : + // see irl.
> : : + try {
> : : + Thread.sleep(establishBaselineMs);
> : : + log.info("stopping NRT jetty ...");
> : : + nrtJettyF.stop();
> : : + log.info("NRT jetty stopped.");
> : : + Thread.sleep(nrtDowntimeMs); // let NRT be down
> for a while
> : : + log.info("restarting NRT jetty ...");
> : : + nrtJettyF.start(true);
> : : + log.info("NRT jetty restarted.");
> : : + // once NRT is back up, we expect PULL to
> continue serving until the TTL on ZK
> : : + // state
> : : + // used for query request routing has expired
> (60s). But here we force a return
> : : + // to NRT
> : : + // by stopping the PULL replica after a brief
> delay ...
> : : + Thread.sleep(pullServiceTimeMs);
> : : + log.info("stopping PULL jetty ...");
> : : + pullJettyF.stop();
> : : + log.info("PULL jetty stopped.");
> : : + } catch (Exception e) {
> : : + throw new RuntimeException(e);
> : : + }
> : : + });
> : : + String hostCore;
> : : + long start = new Date().getTime();
> : : + long individualRequestStart = start;
> : : + int count = 0;
> : : + while (nrtCore.equals(
> : : + hostCore =
> : : + getHostCoreName(
> : : + COLL,
> : : + qaJettyBase,
> : : + client,
> : : + p -> p.add("shards.preference",
> "replica.type:NRT")))) {
> : : + count++;
> : : + individualRequestStart = new Date().getTime();
> : : + }
> : : + long now = new Date().getTime();
> : : + log.info(
> : : + "phase1 NRT queries count={}, overall_duration={},
> baseline_expected_overall_duration={}, switch-to-pull_duration={}",
> : : + count,
> : : + now - start,
> : : + establishBaselineMs,
> : : + now - individualRequestStart);
> : : + // default tolerance of 500ms below should suffice. Failover
> to PULL for this case should be
> : : + // very fast,
> : : + // because our QA-based client already knows both replicas
> are active, the index is stable,
> : : + // so the moment
> : : + // the client finds NRT is down it should be able to failover
> immediately and transparently
> : : + // to PULL.
> : : + assertEquals(
> : : + "when we break out of the NRT query loop, should be b/c
> routed to PULL",
> : : + pullCore,
> : : + hostCore);
> : : + SolrInputDocument d = new SolrInputDocument();
> : : + d.addField("id", "345");
> : : + d.addField("desc_s", "Another Document");
> : : + // attempts to add another doc while NRT is down should fail,
> then eventually succeed when
> : : + // NRT comes back up
> : : + count = 0;
> : : + start = new Date().getTime();
> : : + individualRequestStart = start;
> : : + for (; ; ) {
> : : + try {
> : : + client.add(COLL, d);
> : : + client.commit(COLL);
> : : + break;
> : : + } catch (SolrException ex) {
> : : + // we expect these until nrtJetty is back up.
> : : + count++;
> : : + Thread.sleep(100);
> : : + }
> : : + individualRequestStart = new Date().getTime();
> : : + }
> : : + now = new Date().getTime();
> : : + log.info(
> : : + "successfully added another doc; duration: {},
> overall_duration={}, baseline_expected_overall_duration={},
> exception_count={}",
> : : + now - individualRequestStart,
> : : + now - start,
> : : + nrtDowntimeMs,
> : : + count);
> : : + // NRT replica is back up, registered as available with Zk,
> and availability info has been
> : : + // pulled down by
> : : + // our PULL-replica-based `client`, forwarded indexing
> command to NRT, index/commit
> : : + // completed. All of this
> : : + // accounts for the 3000ms tolerance allowed for below. This
> is not a strict value, and if
> : : + // it causes failures
> : : + // regularly we should feel free to increase the tolerance;
> but it's meant to provide a
> : : + // stable baseline from
> : : + // which to detect regressions.
> : : + count = 0;
> : : + start = new Date().getTime();
> : : + individualRequestStart = start;
> : : + while (pullCore.equals(
> : : + hostCore =
> : : + getHostCoreName(
> : : + COLL,
> : : + qaJettyBase,
> : : + client,
> : : + p -> {
> : : + p.set(CommonParams.Q, "id:345");
> : : + p.add("shards.preference", "replica.type:NRT");
> : : + }))) {
> : : + count++;
> : : + Thread.sleep(100);
> : : + individualRequestStart = new Date().getTime();
> : : + }
> : : + now = new Date().getTime();
> : : + log.info(
> : : + "query retries between NRT index-ready and query-ready:
> {}; overall_duration={}; baseline_expected_overall_duration={};
> failover-request_duration={}",
> : : + count,
> : : + now - start,
> : : + pullServiceTimeMs,
> : : + now - individualRequestStart);
> : : + assertEquals(nrtCore, hostCore);
> : : + // allow any exceptions to propagate
> : : + jettyManipulationFuture.get();
> : : + if (true) return;
> : : +
> : : + // next phase: just toggle a bunch
> : : + // TODO: could separate this out into a different test
> method, but this should suffice for
> : : + // now
> : : + pullJetty.start(true);
> : : + AtomicBoolean done = new AtomicBoolean();
> : : + long runMinutes = 1;
> : : + long finishTimeMs =
> : : + new Date().getTime() +
> TimeUnit.MILLISECONDS.convert(runMinutes, TimeUnit.MINUTES);
> : : + JettySolrRunner[] jettys = new JettySolrRunner[] {nrtJettyF,
> pullJettyF};
> : : + Random threadRandom = new Random(r.nextInt());
> : : + Future<Integer> f =
> : : + executor.submit(
> : : + () -> {
> : : + int iteration = 0;
> : : + while (new Date().getTime() < finishTimeMs &&
> !done.get()) {
> : : + int idx = iteration++ % jettys.length;
> : : + JettySolrRunner toManipulate = jettys[idx];
> : : + try {
> : : + int serveTogetherTime =
> threadRandom.nextInt(7000);
> : : + int downTime = threadRandom.nextInt(7000);
> : : + log.info("serving together for {}ms",
> serveTogetherTime);
> : : + Thread.sleep(serveTogetherTime);
> : : + log.info("stopping {} ...", idx);
> : : + toManipulate.stop();
> : : + log.info("stopped {}.", idx);
> : : + Thread.sleep(downTime);
> : : + log.info("restarting {} ...", idx);
> : : + toManipulate.start(true);
> : : + log.info("restarted {}.", idx);
> : : + } catch (Exception e) {
> : : + throw new RuntimeException(e);
> : : + }
> : : + }
> : : + done.set(true);
> : : + return iteration;
> : : + });
> : : + count = 0;
> : : + start = new Date().getTime();
> : : + try {
> : : + do {
> : : + pullCore.equals(
> : : + hostCore =
> : : + getHostCoreName(
> : : + COLL,
> : : + qaJettyBase,
> : : + client,
> : : + p -> {
> : : + p.set(CommonParams.Q, "id:345");
> : : + p.add("shards.preference",
> "replica.type:NRT");
> : : + }));
> : : + count++;
> : : + Thread.sleep(100);
> : : + } while (!done.get());
> : : + } finally {
> : : + final String result;
> : : + if (done.getAndSet(true)) {
> : : + result = "Success";
> : : + } else {
> : : + // not yet set to done, completed abnormally (exception
> will be thrown beyond `finally`
> : : + // block)
> : : + result = "Failure";
> : : + }
> : : + Integer toggleCount = f.get();
> : : + long secondsDuration =
> : : + TimeUnit.SECONDS.convert(new Date().getTime() - start,
> TimeUnit.MILLISECONDS);
> : : + log.info(
> : : + "{}! {} seconds, {} toggles, {} requests served",
> : : + result,
> : : + secondsDuration,
> : : + toggleCount,
> : : + count);
> : : + }
> : : + }
> : : } finally {
> : : - System.clearProperty(NodeRoles.NODE_ROLES_PROP);
> : : + try {
> : : + ExecutorUtil.shutdownAndAwaitTermination(executor);
> : : + } finally {
> : : + cluster.shutdown();
> : : + }
> : : }
> : : - QueryResponse rslt =
> : : - new QueryRequest(new SolrQuery("*:*"))
> : : -
> .setPreferredNodes(List.of(coordinatorJetty.getNodeName()))
> : : - .process(client, COLLECTION_NAME);
> : : -
> : : - assertEquals(10, rslt.getResults().size());
> : : + }
> : :
> : : - DocCollection collection =
> : : -
> cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION);
> : : - assertNotNull(collection);
> : : + @SuppressWarnings("rawtypes")
> : : + private String getHostCoreName(
> : : + String COLL, String qaNode, HttpSolrClient solrClient,
> Consumer<SolrQuery> p)
> : : + throws Exception {
> : :
> : : - Set<String> expectedNodes = new HashSet<>();
> : : - expectedNodes.add(coordinatorJetty.getNodeName());
> : : - collection.forEachReplica((s, replica) ->
> expectedNodes.remove(replica.getNodeName()));
> : : - assertTrue(expectedNodes.isEmpty());
> : : + boolean found = false;
> : : + SolrQuery q = new SolrQuery("*:*");
> : : + q.add("fl", "id,desc_s,_core_:[core]").add(OMIT_HEADER, TRUE);
> : : + p.accept(q);
> : : + StringBuilder sb =
> : : + new
> StringBuilder(qaNode).append("/").append(COLL).append("/select?wt=javabin");
> : : + q.forEach(e ->
> sb.append("&").append(e.getKey()).append("=").append(e.getValue()[0]));
> : : + SolrDocumentList docs = null;
> : : + for (int i = 0; i < 100; i++) {
> : : + try {
> : : + SimpleOrderedMap rsp =
> : : + (SimpleOrderedMap)
> : : + Utils.executeGET(solrClient.getHttpClient(),
> sb.toString(), Utils.JAVABINCONSUMER);
> : : + docs = (SolrDocumentList) rsp.get("response");
> : : + if (docs.size() > 0) {
> : : + found = true;
> : : + break;
> : : + }
> : : + } catch (SolrException ex) {
> : : + // we know we're doing tricky things that might cause
> transient errors
> : : + // TODO: all these query requests go to the QA node -- should
> QA propagate internal request
> : : + // errors
> : : + // to the external client (and the external client retry?)
> or should QA attempt to failover
> : : + // transparently
> : : + // in the event of an error?
> : : + if (i < 5) {
> : : + log.info("swallowing transient error", ex);
> : : + } else {
> : : + log.error("only expect actual _errors_ within a small
> window (e.g. 500ms)", ex);
> : : + fail("initial error time threshold exceeded");
> : : + }
> : : + }
> : : + Thread.sleep(100);
> : : + }
> : : + assertTrue(found);
> : : + return (String) docs.get(0).getFieldValue("_core_");
> : : }
> : : }
> : :
> : :
> :
> : -Hoss
> : http://www.lucidworks.com/
> :
>
> -Hoss
> http://www.lucidworks.com/
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@solr.apache.org
> For additional commands, e-mail: dev-help@solr.apache.org
>
>
Re: [solr] branch branch_9x updated: test case added for coordinator role
Posted by Chris Hostetter <ho...@fucit.org>.
Created jia & AwaitsFix'ed the test ...
https://issues.apache.org/jira/browse/SOLR-16630
: Date: Fri, 20 Jan 2023 13:36:38 -0700 (MST)
: From: Chris Hostetter <ho...@fucit.org>
: To: dev@solr.apache.org
: Cc: "commits@solr.apache.org" <co...@solr.apache.org>
: Subject: Re: [solr] branch branch_9x updated: test case added for coordinator
: role
:
:
: Noble: TestCoordinatorRole.testNRTRestart is breaking on jenkins on 9x a
: ridiculous number of times since you added it a week ago.
:
: IIUC this test has *NEVER* passed on a jenkins 9x build (only on the main
: builds)
:
: -Hoss
:
:
: : Date: Thu, 12 Jan 2023 07:54:33 +0000
: : From: noble@apache.org
: : Reply-To: dev@solr.apache.org
: : To: "commits@solr.apache.org" <co...@solr.apache.org>
: : Subject: [solr] branch branch_9x updated: test case added for coordinator role
: :
: : This is an automated email from the ASF dual-hosted git repository.
: :
: : noble pushed a commit to branch branch_9x
: : in repository https://gitbox.apache.org/repos/asf/solr.git
: :
: :
: : The following commit(s) were added to refs/heads/branch_9x by this push:
: : new ec9b152c31f test case added for coordinator role
: : ec9b152c31f is described below
: :
: : commit ec9b152c31fac99fe190ccc98e754c1200bd9fd2
: : Author: Noble Paul <no...@gmail.com>
: : AuthorDate: Thu Jan 12 18:54:15 2023 +1100
: :
: : test case added for coordinator role
: : ---
: : .../apache/solr/search/TestCoordinatorRole.java | 412 +++++++++++++++++++--
: : 1 file changed, 375 insertions(+), 37 deletions(-)
: :
: : diff --git a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
: : index 5e2dcfb70a8..6c4e845cf5a 100644
: : --- a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
: : +++ b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
: : @@ -17,69 +17,407 @@
: :
: : package org.apache.solr.search;
: :
: : +import static org.apache.solr.common.params.CommonParams.OMIT_HEADER;
: : +import static org.apache.solr.common.params.CommonParams.TRUE;
: : +
: : +import java.lang.invoke.MethodHandles;
: : +import java.util.Date;
: : +import java.util.EnumSet;
: : import java.util.HashSet;
: : import java.util.List;
: : +import java.util.Random;
: : import java.util.Set;
: : +import java.util.concurrent.ExecutorService;
: : +import java.util.concurrent.Future;
: : +import java.util.concurrent.TimeUnit;
: : +import java.util.concurrent.atomic.AtomicBoolean;
: : +import java.util.function.Consumer;
: : import org.apache.solr.client.solrj.SolrQuery;
: : import org.apache.solr.client.solrj.impl.CloudSolrClient;
: : +import org.apache.solr.client.solrj.impl.HttpSolrClient;
: : import org.apache.solr.client.solrj.request.CollectionAdminRequest;
: : import org.apache.solr.client.solrj.request.QueryRequest;
: : import org.apache.solr.client.solrj.request.UpdateRequest;
: : import org.apache.solr.client.solrj.response.QueryResponse;
: : +import org.apache.solr.cloud.MiniSolrCloudCluster;
: : import org.apache.solr.cloud.SolrCloudTestCase;
: : +import org.apache.solr.common.SolrDocumentList;
: : +import org.apache.solr.common.SolrException;
: : import org.apache.solr.common.SolrInputDocument;
: : import org.apache.solr.common.cloud.DocCollection;
: : +import org.apache.solr.common.cloud.Replica;
: : +import org.apache.solr.common.params.CommonParams;
: : +import org.apache.solr.common.util.ExecutorUtil;
: : +import org.apache.solr.common.util.SimpleOrderedMap;
: : +import org.apache.solr.common.util.SolrNamedThreadFactory;
: : +import org.apache.solr.common.util.Utils;
: : import org.apache.solr.core.NodeRoles;
: : import org.apache.solr.embedded.JettySolrRunner;
: : import org.apache.solr.servlet.CoordinatorHttpSolrCall;
: : -import org.junit.BeforeClass;
: : +import org.slf4j.Logger;
: : +import org.slf4j.LoggerFactory;
: :
: : public class TestCoordinatorRole extends SolrCloudTestCase {
: : -
: : - @BeforeClass
: : - public static void setupCluster() throws Exception {
: : - configureCluster(4).addConfig("conf", configset("cloud-minimal")).configure();
: : - }
: : + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
: :
: : public void testSimple() throws Exception {
: : - CloudSolrClient client = cluster.getSolrClient();
: : - String COLLECTION_NAME = "test_coll";
: : - String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
: : - CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2)
: : - .process(cluster.getSolrClient());
: : - cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
: : - UpdateRequest ur = new UpdateRequest();
: : - for (int i = 0; i < 10; i++) {
: : - SolrInputDocument doc2 = new SolrInputDocument();
: : - doc2.addField("id", "" + i);
: : - ur.add(doc2);
: : - }
: : + MiniSolrCloudCluster cluster =
: : + configureCluster(4).addConfig("conf", configset("cloud-minimal")).configure();
: : + try {
: : + CloudSolrClient client = cluster.getSolrClient();
: : + String COLLECTION_NAME = "test_coll";
: : + String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
: : + CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2)
: : + .process(cluster.getSolrClient());
: : + cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
: : + UpdateRequest ur = new UpdateRequest();
: : + for (int i = 0; i < 10; i++) {
: : + SolrInputDocument doc2 = new SolrInputDocument();
: : + doc2.addField("id", "" + i);
: : + ur.add(doc2);
: : + }
: :
: : - ur.commit(client, COLLECTION_NAME);
: : - QueryResponse rsp = client.query(COLLECTION_NAME, new SolrQuery("*:*"));
: : - assertEquals(10, rsp.getResults().getNumFound());
: : + ur.commit(client, COLLECTION_NAME);
: : + QueryResponse rsp = client.query(COLLECTION_NAME, new SolrQuery("*:*"));
: : + assertEquals(10, rsp.getResults().getNumFound());
: :
: : + System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on");
: : + JettySolrRunner coordinatorJetty = null;
: : + try {
: : + coordinatorJetty = cluster.startJettySolrRunner();
: : + } finally {
: : + System.clearProperty(NodeRoles.NODE_ROLES_PROP);
: : + }
: : + QueryResponse rslt =
: : + new QueryRequest(new SolrQuery("*:*"))
: : + .setPreferredNodes(List.of(coordinatorJetty.getNodeName()))
: : + .process(client, COLLECTION_NAME);
: : +
: : + assertEquals(10, rslt.getResults().size());
: : +
: : + DocCollection collection =
: : + cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION);
: : + assertNotNull(collection);
: : +
: : + Set<String> expectedNodes = new HashSet<>();
: : + expectedNodes.add(coordinatorJetty.getNodeName());
: : + collection.forEachReplica((s, replica) -> expectedNodes.remove(replica.getNodeName()));
: : + assertTrue(expectedNodes.isEmpty());
: : + } finally {
: : + cluster.shutdown();
: : + }
: : + }
: : +
: : + public void testNRTRestart() throws Exception {
: : + // we restart jetty and expect to find on disk data - need a local fs directory
: : + useFactory(null);
: : + String COLL = "coordinator_test_coll";
: : + MiniSolrCloudCluster cluster =
: : + configureCluster(3)
: : + .withJettyConfig(jetty -> jetty.enableV2(true))
: : + .addConfig("conf", configset("conf2"))
: : + .configure();
: : System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on");
: : - JettySolrRunner coordinatorJetty = null;
: : + JettySolrRunner qaJetty = cluster.startJettySolrRunner();
: : + String qaJettyBase = qaJetty.getBaseUrl().toString();
: : + System.clearProperty(NodeRoles.NODE_ROLES_PROP);
: : + ExecutorService executor =
: : + ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrNamedThreadFactory("manipulateJetty"));
: : try {
: : - coordinatorJetty = cluster.startJettySolrRunner();
: : + CollectionAdminRequest.createCollection(COLL, "conf", 1, 1, 0, 1)
: : + .process(cluster.getSolrClient());
: : + cluster.waitForActiveCollection(COLL, 1, 2);
: : + DocCollection docColl =
: : + cluster.getSolrClient().getClusterStateProvider().getClusterState().getCollection(COLL);
: : + Replica nrtReplica = docColl.getReplicas(EnumSet.of(Replica.Type.NRT)).get(0);
: : + assertNotNull(nrtReplica);
: : + String nrtCore = nrtReplica.getCoreName();
: : + Replica pullReplica = docColl.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0);
: : + assertNotNull(pullReplica);
: : + String pullCore = pullReplica.getCoreName();
: : +
: : + SolrInputDocument sid = new SolrInputDocument();
: : + sid.addField("id", "123");
: : + sid.addField("desc_s", "A Document");
: : + JettySolrRunner nrtJetty = null;
: : + JettySolrRunner pullJetty = null;
: : + for (JettySolrRunner j : cluster.getJettySolrRunners()) {
: : + String nodeName = j.getNodeName();
: : + if (nodeName.equals(nrtReplica.getNodeName())) {
: : + nrtJetty = j;
: : + } else if (nodeName.equals(pullReplica.getNodeName())) {
: : + pullJetty = j;
: : + }
: : + }
: : + assertNotNull(nrtJetty);
: : + assertNotNull(pullJetty);
: : + try (HttpSolrClient client = (HttpSolrClient) pullJetty.newClient()) {
: : + client.add(COLL, sid);
: : + client.commit(COLL);
: : + assertEquals(
: : + nrtCore,
: : + getHostCoreName(
: : + COLL, qaJettyBase, client, p -> p.add("shards.preference", "replica.type:NRT")));
: : + assertEquals(
: : + pullCore,
: : + getHostCoreName(
: : + COLL, qaJettyBase, client, p -> p.add("shards.preference", "replica.type:PULL")));
: : + // Now , kill NRT jetty
: : + JettySolrRunner nrtJettyF = nrtJetty;
: : + JettySolrRunner pullJettyF = pullJetty;
: : + Random r = random();
: : + final long establishBaselineMs = r.nextInt(1000);
: : + final long nrtDowntimeMs = r.nextInt(10000);
: : + // NOTE: for `pullServiceTimeMs`, it can't be super-short. This is just to simplify our
: : + // indexing code,
: : + // based on the fact that our indexing is based on a PULL-node client.
: : + final long pullServiceTimeMs = 1000 + (long) r.nextInt(9000);
: : + Future<?> jettyManipulationFuture =
: : + executor.submit(
: : + () -> {
: : + // we manipulate the jetty instances in a separate thread to more closely mimic
: : + // the behavior we'd
: : + // see irl.
: : + try {
: : + Thread.sleep(establishBaselineMs);
: : + log.info("stopping NRT jetty ...");
: : + nrtJettyF.stop();
: : + log.info("NRT jetty stopped.");
: : + Thread.sleep(nrtDowntimeMs); // let NRT be down for a while
: : + log.info("restarting NRT jetty ...");
: : + nrtJettyF.start(true);
: : + log.info("NRT jetty restarted.");
: : + // once NRT is back up, we expect PULL to continue serving until the TTL on ZK
: : + // state
: : + // used for query request routing has expired (60s). But here we force a return
: : + // to NRT
: : + // by stopping the PULL replica after a brief delay ...
: : + Thread.sleep(pullServiceTimeMs);
: : + log.info("stopping PULL jetty ...");
: : + pullJettyF.stop();
: : + log.info("PULL jetty stopped.");
: : + } catch (Exception e) {
: : + throw new RuntimeException(e);
: : + }
: : + });
: : + String hostCore;
: : + long start = new Date().getTime();
: : + long individualRequestStart = start;
: : + int count = 0;
: : + while (nrtCore.equals(
: : + hostCore =
: : + getHostCoreName(
: : + COLL,
: : + qaJettyBase,
: : + client,
: : + p -> p.add("shards.preference", "replica.type:NRT")))) {
: : + count++;
: : + individualRequestStart = new Date().getTime();
: : + }
: : + long now = new Date().getTime();
: : + log.info(
: : + "phase1 NRT queries count={}, overall_duration={}, baseline_expected_overall_duration={}, switch-to-pull_duration={}",
: : + count,
: : + now - start,
: : + establishBaselineMs,
: : + now - individualRequestStart);
: : + // default tolerance of 500ms below should suffice. Failover to PULL for this case should be
: : + // very fast,
: : + // because our QA-based client already knows both replicas are active, the index is stable,
: : + // so the moment
: : + // the client finds NRT is down it should be able to failover immediately and transparently
: : + // to PULL.
: : + assertEquals(
: : + "when we break out of the NRT query loop, should be b/c routed to PULL",
: : + pullCore,
: : + hostCore);
: : + SolrInputDocument d = new SolrInputDocument();
: : + d.addField("id", "345");
: : + d.addField("desc_s", "Another Document");
: : + // attempts to add another doc while NRT is down should fail, then eventually succeed when
: : + // NRT comes back up
: : + count = 0;
: : + start = new Date().getTime();
: : + individualRequestStart = start;
: : + for (; ; ) {
: : + try {
: : + client.add(COLL, d);
: : + client.commit(COLL);
: : + break;
: : + } catch (SolrException ex) {
: : + // we expect these until nrtJetty is back up.
: : + count++;
: : + Thread.sleep(100);
: : + }
: : + individualRequestStart = new Date().getTime();
: : + }
: : + now = new Date().getTime();
: : + log.info(
: : + "successfully added another doc; duration: {}, overall_duration={}, baseline_expected_overall_duration={}, exception_count={}",
: : + now - individualRequestStart,
: : + now - start,
: : + nrtDowntimeMs,
: : + count);
: : + // NRT replica is back up, registered as available with Zk, and availability info has been
: : + // pulled down by
: : + // our PULL-replica-based `client`, forwarded indexing command to NRT, index/commit
: : + // completed. All of this
: : + // accounts for the 3000ms tolerance allowed for below. This is not a strict value, and if
: : + // it causes failures
: : + // regularly we should feel free to increase the tolerance; but it's meant to provide a
: : + // stable baseline from
: : + // which to detect regressions.
: : + count = 0;
: : + start = new Date().getTime();
: : + individualRequestStart = start;
: : + while (pullCore.equals(
: : + hostCore =
: : + getHostCoreName(
: : + COLL,
: : + qaJettyBase,
: : + client,
: : + p -> {
: : + p.set(CommonParams.Q, "id:345");
: : + p.add("shards.preference", "replica.type:NRT");
: : + }))) {
: : + count++;
: : + Thread.sleep(100);
: : + individualRequestStart = new Date().getTime();
: : + }
: : + now = new Date().getTime();
: : + log.info(
: : + "query retries between NRT index-ready and query-ready: {}; overall_duration={}; baseline_expected_overall_duration={}; failover-request_duration={}",
: : + count,
: : + now - start,
: : + pullServiceTimeMs,
: : + now - individualRequestStart);
: : + assertEquals(nrtCore, hostCore);
: : + // allow any exceptions to propagate
: : + jettyManipulationFuture.get();
: : + if (true) return;
: : +
: : + // next phase: just toggle a bunch
: : + // TODO: could separate this out into a different test method, but this should suffice for
: : + // now
: : + pullJetty.start(true);
: : + AtomicBoolean done = new AtomicBoolean();
: : + long runMinutes = 1;
: : + long finishTimeMs =
: : + new Date().getTime() + TimeUnit.MILLISECONDS.convert(runMinutes, TimeUnit.MINUTES);
: : + JettySolrRunner[] jettys = new JettySolrRunner[] {nrtJettyF, pullJettyF};
: : + Random threadRandom = new Random(r.nextInt());
: : + Future<Integer> f =
: : + executor.submit(
: : + () -> {
: : + int iteration = 0;
: : + while (new Date().getTime() < finishTimeMs && !done.get()) {
: : + int idx = iteration++ % jettys.length;
: : + JettySolrRunner toManipulate = jettys[idx];
: : + try {
: : + int serveTogetherTime = threadRandom.nextInt(7000);
: : + int downTime = threadRandom.nextInt(7000);
: : + log.info("serving together for {}ms", serveTogetherTime);
: : + Thread.sleep(serveTogetherTime);
: : + log.info("stopping {} ...", idx);
: : + toManipulate.stop();
: : + log.info("stopped {}.", idx);
: : + Thread.sleep(downTime);
: : + log.info("restarting {} ...", idx);
: : + toManipulate.start(true);
: : + log.info("restarted {}.", idx);
: : + } catch (Exception e) {
: : + throw new RuntimeException(e);
: : + }
: : + }
: : + done.set(true);
: : + return iteration;
: : + });
: : + count = 0;
: : + start = new Date().getTime();
: : + try {
: : + do {
: : + pullCore.equals(
: : + hostCore =
: : + getHostCoreName(
: : + COLL,
: : + qaJettyBase,
: : + client,
: : + p -> {
: : + p.set(CommonParams.Q, "id:345");
: : + p.add("shards.preference", "replica.type:NRT");
: : + }));
: : + count++;
: : + Thread.sleep(100);
: : + } while (!done.get());
: : + } finally {
: : + final String result;
: : + if (done.getAndSet(true)) {
: : + result = "Success";
: : + } else {
: : + // not yet set to done, completed abnormally (exception will be thrown beyond `finally`
: : + // block)
: : + result = "Failure";
: : + }
: : + Integer toggleCount = f.get();
: : + long secondsDuration =
: : + TimeUnit.SECONDS.convert(new Date().getTime() - start, TimeUnit.MILLISECONDS);
: : + log.info(
: : + "{}! {} seconds, {} toggles, {} requests served",
: : + result,
: : + secondsDuration,
: : + toggleCount,
: : + count);
: : + }
: : + }
: : } finally {
: : - System.clearProperty(NodeRoles.NODE_ROLES_PROP);
: : + try {
: : + ExecutorUtil.shutdownAndAwaitTermination(executor);
: : + } finally {
: : + cluster.shutdown();
: : + }
: : }
: : - QueryResponse rslt =
: : - new QueryRequest(new SolrQuery("*:*"))
: : - .setPreferredNodes(List.of(coordinatorJetty.getNodeName()))
: : - .process(client, COLLECTION_NAME);
: : -
: : - assertEquals(10, rslt.getResults().size());
: : + }
: :
: : - DocCollection collection =
: : - cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION);
: : - assertNotNull(collection);
: : + @SuppressWarnings("rawtypes")
: : + private String getHostCoreName(
: : + String COLL, String qaNode, HttpSolrClient solrClient, Consumer<SolrQuery> p)
: : + throws Exception {
: :
: : - Set<String> expectedNodes = new HashSet<>();
: : - expectedNodes.add(coordinatorJetty.getNodeName());
: : - collection.forEachReplica((s, replica) -> expectedNodes.remove(replica.getNodeName()));
: : - assertTrue(expectedNodes.isEmpty());
: : + boolean found = false;
: : + SolrQuery q = new SolrQuery("*:*");
: : + q.add("fl", "id,desc_s,_core_:[core]").add(OMIT_HEADER, TRUE);
: : + p.accept(q);
: : + StringBuilder sb =
: : + new StringBuilder(qaNode).append("/").append(COLL).append("/select?wt=javabin");
: : + q.forEach(e -> sb.append("&").append(e.getKey()).append("=").append(e.getValue()[0]));
: : + SolrDocumentList docs = null;
: : + for (int i = 0; i < 100; i++) {
: : + try {
: : + SimpleOrderedMap rsp =
: : + (SimpleOrderedMap)
: : + Utils.executeGET(solrClient.getHttpClient(), sb.toString(), Utils.JAVABINCONSUMER);
: : + docs = (SolrDocumentList) rsp.get("response");
: : + if (docs.size() > 0) {
: : + found = true;
: : + break;
: : + }
: : + } catch (SolrException ex) {
: : + // we know we're doing tricky things that might cause transient errors
: : + // TODO: all these query requests go to the QA node -- should QA propagate internal request
: : + // errors
: : + // to the external client (and the external client retry?) or should QA attempt to failover
: : + // transparently
: : + // in the event of an error?
: : + if (i < 5) {
: : + log.info("swallowing transient error", ex);
: : + } else {
: : + log.error("only expect actual _errors_ within a small window (e.g. 500ms)", ex);
: : + fail("initial error time threshold exceeded");
: : + }
: : + }
: : + Thread.sleep(100);
: : + }
: : + assertTrue(found);
: : + return (String) docs.get(0).getFieldValue("_core_");
: : }
: : }
: :
: :
:
: -Hoss
: http://www.lucidworks.com/
:
-Hoss
http://www.lucidworks.com/
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@solr.apache.org
For additional commands, e-mail: dev-help@solr.apache.org
Re: [solr] branch branch_9x updated: test case added for coordinator role
Posted by Chris Hostetter <ho...@fucit.org>.
Noble: TestCoordinatorRole.testNRTRestart is breaking on jenkins on 9x a
ridiculous number of times since you added it a week ago.
IIUC this test has *NEVER* passed on a jenkins 9x build (only on the main
builds)
-Hoss
: Date: Thu, 12 Jan 2023 07:54:33 +0000
: From: noble@apache.org
: Reply-To: dev@solr.apache.org
: To: "commits@solr.apache.org" <co...@solr.apache.org>
: Subject: [solr] branch branch_9x updated: test case added for coordinator role
:
: This is an automated email from the ASF dual-hosted git repository.
:
: noble pushed a commit to branch branch_9x
: in repository https://gitbox.apache.org/repos/asf/solr.git
:
:
: The following commit(s) were added to refs/heads/branch_9x by this push:
: new ec9b152c31f test case added for coordinator role
: ec9b152c31f is described below
:
: commit ec9b152c31fac99fe190ccc98e754c1200bd9fd2
: Author: Noble Paul <no...@gmail.com>
: AuthorDate: Thu Jan 12 18:54:15 2023 +1100
:
: test case added for coordinator role
: ---
: .../apache/solr/search/TestCoordinatorRole.java | 412 +++++++++++++++++++--
: 1 file changed, 375 insertions(+), 37 deletions(-)
:
: diff --git a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
: index 5e2dcfb70a8..6c4e845cf5a 100644
: --- a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
: +++ b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
: @@ -17,69 +17,407 @@
:
: package org.apache.solr.search;
:
: +import static org.apache.solr.common.params.CommonParams.OMIT_HEADER;
: +import static org.apache.solr.common.params.CommonParams.TRUE;
: +
: +import java.lang.invoke.MethodHandles;
: +import java.util.Date;
: +import java.util.EnumSet;
: import java.util.HashSet;
: import java.util.List;
: +import java.util.Random;
: import java.util.Set;
: +import java.util.concurrent.ExecutorService;
: +import java.util.concurrent.Future;
: +import java.util.concurrent.TimeUnit;
: +import java.util.concurrent.atomic.AtomicBoolean;
: +import java.util.function.Consumer;
: import org.apache.solr.client.solrj.SolrQuery;
: import org.apache.solr.client.solrj.impl.CloudSolrClient;
: +import org.apache.solr.client.solrj.impl.HttpSolrClient;
: import org.apache.solr.client.solrj.request.CollectionAdminRequest;
: import org.apache.solr.client.solrj.request.QueryRequest;
: import org.apache.solr.client.solrj.request.UpdateRequest;
: import org.apache.solr.client.solrj.response.QueryResponse;
: +import org.apache.solr.cloud.MiniSolrCloudCluster;
: import org.apache.solr.cloud.SolrCloudTestCase;
: +import org.apache.solr.common.SolrDocumentList;
: +import org.apache.solr.common.SolrException;
: import org.apache.solr.common.SolrInputDocument;
: import org.apache.solr.common.cloud.DocCollection;
: +import org.apache.solr.common.cloud.Replica;
: +import org.apache.solr.common.params.CommonParams;
: +import org.apache.solr.common.util.ExecutorUtil;
: +import org.apache.solr.common.util.SimpleOrderedMap;
: +import org.apache.solr.common.util.SolrNamedThreadFactory;
: +import org.apache.solr.common.util.Utils;
: import org.apache.solr.core.NodeRoles;
: import org.apache.solr.embedded.JettySolrRunner;
: import org.apache.solr.servlet.CoordinatorHttpSolrCall;
: -import org.junit.BeforeClass;
: +import org.slf4j.Logger;
: +import org.slf4j.LoggerFactory;
:
: public class TestCoordinatorRole extends SolrCloudTestCase {
: -
: - @BeforeClass
: - public static void setupCluster() throws Exception {
: - configureCluster(4).addConfig("conf", configset("cloud-minimal")).configure();
: - }
: + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
:
: public void testSimple() throws Exception {
: - CloudSolrClient client = cluster.getSolrClient();
: - String COLLECTION_NAME = "test_coll";
: - String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
: - CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2)
: - .process(cluster.getSolrClient());
: - cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
: - UpdateRequest ur = new UpdateRequest();
: - for (int i = 0; i < 10; i++) {
: - SolrInputDocument doc2 = new SolrInputDocument();
: - doc2.addField("id", "" + i);
: - ur.add(doc2);
: - }
: + MiniSolrCloudCluster cluster =
: + configureCluster(4).addConfig("conf", configset("cloud-minimal")).configure();
: + try {
: + CloudSolrClient client = cluster.getSolrClient();
: + String COLLECTION_NAME = "test_coll";
: + String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
: + CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2)
: + .process(cluster.getSolrClient());
: + cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
: + UpdateRequest ur = new UpdateRequest();
: + for (int i = 0; i < 10; i++) {
: + SolrInputDocument doc2 = new SolrInputDocument();
: + doc2.addField("id", "" + i);
: + ur.add(doc2);
: + }
:
: - ur.commit(client, COLLECTION_NAME);
: - QueryResponse rsp = client.query(COLLECTION_NAME, new SolrQuery("*:*"));
: - assertEquals(10, rsp.getResults().getNumFound());
: + ur.commit(client, COLLECTION_NAME);
: + QueryResponse rsp = client.query(COLLECTION_NAME, new SolrQuery("*:*"));
: + assertEquals(10, rsp.getResults().getNumFound());
:
: + System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on");
: + JettySolrRunner coordinatorJetty = null;
: + try {
: + coordinatorJetty = cluster.startJettySolrRunner();
: + } finally {
: + System.clearProperty(NodeRoles.NODE_ROLES_PROP);
: + }
: + QueryResponse rslt =
: + new QueryRequest(new SolrQuery("*:*"))
: + .setPreferredNodes(List.of(coordinatorJetty.getNodeName()))
: + .process(client, COLLECTION_NAME);
: +
: + assertEquals(10, rslt.getResults().size());
: +
: + DocCollection collection =
: + cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION);
: + assertNotNull(collection);
: +
: + Set<String> expectedNodes = new HashSet<>();
: + expectedNodes.add(coordinatorJetty.getNodeName());
: + collection.forEachReplica((s, replica) -> expectedNodes.remove(replica.getNodeName()));
: + assertTrue(expectedNodes.isEmpty());
: + } finally {
: + cluster.shutdown();
: + }
: + }
: +
: + public void testNRTRestart() throws Exception {
: + // we restart jetty and expect to find on disk data - need a local fs directory
: + useFactory(null);
: + String COLL = "coordinator_test_coll";
: + MiniSolrCloudCluster cluster =
: + configureCluster(3)
: + .withJettyConfig(jetty -> jetty.enableV2(true))
: + .addConfig("conf", configset("conf2"))
: + .configure();
: System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on");
: - JettySolrRunner coordinatorJetty = null;
: + JettySolrRunner qaJetty = cluster.startJettySolrRunner();
: + String qaJettyBase = qaJetty.getBaseUrl().toString();
: + System.clearProperty(NodeRoles.NODE_ROLES_PROP);
: + ExecutorService executor =
: + ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrNamedThreadFactory("manipulateJetty"));
: try {
: - coordinatorJetty = cluster.startJettySolrRunner();
: + CollectionAdminRequest.createCollection(COLL, "conf", 1, 1, 0, 1)
: + .process(cluster.getSolrClient());
: + cluster.waitForActiveCollection(COLL, 1, 2);
: + DocCollection docColl =
: + cluster.getSolrClient().getClusterStateProvider().getClusterState().getCollection(COLL);
: + Replica nrtReplica = docColl.getReplicas(EnumSet.of(Replica.Type.NRT)).get(0);
: + assertNotNull(nrtReplica);
: + String nrtCore = nrtReplica.getCoreName();
: + Replica pullReplica = docColl.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0);
: + assertNotNull(pullReplica);
: + String pullCore = pullReplica.getCoreName();
: +
: + SolrInputDocument sid = new SolrInputDocument();
: + sid.addField("id", "123");
: + sid.addField("desc_s", "A Document");
: + JettySolrRunner nrtJetty = null;
: + JettySolrRunner pullJetty = null;
: + for (JettySolrRunner j : cluster.getJettySolrRunners()) {
: + String nodeName = j.getNodeName();
: + if (nodeName.equals(nrtReplica.getNodeName())) {
: + nrtJetty = j;
: + } else if (nodeName.equals(pullReplica.getNodeName())) {
: + pullJetty = j;
: + }
: + }
: + assertNotNull(nrtJetty);
: + assertNotNull(pullJetty);
: + try (HttpSolrClient client = (HttpSolrClient) pullJetty.newClient()) {
: + client.add(COLL, sid);
: + client.commit(COLL);
: + assertEquals(
: + nrtCore,
: + getHostCoreName(
: + COLL, qaJettyBase, client, p -> p.add("shards.preference", "replica.type:NRT")));
: + assertEquals(
: + pullCore,
: + getHostCoreName(
: + COLL, qaJettyBase, client, p -> p.add("shards.preference", "replica.type:PULL")));
: + // Now , kill NRT jetty
: + JettySolrRunner nrtJettyF = nrtJetty;
: + JettySolrRunner pullJettyF = pullJetty;
: + Random r = random();
: + final long establishBaselineMs = r.nextInt(1000);
: + final long nrtDowntimeMs = r.nextInt(10000);
: + // NOTE: for `pullServiceTimeMs`, it can't be super-short. This is just to simplify our
: + // indexing code,
: + // based on the fact that our indexing is based on a PULL-node client.
: + final long pullServiceTimeMs = 1000 + (long) r.nextInt(9000);
: + Future<?> jettyManipulationFuture =
: + executor.submit(
: + () -> {
: + // we manipulate the jetty instances in a separate thread to more closely mimic
: + // the behavior we'd
: + // see irl.
: + try {
: + Thread.sleep(establishBaselineMs);
: + log.info("stopping NRT jetty ...");
: + nrtJettyF.stop();
: + log.info("NRT jetty stopped.");
: + Thread.sleep(nrtDowntimeMs); // let NRT be down for a while
: + log.info("restarting NRT jetty ...");
: + nrtJettyF.start(true);
: + log.info("NRT jetty restarted.");
: + // once NRT is back up, we expect PULL to continue serving until the TTL on ZK
: + // state
: + // used for query request routing has expired (60s). But here we force a return
: + // to NRT
: + // by stopping the PULL replica after a brief delay ...
: + Thread.sleep(pullServiceTimeMs);
: + log.info("stopping PULL jetty ...");
: + pullJettyF.stop();
: + log.info("PULL jetty stopped.");
: + } catch (Exception e) {
: + throw new RuntimeException(e);
: + }
: + });
: + String hostCore;
: + long start = new Date().getTime();
: + long individualRequestStart = start;
: + int count = 0;
: + while (nrtCore.equals(
: + hostCore =
: + getHostCoreName(
: + COLL,
: + qaJettyBase,
: + client,
: + p -> p.add("shards.preference", "replica.type:NRT")))) {
: + count++;
: + individualRequestStart = new Date().getTime();
: + }
: + long now = new Date().getTime();
: + log.info(
: + "phase1 NRT queries count={}, overall_duration={}, baseline_expected_overall_duration={}, switch-to-pull_duration={}",
: + count,
: + now - start,
: + establishBaselineMs,
: + now - individualRequestStart);
: + // default tolerance of 500ms below should suffice. Failover to PULL for this case should be
: + // very fast,
: + // because our QA-based client already knows both replicas are active, the index is stable,
: + // so the moment
: + // the client finds NRT is down it should be able to failover immediately and transparently
: + // to PULL.
: + assertEquals(
: + "when we break out of the NRT query loop, should be b/c routed to PULL",
: + pullCore,
: + hostCore);
: + SolrInputDocument d = new SolrInputDocument();
: + d.addField("id", "345");
: + d.addField("desc_s", "Another Document");
: + // attempts to add another doc while NRT is down should fail, then eventually succeed when
: + // NRT comes back up
: + count = 0;
: + start = new Date().getTime();
: + individualRequestStart = start;
: + for (; ; ) {
: + try {
: + client.add(COLL, d);
: + client.commit(COLL);
: + break;
: + } catch (SolrException ex) {
: + // we expect these until nrtJetty is back up.
: + count++;
: + Thread.sleep(100);
: + }
: + individualRequestStart = new Date().getTime();
: + }
: + now = new Date().getTime();
: + log.info(
: + "successfully added another doc; duration: {}, overall_duration={}, baseline_expected_overall_duration={}, exception_count={}",
: + now - individualRequestStart,
: + now - start,
: + nrtDowntimeMs,
: + count);
: + // NRT replica is back up, registered as available with Zk, and availability info has been
: + // pulled down by
: + // our PULL-replica-based `client`, forwarded indexing command to NRT, index/commit
: + // completed. All of this
: + // accounts for the 3000ms tolerance allowed for below. This is not a strict value, and if
: + // it causes failures
: + // regularly we should feel free to increase the tolerance; but it's meant to provide a
: + // stable baseline from
: + // which to detect regressions.
: + count = 0;
: + start = new Date().getTime();
: + individualRequestStart = start;
: + while (pullCore.equals(
: + hostCore =
: + getHostCoreName(
: + COLL,
: + qaJettyBase,
: + client,
: + p -> {
: + p.set(CommonParams.Q, "id:345");
: + p.add("shards.preference", "replica.type:NRT");
: + }))) {
: + count++;
: + Thread.sleep(100);
: + individualRequestStart = new Date().getTime();
: + }
: + now = new Date().getTime();
: + log.info(
: + "query retries between NRT index-ready and query-ready: {}; overall_duration={}; baseline_expected_overall_duration={}; failover-request_duration={}",
: + count,
: + now - start,
: + pullServiceTimeMs,
: + now - individualRequestStart);
: + assertEquals(nrtCore, hostCore);
: + // allow any exceptions to propagate
: + jettyManipulationFuture.get();
: + if (true) return;
: +
: + // next phase: just toggle a bunch
: + // TODO: could separate this out into a different test method, but this should suffice for
: + // now
: + pullJetty.start(true);
: + AtomicBoolean done = new AtomicBoolean();
: + long runMinutes = 1;
: + long finishTimeMs =
: + new Date().getTime() + TimeUnit.MILLISECONDS.convert(runMinutes, TimeUnit.MINUTES);
: + JettySolrRunner[] jettys = new JettySolrRunner[] {nrtJettyF, pullJettyF};
: + Random threadRandom = new Random(r.nextInt());
: + Future<Integer> f =
: + executor.submit(
: + () -> {
: + int iteration = 0;
: + while (new Date().getTime() < finishTimeMs && !done.get()) {
: + int idx = iteration++ % jettys.length;
: + JettySolrRunner toManipulate = jettys[idx];
: + try {
: + int serveTogetherTime = threadRandom.nextInt(7000);
: + int downTime = threadRandom.nextInt(7000);
: + log.info("serving together for {}ms", serveTogetherTime);
: + Thread.sleep(serveTogetherTime);
: + log.info("stopping {} ...", idx);
: + toManipulate.stop();
: + log.info("stopped {}.", idx);
: + Thread.sleep(downTime);
: + log.info("restarting {} ...", idx);
: + toManipulate.start(true);
: + log.info("restarted {}.", idx);
: + } catch (Exception e) {
: + throw new RuntimeException(e);
: + }
: + }
: + done.set(true);
: + return iteration;
: + });
: + count = 0;
: + start = new Date().getTime();
: + try {
: + do {
: + pullCore.equals(
: + hostCore =
: + getHostCoreName(
: + COLL,
: + qaJettyBase,
: + client,
: + p -> {
: + p.set(CommonParams.Q, "id:345");
: + p.add("shards.preference", "replica.type:NRT");
: + }));
: + count++;
: + Thread.sleep(100);
: + } while (!done.get());
: + } finally {
: + final String result;
: + if (done.getAndSet(true)) {
: + result = "Success";
: + } else {
: + // not yet set to done, completed abnormally (exception will be thrown beyond `finally`
: + // block)
: + result = "Failure";
: + }
: + Integer toggleCount = f.get();
: + long secondsDuration =
: + TimeUnit.SECONDS.convert(new Date().getTime() - start, TimeUnit.MILLISECONDS);
: + log.info(
: + "{}! {} seconds, {} toggles, {} requests served",
: + result,
: + secondsDuration,
: + toggleCount,
: + count);
: + }
: + }
: } finally {
: - System.clearProperty(NodeRoles.NODE_ROLES_PROP);
: + try {
: + ExecutorUtil.shutdownAndAwaitTermination(executor);
: + } finally {
: + cluster.shutdown();
: + }
: }
: - QueryResponse rslt =
: - new QueryRequest(new SolrQuery("*:*"))
: - .setPreferredNodes(List.of(coordinatorJetty.getNodeName()))
: - .process(client, COLLECTION_NAME);
: -
: - assertEquals(10, rslt.getResults().size());
: + }
:
: - DocCollection collection =
: - cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION);
: - assertNotNull(collection);
: + @SuppressWarnings("rawtypes")
: + private String getHostCoreName(
: + String COLL, String qaNode, HttpSolrClient solrClient, Consumer<SolrQuery> p)
: + throws Exception {
:
: - Set<String> expectedNodes = new HashSet<>();
: - expectedNodes.add(coordinatorJetty.getNodeName());
: - collection.forEachReplica((s, replica) -> expectedNodes.remove(replica.getNodeName()));
: - assertTrue(expectedNodes.isEmpty());
: + boolean found = false;
: + SolrQuery q = new SolrQuery("*:*");
: + q.add("fl", "id,desc_s,_core_:[core]").add(OMIT_HEADER, TRUE);
: + p.accept(q);
: + StringBuilder sb =
: + new StringBuilder(qaNode).append("/").append(COLL).append("/select?wt=javabin");
: + q.forEach(e -> sb.append("&").append(e.getKey()).append("=").append(e.getValue()[0]));
: + SolrDocumentList docs = null;
: + for (int i = 0; i < 100; i++) {
: + try {
: + SimpleOrderedMap rsp =
: + (SimpleOrderedMap)
: + Utils.executeGET(solrClient.getHttpClient(), sb.toString(), Utils.JAVABINCONSUMER);
: + docs = (SolrDocumentList) rsp.get("response");
: + if (docs.size() > 0) {
: + found = true;
: + break;
: + }
: + } catch (SolrException ex) {
: + // we know we're doing tricky things that might cause transient errors
: + // TODO: all these query requests go to the QA node -- should QA propagate internal request
: + // errors
: + // to the external client (and the external client retry?) or should QA attempt to failover
: + // transparently
: + // in the event of an error?
: + if (i < 5) {
: + log.info("swallowing transient error", ex);
: + } else {
: + log.error("only expect actual _errors_ within a small window (e.g. 500ms)", ex);
: + fail("initial error time threshold exceeded");
: + }
: + }
: + Thread.sleep(100);
: + }
: + assertTrue(found);
: + return (String) docs.get(0).getFieldValue("_core_");
: }
: }
:
:
-Hoss
http://www.lucidworks.com/
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@solr.apache.org
For additional commands, e-mail: dev-help@solr.apache.org