You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ho...@apache.org on 2019/12/16 22:58:15 UTC
[lucene-solr] branch master updated: SOLR-14081: re-implement
FullSolrCloudDistribCmdsTest to extend SolrCloudTestCase
This is an automated email from the ASF dual-hosted git repository.
hossman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/master by this push:
new db11e9e SOLR-14081: re-implement FullSolrCloudDistribCmdsTest to extend SolrCloudTestCase
db11e9e is described below
commit db11e9e9a2c07136399ba002f2bbefe8c611b0a0
Author: Chris Hostetter <ho...@apache.org>
AuthorDate: Mon Dec 16 15:58:06 2019 -0700
SOLR-14081: re-implement FullSolrCloudDistribCmdsTest to extend SolrCloudTestCase
---
.../solr/cloud/FullSolrCloudDistribCmdsTest.java | 946 ++++++++-------------
1 file changed, 340 insertions(+), 606 deletions(-)
diff --git a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
index d6cbf9b..4bd4b52 100644
--- a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
@@ -16,497 +16,274 @@
*/
package org.apache.solr.cloud;
-import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.lucene.util.TestUtil;
import org.apache.lucene.util.LuceneTestCase.Slow;
-import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
-import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.SocketProxy;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.response.CollectionAdminResponse;
-import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.common.SolrDocument;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CollectionParams.CollectionAction;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.Utils;
-import org.apache.zookeeper.CreateMode;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
-import static org.apache.solr.common.params.CommonParams.VERSION_FIELD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Super basic testing, no shard restarting or anything.
*/
@Slow
-@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
-public class FullSolrCloudDistribCmdsTest extends AbstractFullDistribZkTestBase {
-
+public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final AtomicInteger NAME_COUNTER = new AtomicInteger(1);
+
@BeforeClass
- public static void beforeSuperClass() {
- schemaString = "schema15.xml"; // we need a string id
+ public static void setupCluster() throws Exception {
+ // use a 5 node cluster so with a typical 2x2 collection one node isn't involved
+ // helps to randomly test edge cases of hitting a node not involved in collection
+ configureCluster(5).configure();
}
-
- public FullSolrCloudDistribCmdsTest() {
- super();
- sliceCount = 3;
+
+ @After
+ public void purgeAllCollections() throws Exception {
+ cluster.deleteAllCollections();
+ cluster.getSolrClient().setDefaultCollection(null);
}
+ /**
+ * Creates a new 2x2 collection using a unique name, blocking until it's state is fully active,
+ * and sets that collection as the default on the cluster's default CloudSolrClient.
+ *
+ * @return the name of the new collection
+ */
+ public static String createAndSetNewDefaultCollection() throws Exception {
+ final CloudSolrClient cloudClient = cluster.getSolrClient();
+ final String name = "test_collection_" + NAME_COUNTER.getAndIncrement();
+ assertEquals(RequestStatusState.COMPLETED,
+ CollectionAdminRequest.createCollection(name, "_default", 2, 2)
+ .processAndWait(cloudClient, DEFAULT_TIMEOUT));
+ cloudClient.waitForState(name, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
+ (n, c) -> DocCollection.isFullyActive(n, c, 2, 2));
+ cloudClient.setDefaultCollection(name);
+ return name;
+ }
+
@Test
- @ShardsFixed(num = 6)
- // commented 15-Sep-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 2-Aug-2018
- @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // annotated on: 24-Dec-2018
- public void test() throws Exception {
- handle.clear();
- handle.put("timestamp", SKIPVAL);
-
- waitForRecoveriesToFinish(false);
+ public void testBasicUpdates() throws Exception {
+ final CloudSolrClient cloudClient = cluster.getSolrClient();
+ final String collectionName = createAndSetNewDefaultCollection();
// add a doc, update it, and delete it
+ addUpdateDelete("doc1");
+ assertEquals(0, cloudClient.query(params("q","*:*")).getResults().getNumFound());
- QueryResponse results;
- UpdateRequest uReq;
- long docId = addUpdateDelete();
-
- // add 2 docs in a request
- SolrInputDocument doc1;
- SolrInputDocument doc2;
- docId = addTwoDocsInOneRequest(docId);
-
- // two deletes
- uReq = new UpdateRequest();
- uReq.deleteById(Long.toString(docId-1));
- uReq.deleteById(Long.toString(docId-2)).process(cloudClient);
- controlClient.deleteById(Long.toString(docId-1));
- controlClient.deleteById(Long.toString(docId-2));
-
- commit();
-
- results = query(cloudClient);
- assertEquals(0, results.getResults().getNumFound());
-
- results = query(controlClient);
- assertEquals(0, results.getResults().getNumFound());
-
- // add two docs together, a 3rd doc and a delete
- indexr("id", docId++, t1, "originalcontent");
-
- uReq = new UpdateRequest();
- doc1 = new SolrInputDocument();
-
- addFields(doc1, "id", docId++);
- uReq.add(doc1);
- doc2 = new SolrInputDocument();
- addFields(doc2, "id", docId++);
- uReq.add(doc2);
-
- uReq.process(cloudClient);
- uReq.process(controlClient);
-
- uReq = new UpdateRequest();
- uReq.deleteById(Long.toString(docId - 2)).process(cloudClient);
- controlClient.deleteById(Long.toString(docId - 2));
-
- commit();
-
- assertDocCounts(VERBOSE);
-
- checkShardConsistency();
-
- results = query(controlClient);
- assertEquals(2, results.getResults().getNumFound());
-
- results = query(cloudClient);
- assertEquals(2, results.getResults().getNumFound());
-
- docId = testIndexQueryDeleteHierarchical(docId);
-
- docId = testIndexingDocPerRequestWithHttpSolrClient(docId);
-
- testConcurrentIndexing(docId);
+ // add 2 docs in a single request
+ addTwoDocsInOneRequest("doc2", "doc3");
+ assertEquals(2, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+
+ // 2 deletes in a single request...
+ assertEquals(0, (new UpdateRequest().deleteById("doc2").deleteById("doc3"))
+ .process(cloudClient).getStatus());
+ assertEquals(0, cloudClient.commit().getStatus());
- // TODO: testOptimisticUpdate(results);
+ assertEquals(0, cloudClient.query(params("q","*:*")).getResults().getNumFound());
- testDeleteByQueryDistrib();
-
- // See SOLR-7384
-// testDeleteByIdImplicitRouter();
-//
-// testDeleteByIdCompositeRouterWithRouterField();
+ // add a doc that we will then delete later after adding two other docs (all before next commit).
+ assertEquals(0, cloudClient.add(sdoc("id", "doc4", "content_s", "will_delete_later")).getStatus());
+ assertEquals(0, cloudClient.add(sdocs(sdoc("id", "doc5"),
+ sdoc("id", "doc6"))).getStatus());
+ assertEquals(0, cloudClient.deleteById("doc4").getStatus());
+ assertEquals(0, cloudClient.commit().getStatus());
- docId = testThatCantForwardToLeaderFails(docId);
-
-
- docId = testIndexingBatchPerRequestWithHttpSolrClient(docId);
- }
+ assertEquals(0, cloudClient.query(params("q", "id:doc4")).getResults().getNumFound());
+ assertEquals(1, cloudClient.query(params("q", "id:doc5")).getResults().getNumFound());
+ assertEquals(1, cloudClient.query(params("q", "id:doc6")).getResults().getNumFound());
+ assertEquals(2, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+
+ checkShardConsistency(params("q","*:*", "rows", "9999","_trace","post_doc_5_6"));
- private void testDeleteByIdImplicitRouter() throws Exception {
- SolrClient server = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)));
- CollectionAdminResponse response;
- Map<String, NamedList<Integer>> coresStatus;
-
- CollectionAdminRequest.Create createCollectionRequest
- = CollectionAdminRequest.createCollectionWithImplicitRouter("implicit_collection_without_routerfield",
- "conf1","shard1,shard2",2);
- response = createCollectionRequest.process(server);
-
- assertEquals(0, response.getStatus());
- assertTrue(response.isSuccess());
- coresStatus = response.getCollectionCoresStatus();
- assertEquals(4, coresStatus.size());
- for (int i = 0; i < 4; i++) {
- NamedList<Integer> status = coresStatus.get("implicit_collection_without_routerfield_shard" + (i / 2 + 1) + "_replica" + (i % 2 + 1));
- assertEquals(0, (int) status.get("status"));
- assertTrue(status.get("QTime") > 0);
- }
+ // delete everything....
+ assertEquals(0, cloudClient.deleteByQuery("*:*").getStatus());
+ assertEquals(0, cloudClient.commit().getStatus());
+ assertEquals(0, cloudClient.query(params("q","*:*")).getResults().getNumFound());
- waitForRecoveriesToFinish("implicit_collection_without_routerfield", true);
-
- SolrClient shard1 = createNewSolrClient("implicit_collection_without_routerfield_shard1_replica1",
- getBaseUrl((HttpSolrClient) clients.get(0)));
- SolrClient shard2 = createNewSolrClient("implicit_collection_without_routerfield_shard2_replica1",
- getBaseUrl((HttpSolrClient) clients.get(0)));
-
- SolrInputDocument doc = new SolrInputDocument();
- int docCounts1, docCounts2;
-
- // Add three documents to shard1
- doc.clear();
- doc.addField("id", "1");
- doc.addField("title", "s1 one");
- shard1.add(doc);
- shard1.commit();
-
- doc.clear();
- doc.addField("id", "2");
- doc.addField("title", "s1 two");
- shard1.add(doc);
- shard1.commit();
-
- doc.clear();
- doc.addField("id", "3");
- doc.addField("title", "s1 three");
- shard1.add(doc);
- shard1.commit();
-
- docCounts1 = 3; // Three documents in shard1
-
- // Add two documents to shard2
- doc.clear();
- doc.addField("id", "4");
- doc.addField("title", "s2 four");
- shard2.add(doc);
- shard2.commit();
-
- doc.clear();
- doc.addField("id", "5");
- doc.addField("title", "s2 five");
- shard2.add(doc);
- shard2.commit();
-
- docCounts2 = 2; // Two documents in shard2
-
- // Verify the documents were added to correct shards
- ModifiableSolrParams query = new ModifiableSolrParams();
- query.set("q", "*:*");
- QueryResponse respAll = shard1.query(query);
- assertEquals(docCounts1 + docCounts2, respAll.getResults().getNumFound());
-
- query.set("shards", "shard1");
- QueryResponse resp1 = shard1.query(query);
- assertEquals(docCounts1, resp1.getResults().getNumFound());
-
- query.set("shards", "shard2");
- QueryResponse resp2 = shard2.query(query);
- assertEquals(docCounts2, resp2.getResults().getNumFound());
-
-
- // Delete a document in shard2 with update to shard1, with _route_ param
- // Should delete.
- UpdateRequest deleteRequest = new UpdateRequest();
- deleteRequest.deleteById("4", "shard2");
- shard1.request(deleteRequest);
- shard1.commit();
- query.set("shards", "shard2");
- resp2 = shard2.query(query);
- assertEquals(--docCounts2, resp2.getResults().getNumFound());
-
- // Delete a document in shard2 with update to shard1, without _route_ param
- // Shouldn't delete, since deleteById requests are not broadcast to all shard leaders.
- deleteRequest = new UpdateRequest();
- deleteRequest.deleteById("5");
- shard1.request(deleteRequest);
- shard1.commit();
- query.set("shards", "shard2");
- resp2 = shard2.query(query);
- assertEquals(docCounts2, resp2.getResults().getNumFound());
-
- // Multiple deleteById commands in a single request
- deleteRequest.clear();
- deleteRequest.deleteById("2", "shard1");
- deleteRequest.deleteById("3", "shard1");
- deleteRequest.setCommitWithin(1);
- query.set("shards", "shard1");
- shard2.request(deleteRequest);
- resp1 = shard1.query(query);
- --docCounts1;
- --docCounts1;
- assertEquals(docCounts1, resp1.getResults().getNumFound());
-
- // Test commitWithin, update to shard2, document deleted in shard1
- deleteRequest.clear();
- deleteRequest.deleteById("1", "shard1");
- deleteRequest.setCommitWithin(1);
- shard2.request(deleteRequest);
- Thread.sleep(1000);
- query.set("shards", "shard1");
- resp1 = shard1.query(query);
- assertEquals(--docCounts1, resp1.getResults().getNumFound());
+ checkShardConsistency(params("q","*:*", "rows", "9999","_trace","delAll"));
+
}
- private void testDeleteByIdCompositeRouterWithRouterField() throws Exception {
- SolrClient server = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)));
- CollectionAdminResponse response;
- Map<String, NamedList<Integer>> coresStatus;
-
- response = CollectionAdminRequest.createCollection("compositeid_collection_with_routerfield","conf1",2,2)
- .setRouterName("compositeId")
- .setRouterField("routefield_s")
- .setShards("shard1,shard2")
- .process(server);
-
- assertEquals(0, response.getStatus());
- assertTrue(response.isSuccess());
- coresStatus = response.getCollectionCoresStatus();
- assertEquals(4, coresStatus.size());
- for (int i = 0; i < 4; i++) {
- NamedList<Integer> status = coresStatus.get("compositeid_collection_with_routerfield_shard" + (i / 2 + 1) + "_replica" + (i % 2 + 1));
- assertEquals(0, (int) status.get("status"));
- assertTrue(status.get("QTime") > 0);
- }
-
- waitForRecoveriesToFinish("compositeid_collection_with_routerfield", true);
-
- SolrClient shard1 = createNewSolrClient("compositeid_collection_with_routerfield_shard1_replica1",
- getBaseUrl((HttpSolrClient) clients.get(0)));
- SolrClient shard2 = createNewSolrClient("compositeid_collection_with_routerfield_shard2_replica1",
- getBaseUrl((HttpSolrClient) clients.get(0)));
-
- SolrInputDocument doc = new SolrInputDocument();
- int docCounts1 = 0, docCounts2 = 0;
-
- // Add three documents to shard1
- doc.clear();
- doc.addField("id", "1");
- doc.addField("title", "s1 one");
- doc.addField("routefield_s", "europe");
- shard1.add(doc);
- shard1.commit();
-
- doc.clear();
- doc.addField("id", "2");
- doc.addField("title", "s1 two");
- doc.addField("routefield_s", "europe");
- shard1.add(doc);
- shard1.commit();
-
- doc.clear();
- doc.addField("id", "3");
- doc.addField("title", "s1 three");
- doc.addField("routefield_s", "europe");
- shard1.add(doc);
- shard1.commit();
-
- docCounts1 = 3; // Three documents in shard1
-
- // Add two documents to shard2
- doc.clear();
- doc.addField("id", "4");
- doc.addField("title", "s2 four");
- doc.addField("routefield_s", "africa");
- shard2.add(doc);
- //shard2.commit();
-
- doc.clear();
- doc.addField("id", "5");
- doc.addField("title", "s2 five");
- doc.addField("routefield_s", "africa");
- shard2.add(doc);
- shard2.commit();
-
- docCounts2 = 2; // Two documents in shard2
-
- // Verify the documents were added to correct shards
- ModifiableSolrParams query = new ModifiableSolrParams();
- query.set("q", "*:*");
- QueryResponse respAll = shard1.query(query);
- assertEquals(docCounts1 + docCounts2, respAll.getResults().getNumFound());
-
- query.set("shards", "shard1");
- QueryResponse resp1 = shard1.query(query);
- assertEquals(docCounts1, resp1.getResults().getNumFound());
-
- query.set("shards", "shard2");
- QueryResponse resp2 = shard2.query(query);
- assertEquals(docCounts2, resp2.getResults().getNumFound());
-
- // Delete a document in shard2 with update to shard1, with _route_ param
- // Should delete.
- UpdateRequest deleteRequest = new UpdateRequest();
- deleteRequest.deleteById("4", "africa");
- deleteRequest.setCommitWithin(1);
- shard1.request(deleteRequest);
- shard1.commit();
-
- query.set("shards", "shard2");
- resp2 = shard2.query(query);
- --docCounts2;
- assertEquals(docCounts2, resp2.getResults().getNumFound());
-
- // Multiple deleteById commands in a single request
- deleteRequest.clear();
- deleteRequest.deleteById("2", "europe");
- deleteRequest.deleteById("3", "europe");
- deleteRequest.setCommitWithin(1);
- query.set("shards", "shard1");
- shard1.request(deleteRequest);
- shard1.commit();
- Thread.sleep(1000);
- resp1 = shard1.query(query);
- --docCounts1;
- --docCounts1;
- assertEquals(docCounts1, resp1.getResults().getNumFound());
-
- // Test commitWithin, update to shard2, document deleted in shard1
- deleteRequest.clear();
- deleteRequest.deleteById("1", "europe");
- deleteRequest.setCommitWithin(1);
- shard2.request(deleteRequest);
- query.set("shards", "shard1");
- resp1 = shard1.query(query);
- --docCounts1;
- assertEquals(docCounts1, resp1.getResults().getNumFound());
- }
- private long testThatCantForwardToLeaderFails(long docId) throws Exception {
- ZkStateReader zkStateReader = cloudClient.getZkStateReader();
- ZkNodeProps props = zkStateReader.getLeaderRetry(DEFAULT_COLLECTION, "shard1");
-
- chaosMonkey.stopShard("shard1");
-
- Thread.sleep(1000);
+ public void testThatCantForwardToLeaderFails() throws Exception {
+ final CloudSolrClient cloudClient = cluster.getSolrClient();
+ final String collectionName = "test_collection_" + NAME_COUNTER.getAndIncrement();
+ cloudClient.setDefaultCollection(collectionName);
- // fake that the leader is still advertised
- String leaderPath = ZkStateReader.getShardLeadersPath(DEFAULT_COLLECTION, "shard1");
- SolrZkClient zkClient = new SolrZkClient(zkServer.getZkAddress(), 10000);
- int fails = 0;
+ // get a random node for use in our collection before creating the one we'll partition..
+ final JettySolrRunner otherLeader = cluster.getRandomJetty(random());
+ // pick a (second) random node (which may be the same) for sending updates to
+ // (if it's the same, we're testing routing from another shard, if diff we're testing routing
+ // from a non-collection node)
+ final String indexingUrl = cluster.getRandomJetty(random()).getProxyBaseUrl() + "/" + collectionName;
+
+ // create a new node for the purpose of killing it...
+ final JettySolrRunner leaderToPartition = cluster.startJettySolrRunner();
try {
- zkClient.makePath(leaderPath, Utils.toJSON(props),
- CreateMode.EPHEMERAL, true);
- for (int i = 0; i < 200; i++) {
- try {
- index_specific(shardToJetty.get("shard2").get(0).client.solrClient, id, docId++);
- } catch (SolrException e) {
- // expected
- fails++;
- break;
- } catch (SolrServerException e) {
- // expected
- fails++;
- break;
+ cluster.waitForNode(leaderToPartition, DEFAULT_TIMEOUT);
+
+ // HACK: we have to stop the node in order to enable the proxy, in order to then restart the node
+ // (in order to then "partition it" later via the proxy)
+ final SocketProxy proxy = new SocketProxy();
+ cluster.stopJettySolrRunner(leaderToPartition);
+ cluster.waitForJettyToStop(leaderToPartition);
+ leaderToPartition.setProxyPort(proxy.getListenPort());
+ cluster.startJettySolrRunner(leaderToPartition);
+ proxy.open(leaderToPartition.getBaseUrl().toURI());
+ try {
+ log.info("leaderToPartition's Proxy: {}", proxy);
+
+ cluster.waitForNode(leaderToPartition, DEFAULT_TIMEOUT);
+ // create a 2x1 collection using a nodeSet that includes our leaderToPartition...
+ assertEquals(RequestStatusState.COMPLETED,
+ CollectionAdminRequest.createCollection(collectionName, 2, 1)
+ .setCreateNodeSet(leaderToPartition.getNodeName() + "," + otherLeader.getNodeName())
+ .processAndWait(cloudClient, DEFAULT_TIMEOUT));
+
+ cloudClient.waitForState(collectionName, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
+ (n, c) -> DocCollection.isFullyActive(n, c, 2, 1));
+
+ { // HACK: Check the leaderProps for the shard hosted on the node we're going to kill...
+ final Replica leaderProps = cloudClient.getZkStateReader()
+ .getClusterState().getCollection(collectionName)
+ .getLeaderReplicas(leaderToPartition.getNodeName()).get(0);
+
+ // No point in this test if these aren't true...
+ assertNotNull("Sanity check: leaderProps isn't a leader?: " + leaderProps.toString(),
+ leaderProps.getStr(Slice.LEADER));
+ assertTrue("Sanity check: leaderProps isn't using the proxy port?: " + leaderProps.toString(),
+ leaderProps.getCoreUrl().contains(""+proxy.getListenPort()));
}
+
+ // create client to send our updates to...
+ try (HttpSolrClient indexClient = getHttpSolrClient(indexingUrl)) {
+
+ // Sanity check: we should be able to send a bunch of updates that work right now...
+ for (int i = 0; i < 100; i++) {
+ final UpdateResponse rsp = indexClient.add
+ (sdoc("id", i, "text_t", TestUtil.randomRealisticUnicodeString(random(), 200)));
+ assertEquals(0, rsp.getStatus());
+ }
+
+ log.info("Closing leaderToPartition's proxy: {}", proxy);
+ proxy.close(); // NOTE: can't use halfClose, won't ensure a garunteed failure
+
+ final SolrException e = expectThrows(SolrException.class, () -> {
+ // start at 50 so that we have some "updates" to previous docs and some "adds"...
+ for (int i = 50; i < 250; i++) {
+ // Pure random odds of all of these docs belonging to the live shard are 1 in 2**200...
+ // Except we know the hashing algorithm isn't purely random,
+ // So the actual odds are "0" unless the hashing algorithm is changed to suck badly...
+ final UpdateResponse rsp = indexClient.add
+ (sdoc("id", i, "text_t", TestUtil.randomRealisticUnicodeString(random(), 200)));
+ // if the update didn't throw an exception, it better be a success..
+ assertEquals(0, rsp.getStatus());
+ }
+ });
+ assertEquals(500, e.code());
+ }
+ } finally {
+ proxy.close(); // don't leak this port
}
} finally {
- zkClient.close();
+ cluster.stopJettySolrRunner(leaderToPartition); // don't let this jetty bleed into other tests
+ cluster.waitForJettyToStop(leaderToPartition);
}
-
- assertTrue("A whole shard is down - some of these should fail", fails > 0);
- return docId;
}
+
+ /** NOTE: uses the cluster's CloudSolrClient and asumes default collection has been set */
+ private void addTwoDocsInOneRequest(String docIdA, String docIdB) throws Exception {
+ final CloudSolrClient cloudClient = cluster.getSolrClient();
- private long addTwoDocsInOneRequest(long docId) throws
- Exception {
- QueryResponse results;
- UpdateRequest uReq;
- uReq = new UpdateRequest();
- docId = addDoc(docId, uReq);
- docId = addDoc(docId, uReq);
-
- uReq.process(cloudClient);
- uReq.process(controlClient);
-
- commit();
-
- checkShardConsistency();
+ assertEquals(0, cloudClient.add(sdocs(sdoc("id", docIdA),
+ sdoc("id", docIdB))).getStatus());
+ assertEquals(0, cloudClient.commit().getStatus());
- assertDocCounts(VERBOSE);
+ assertEquals(2, cloudClient.query(params("q","id:(" + docIdA + " OR " + docIdB + ")")
+ ).getResults().getNumFound());
- results = query(cloudClient);
- assertEquals(2, results.getResults().getNumFound());
- return docId;
+ checkShardConsistency(params("q","*:*", "rows", "99","_trace","two_docs"));
}
- private long addUpdateDelete() throws Exception,
- IOException {
- long docId = 99999999L;
- indexr("id", docId, t1, "originalcontent");
+ /** NOTE: uses the cluster's CloudSolrClient and asumes default collection has been set */
+ private void addUpdateDelete(String docId) throws Exception {
+ final CloudSolrClient cloudClient = cluster.getSolrClient();
+
+ // add the doc, confirm we can query it...
+ assertEquals(0, cloudClient.add(sdoc("id", docId, "content_t", "originalcontent")).getStatus());
+ assertEquals(0, cloudClient.commit().getStatus());
- commit();
+ assertEquals(1, cloudClient.query(params("q", "id:" + docId)).getResults().getNumFound());
+ assertEquals(1, cloudClient.query(params("q", "content_t:originalcontent")).getResults().getNumFound());
+ assertEquals(1,
+ cloudClient.query(params("q", "content_t:originalcontent AND id:" + docId))
+ .getResults().getNumFound());
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.add("q", t1 + ":originalcontent");
- QueryResponse results = clients.get(0).query(params);
- assertEquals(1, results.getResults().getNumFound());
+ checkShardConsistency(params("q","id:" + docId, "rows", "99","_trace","original_doc"));
// update doc
- indexr("id", docId, t1, "updatedcontent");
-
- commit();
-
- assertDocCounts(VERBOSE);
+ assertEquals(0, cloudClient.add(sdoc("id", docId, "content_t", "updatedcontent")).getStatus());
+ assertEquals(0, cloudClient.commit().getStatus());
- results = clients.get(0).query(params);
- assertEquals(0, results.getResults().getNumFound());
+ // confirm we can query the doc by updated content and not original...
+ assertEquals(0, cloudClient.query(params("q", "content_t:originalcontent")).getResults().getNumFound());
+ assertEquals(1, cloudClient.query(params("q", "content_t:updatedcontent")).getResults().getNumFound());
+ assertEquals(1,
+ cloudClient.query(params("q", "content_t:updatedcontent AND id:" + docId))
+ .getResults().getNumFound());
- params.set("q", t1 + ":updatedcontent");
+ // delete the doc, confim it no longer matches in queries...
+ assertEquals(0, cloudClient.deleteById(docId).getStatus());
+ assertEquals(0, cloudClient.commit().getStatus());
- results = clients.get(0).query(params);
- assertEquals(1, results.getResults().getNumFound());
+ assertEquals(0, cloudClient.query(params("q", "id:" + docId)).getResults().getNumFound());
+ assertEquals(0, cloudClient.query(params("q", "content_t:updatedcontent")).getResults().getNumFound());
- UpdateRequest uReq = new UpdateRequest();
- //uReq.setParam(UpdateParams.UPDATE_CHAIN, DISTRIB_UPDATE_CHAIN);
- uReq.deleteById(Long.toString(docId)).process(clients.get(0));
-
- commit();
-
- results = clients.get(0).query(params);
- assertEquals(0, results.getResults().getNumFound());
- return docId;
- }
+ checkShardConsistency(params("q","id:" + docId, "rows", "99","_trace","del_updated_doc"));
- private void testDeleteByQueryDistrib() throws Exception {
- del("*:*");
- commit();
- assertEquals(0, query(cloudClient).getResults().getNumFound());
}
- private long testIndexQueryDeleteHierarchical(long docId) throws Exception {
- //index
- int topDocsNum = atLeast(10);
+
+ public long testIndexQueryDeleteHierarchical() throws Exception {
+ final CloudSolrClient cloudClient = cluster.getSolrClient();
+ final String collectionName = createAndSetNewDefaultCollection();
+
+ // index
+ long docId = 42;
+ int topDocsNum = atLeast(5);
int childsNum = 5+random().nextInt(5);
for (int i = 0; i < topDocsNum; ++i) {
UpdateRequest uReq = new UpdateRequest();
@@ -522,40 +299,46 @@ public class FullSolrCloudDistribCmdsTest extends AbstractFullDistribZkTestBase
}
uReq.add(topDocument);
- uReq.process(cloudClient);
- uReq.process(controlClient);
+ assertEquals(i + "/" + docId,
+ 0, uReq.process(cloudClient).getStatus());
}
+ assertEquals(0, cloudClient.commit().getStatus());
+
+ checkShardConsistency(params("q","*:*", "rows", "9999","_trace","added_all_top_docs_with_kids"));
- commit();
- checkShardConsistency();
- assertDocCounts(VERBOSE);
+ // query
- //query
// parents
- SolrQuery query = new SolrQuery("type_s:parent");
- QueryResponse results = cloudClient.query(query);
- assertEquals(topDocsNum, results.getResults().getNumFound());
+ assertEquals(topDocsNum,
+ cloudClient.query(new SolrQuery("type_s:parent")).getResults().getNumFound());
- //childs
- query = new SolrQuery("type_s:child");
- results = cloudClient.query(query);
- assertEquals(topDocsNum * childsNum, results.getResults().getNumFound());
+ // childs
+ assertEquals(topDocsNum * childsNum,
+ cloudClient.query(new SolrQuery("type_s:child")).getResults().getNumFound());
+
- //grandchilds
- query = new SolrQuery("type_s:grand");
- results = cloudClient.query(query);
+ // grandchilds
+ //
//each topDoc has t childs where each child has x = 0 + 2 + 4 + ..(t-1)*2 grands
//x = 2 * (1 + 2 + 3 +.. (t-1)) => arithmetic summ of t-1
//x = 2 * ((t-1) * t / 2) = t * (t - 1)
- assertEquals(topDocsNum * childsNum * (childsNum - 1), results.getResults().getNumFound());
+ assertEquals(topDocsNum * childsNum * (childsNum - 1),
+ cloudClient.query(new SolrQuery("type_s:grand")).getResults().getNumFound());
//delete
- del("*:*");
- commit();
+ assertEquals(0, cloudClient.deleteByQuery("*:*").getStatus());
+ assertEquals(0, cloudClient.commit().getStatus());
+ assertEquals(0, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+
+ checkShardConsistency(params("q","*:*", "rows", "9999","_trace","delAll"));
return docId;
}
+
+ /**
+ * Recursive helper function for building out child and grandchild docs
+ */
private long addChildren(String prefix, SolrInputDocument topDocument, int childIndex, boolean lastLevel, long docId) {
SolrInputDocument childDocument = new SolrInputDocument();
childDocument.addField("id", docId++);
@@ -574,188 +357,139 @@ public class FullSolrCloudDistribCmdsTest extends AbstractFullDistribZkTestBase
}
- private long testIndexingDocPerRequestWithHttpSolrClient(long docId) throws Exception {
- int docs = random().nextInt(TEST_NIGHTLY ? 4013 : 97) + 1;
- for (int i = 0; i < docs; i++) {
+ public void testIndexingOneDocPerRequestWithHttpSolrClient() throws Exception {
+ final CloudSolrClient cloudClient = cluster.getSolrClient();
+ final String collectionName = createAndSetNewDefaultCollection();
+
+ final int numDocs = atLeast(50);
+ for (int i = 0; i < numDocs; i++) {
UpdateRequest uReq;
uReq = new UpdateRequest();
- docId = addDoc(docId, uReq);
-
- uReq.process(cloudClient);
- uReq.process(controlClient);
-
+ assertEquals(0, cloudClient.add
+ (sdoc("id", i, "text_t", TestUtil.randomRealisticUnicodeString(random(), 200))).getStatus());
}
- commit();
-
- checkShardConsistency();
- assertDocCounts(VERBOSE);
+ assertEquals(0, cloudClient.commit().getStatus());
+ assertEquals(numDocs, cloudClient.query(params("q","*:*")).getResults().getNumFound());
- return docId++;
+ checkShardConsistency(params("q","*:*", "rows", ""+(1 + numDocs),"_trace","addAll"));
}
- private long testIndexingBatchPerRequestWithHttpSolrClient(long docId) throws Exception {
-
- // remove collection
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set("action", CollectionAction.DELETE.toString());
- params.set("name", "collection1");
- QueryRequest request = new QueryRequest(params);
- request.setPath("/admin/collections");
-
-
- cloudClient.request(request);
-
- controlClient.deleteByQuery("*:*");
- controlClient.commit();
-
- // somtimes we use an oversharded collection
- createCollection(null, "collection2", 7, 3, 100000, cloudClient, null, "conf1");
- cloudClient.setDefaultCollection("collection2");
- waitForRecoveriesToFinish("collection2", false);
-
- class IndexThread extends Thread {
- Integer name;
+ public void testIndexingBatchPerRequestWithHttpSolrClient() throws Exception {
+ final CloudSolrClient cloudClient = cluster.getSolrClient();
+ final String collectionName = createAndSetNewDefaultCollection();
+
+ final int numDocsPerBatch = atLeast(5);
+ final int numBatchesPerThread = atLeast(5);
+
+ final CountDownLatch abort = new CountDownLatch(1);
+ class BatchIndexer implements Runnable {
+ private boolean keepGoing() {
+ return 0 < abort.getCount();
+ }
- public IndexThread(Integer name) {
+ final int name;
+ public BatchIndexer(int name) {
this.name = name;
}
@Override
public void run() {
- int rnds = random().nextInt(TEST_NIGHTLY ? 10 : 3) + 1;
- for (int i = 0; i < rnds; i++) {
- UpdateRequest uReq;
- uReq = new UpdateRequest();
- int cnt = random().nextInt(TEST_NIGHTLY ? 2000 : 200) + 1;
- for (int j = 0; j <cnt; j++) {
- addDoc("thread" + name + "_" + i + "_" + j, uReq);
- }
-
- try {
- uReq.process(cloudClient);
- uReq.process(controlClient);
- } catch (SolrServerException | IOException e) {
- throw new RuntimeException(e);
+ try {
+ for (int batchId = 0; batchId < numBatchesPerThread && keepGoing(); batchId++) {
+ final UpdateRequest req = new UpdateRequest();
+ for (int docId = 0; docId < numDocsPerBatch && keepGoing(); docId++) {
+ req.add(sdoc("id", "indexer" + name + "_" + batchId + "_" + docId,
+ "test_t", TestUtil.randomRealisticUnicodeString(random(), 200)));
+ }
+ assertEquals(0, req.process(cloudClient).getStatus());
}
-
-
+ } catch (Throwable e) {
+ abort.countDown();
+ throw new RuntimeException(e);
}
}
};
- List<Thread> threads = new ArrayList<>();
-
- int nthreads = random().nextInt(TEST_NIGHTLY ? 4 : 2) + 1;
- for (int i = 0; i < nthreads; i++) {
- IndexThread thread = new IndexThread(i);
- threads.add(thread);
- thread.start();
+ final ExecutorService executor = ExecutorUtil.newMDCAwareCachedThreadPool("batchIndexing");
+ final int numThreads = random().nextInt(TEST_NIGHTLY ? 4 : 2) + 1;
+ final List<Future<?>> futures = new ArrayList<>(numThreads);
+ for (int i = 0; i < numThreads; i++) {
+ futures.add(executor.submit(new BatchIndexer(i)));
}
-
- for (Thread thread : threads) {
- thread.join();
+ final int totalDocsExpected = numThreads * numBatchesPerThread * numDocsPerBatch;
+ ExecutorUtil.shutdownAndAwaitTermination(executor);
+
+ for (Future result : futures) {
+ assertFalse(result.isCancelled());
+ assertTrue(result.isDone());
+ // all we care about is propogating any possibile execution exception...
+ final Object ignored = result.get();
}
- commit();
-
- waitForRecoveriesToFinish("collection2", false);
-
- printLayout();
-
- SolrQuery query = new SolrQuery("*:*");
- long controlCount = controlClient.query(query).getResults()
- .getNumFound();
- long cloudCount = cloudClient.query(query).getResults().getNumFound();
-
-
- CloudInspectUtil.compareResults(controlClient, cloudClient);
-
- assertEquals("Control does not match cloud", controlCount, cloudCount);
- System.out.println("DOCS:" + controlCount);
-
- return docId;
+ cloudClient.commit();
+ assertEquals(totalDocsExpected, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+ checkShardConsistency(params("q","*:*", "rows", ""+totalDocsExpected, "_trace","batches_done"));
}
- private long addDoc(long docId, UpdateRequest uReq) {
- addDoc(Long.toString(docId++), uReq);
- return docId;
- }
-
- private long addDoc(String docId, UpdateRequest uReq) {
- SolrInputDocument doc1 = new SolrInputDocument();
-
- uReq.add(doc1);
- addFields(doc1, "id", docId, "text_t", "some text so that it not's negligent work to parse this doc, even though it's still a pretty short doc");
- return -1;
- }
-
- private long testConcurrentIndexing(long docId) throws Exception {
- QueryResponse results = query(cloudClient);
- long beforeCount = results.getResults().getNumFound();
- int cnt = TEST_NIGHTLY ? 2933 : 313;
- try (ConcurrentUpdateSolrClient concurrentClient = getConcurrentUpdateSolrClient(
- ((HttpSolrClient) clients.get(0)).getBaseURL(), 10, 2, 120000)) {
- for (int i = 0; i < cnt; i++) {
- index_specific(concurrentClient, id, docId++, "text_t", "some text so that it not's negligent work to parse this doc, even though it's still a pretty short doc");
+ public void testConcurrentIndexing() throws Exception {
+ final CloudSolrClient cloudClient = cluster.getSolrClient();
+ final String collectionName = createAndSetNewDefaultCollection();
+
+ final int numDocs = atLeast(50);
+ final JettySolrRunner nodeToUpdate = cluster.getRandomJetty(random());
+ try (ConcurrentUpdateSolrClient indexClient
+ = getConcurrentUpdateSolrClient(nodeToUpdate.getProxyBaseUrl() + "/" + collectionName, 10, 2)) {
+
+ for (int i = 0; i < numDocs; i++) {
+ indexClient.add(sdoc("id", i, "text_t",
+ TestUtil.randomRealisticUnicodeString(random(), 200)));
}
- concurrentClient.blockUntilFinished();
+ indexClient.blockUntilFinished();
- commit();
+ assertEquals(0, indexClient.commit().getStatus());
+ assertEquals(numDocs, cloudClient.query(params("q","*:*")).getResults().getNumFound());
- checkShardConsistency();
- assertDocCounts(VERBOSE);
+ checkShardConsistency(params("q","*:*", "rows", ""+(1 + numDocs),"_trace","addAll"));
}
- results = query(cloudClient);
- assertEquals(beforeCount + cnt, results.getResults().getNumFound());
- return docId;
}
- private void testOptimisticUpdate(QueryResponse results) throws Exception {
- SolrDocument doc = results.getResults().get(0);
- Long version = (Long) doc.getFieldValue(VERSION_FIELD);
- Integer theDoc = (Integer) doc.getFieldValue("id");
- UpdateRequest uReq = new UpdateRequest();
- SolrInputDocument doc1 = new SolrInputDocument();
- uReq.setParams(new ModifiableSolrParams());
- uReq.getParams().set(VERSION_FIELD, Long.toString(version));
- addFields(doc1, "id", theDoc, t1, "theupdatestuff");
- uReq.add(doc1);
-
- uReq.process(cloudClient);
- uReq.process(controlClient);
-
- commit();
-
- // updating the old version should fail...
- SolrInputDocument doc2 = new SolrInputDocument();
- uReq = new UpdateRequest();
- uReq.setParams(new ModifiableSolrParams());
- uReq.getParams().set(VERSION_FIELD, Long.toString(version));
- addFields(doc2, "id", theDoc, t1, "thenewupdatestuff");
- uReq.add(doc2);
-
- uReq.process(cloudClient);
- uReq.process(controlClient);
-
- commit();
-
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.add("q", t1 + ":thenewupdatestuff");
- QueryResponse res = clients.get(0).query(params);
- assertEquals(0, res.getResults().getNumFound());
-
- params = new ModifiableSolrParams();
- params.add("q", t1 + ":theupdatestuff");
- res = clients.get(0).query(params);
- assertEquals(1, res.getResults().getNumFound());
- }
-
- private QueryResponse query(SolrClient client) throws SolrServerException, IOException {
- SolrQuery query = new SolrQuery("*:*");
- return client.query(query);
- }
-
- protected SolrInputDocument addRandFields(SolrInputDocument sdoc) {
- return sdoc;
+ /**
+ * Inspects the cluster to determine all active shards/replicas for the default collection then,
+ * executes a <code>distrib=false</code> query using the specified params, and compares the resulting
+ * {@link SolrDocumentList}, failing if any replica does not agree with it's leader.
+ *
+ * @see #cluster
+ * @see CloudInspectUtil#showDiff
+ */
+ private void checkShardConsistency(final SolrParams params) throws Exception {
+ // TODO: refactor into static in CloudInspectUtil w/ DocCollection param?
+ // TODO: refactor to take in a BiFunction<QueryResponse,QueryResponse,Boolean> ?
+
+ final SolrParams perReplicaParams = SolrParams.wrapDefaults(params("distrib", "false"),
+ params);
+ final DocCollection collection = cluster.getSolrClient().getZkStateReader()
+ .getClusterState().getCollection(cluster.getSolrClient().getDefaultCollection());
+ log.info("Checking shard consistency via: {}", perReplicaParams);
+ for (Map.Entry<String,Slice> entry : collection.getActiveSlicesMap().entrySet()) {
+ final String shardName = entry.getKey();
+ final Slice slice = entry.getValue();
+ log.info("Checking: {} -> {}", shardName, slice);
+ final Replica leader = entry.getValue().getLeader();
+ try (HttpSolrClient leaderClient = getHttpSolrClient(leader.getCoreUrl())) {
+ final SolrDocumentList leaderResults = leaderClient.query(perReplicaParams).getResults();
+ log.debug("Shard {}: Leader results: {}", shardName, leaderResults);
+ for (Replica replica : slice) {
+ try (HttpSolrClient replicaClient = getHttpSolrClient(replica.getCoreUrl())) {
+ final SolrDocumentList replicaResults = replicaClient.query(perReplicaParams).getResults();
+ log.debug("Shard {}: Replica ({}) results: {}", shardName, replica.getCoreName(), replicaResults);
+ assertEquals("inconsistency w/leader: shard=" + shardName + "core=" + replica.getCoreName(),
+ Collections.emptySet(),
+ CloudInspectUtil.showDiff(leaderResults, replicaResults,
+ shardName + " leader: " + leader.getCoreUrl(),
+ shardName + ": " + replica.getCoreUrl()));
+ }
+ }
+ }
+ }
}
}