You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2019/11/22 16:35:55 UTC
[lucene-solr] branch jira/SOLR-13101 updated: SOLR-13813: add test
for shared storage live split (#1003)
This is an automated email from the ASF dual-hosted git repository.
yonik pushed a commit to branch jira/SOLR-13101
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/jira/SOLR-13101 by this push:
new d403b4a SOLR-13813: add test for shared storage live split (#1003)
d403b4a is described below
commit d403b4a1261b31f2bde4cbdd30935e5f0042f8ba
Author: Yonik Seeley <yo...@apache.org>
AuthorDate: Fri Nov 22 11:35:45 2019 -0500
SOLR-13813: add test for shared storage live split (#1003)
---
.../solr/store/blob/SharedStorageSplitTest.java | 170 +++++++++++++++++++--
1 file changed, 155 insertions(+), 15 deletions(-)
diff --git a/solr/core/src/test/org/apache/solr/store/blob/SharedStorageSplitTest.java b/solr/core/src/test/org/apache/solr/store/blob/SharedStorageSplitTest.java
index 9a58ebd..8175f4f7 100644
--- a/solr/core/src/test/org/apache/solr/store/blob/SharedStorageSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/store/blob/SharedStorageSplitTest.java
@@ -16,14 +16,18 @@
*/
package org.apache.solr.store.blob;
+import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
@@ -32,6 +36,8 @@ import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
@@ -47,11 +53,14 @@ import org.apache.solr.store.shared.SolrCloudSharedStoreTestCase;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * Tests for shard splitting in conjunction with shared storage
+ * Tests for shard splitting in conjunction with shared storage
*/
public class SharedStorageSplitTest extends SolrCloudSharedStoreTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
static Map<String, Map<String, CountDownLatch>> solrProcessesTaskTracker = new HashMap<>();
@@ -77,8 +86,7 @@ public class SharedStorageSplitTest extends SolrCloudSharedStoreTestCase {
shutdownCluster();
}
- void doSplitShard(String collectionName, boolean sharedStorage, int repFactor, int nPrefixes, int nDocsPerPrefix) throws Exception {
-
+ CloudSolrClient createCollection(String collectionName, boolean sharedStorage, int repFactor) throws Exception {
if (sharedStorage) {
CollectionAdminRequest
.createCollection(collectionName, "conf", 1, 0, 0, 0)
@@ -97,29 +105,53 @@ public class SharedStorageSplitTest extends SolrCloudSharedStoreTestCase {
CloudSolrClient client = cluster.getSolrClient();
client.setDefaultCollection(collectionName);
+ return client;
+ }
+ private void indexPrefixDocs(CloudSolrClient client, String collectionName, int nPrefixes, int nDocsPerPrefix, int docOffset) throws Exception {
if (random().nextBoolean()) {
+ // index docs separately
for (int i = 0; i < nPrefixes; i++) {
String prefix = "a" + i;
for (int j = 0; j < nDocsPerPrefix; j++) {
- client.add(sdoc("id", prefix + "!doc" + j));
+ client.add(sdoc("id", prefix + "!doc" + (j+docOffset)));
}
}
- client.commit(collectionName, true, true, false);
+ if (random().nextBoolean()) {
+ client.commit(collectionName, true, true, false);
+ }
} else {
// Try all docs in the same update request
UpdateRequest updateReq = new UpdateRequest();
for (int i = 0; i < nPrefixes; i++) {
String prefix = "a" + i;
for (int j = 0; j < nDocsPerPrefix; j++) {
- updateReq.add(sdoc("id", prefix + "!doc" + j));
+ updateReq.add(sdoc("id", prefix + "!doc" + (j+docOffset)));
}
}
- UpdateResponse ursp = updateReq.commit(client, collectionName);
+ UpdateResponse ursp;
+ if (random().nextBoolean()) {
+ ursp = updateReq.commit(client, collectionName);
+ } else {
+ ursp = updateReq.process(client, collectionName);
+ }
assertEquals(0, ursp.getStatus());
}
+ }
+
+ void doSplitShard(String collectionName, boolean sharedStorage, int repFactor, int nPrefixes, int nDocsPerPrefix) throws Exception {
+ CloudSolrClient client = createCollection(collectionName, sharedStorage, repFactor);
+
+ /*** TODO: this currently causes a failure due to a NPE. Uncomment when fixed.
+ if (random().nextBoolean()) {
+ // start off with a commit
+ client.commit(collectionName, true, true, false);
+ }
+ ***/
+
+ indexPrefixDocs(client, collectionName, nPrefixes, nDocsPerPrefix, 0);
- checkExpectedDocs(client, repFactor, nPrefixes * nDocsPerPrefix);
+ assertEquals(nPrefixes * nDocsPerPrefix, getNumDocs(client,sharedStorage,repFactor));
CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName)
.setSplitByPrefix(true)
@@ -128,15 +160,17 @@ public class SharedStorageSplitTest extends SolrCloudSharedStoreTestCase {
waitForState("Timed out waiting for sub shards to be active.",
collectionName, activeClusterShape(2, 3*repFactor)); // 2 repFactor for the new split shards, 1 repFactor for old replicas
- checkExpectedDocs(client, repFactor, nPrefixes * nDocsPerPrefix);
+ // now index another batch of docs into the new shards
+ indexPrefixDocs(client, collectionName, nPrefixes, nDocsPerPrefix, nDocsPerPrefix);
+ assertEquals(nPrefixes * nDocsPerPrefix * 2, getNumDocs(client,sharedStorage,repFactor));
}
- void checkExpectedDocs(CloudSolrClient client, int repFactor, long numExpected) throws Exception {
+ long getNumDocs(CloudSolrClient client, boolean sharedStorage, int repFactor) throws Exception {
String collectionName = client.getDefaultCollection();
DocCollection collection = client.getZkStateReader().getClusterState().getCollection(collectionName);
Collection<Slice> slices = collection.getSlices();
- if (repFactor > 1) {
+ if (sharedStorage && repFactor > 1) {
// set up count down latches to wait for pulls to complete
List<CountDownLatch> latches = new LinkedList<>();
@@ -147,7 +181,7 @@ public class SharedStorageSplitTest extends SolrCloudSharedStoreTestCase {
// ensure we count down for all replicas per slice.
String sharedShardName = (String) slice.getProperties().get(ZkStateReader.SHARED_SHARD_NAME);
solrProcessesTaskTracker.get(replica.getNodeName())
- .put(replica.getCoreName(), cdl);
+ .put(replica.getCoreName(), cdl);
SolrClient replicaClient = getHttpSolrClient(replica.getBaseUrl() + "/" + replica.getCoreName());
try {
replicaClient.query(params("q", "*:* priming pull", "distrib", "false"));
@@ -183,10 +217,9 @@ public class SharedStorageSplitTest extends SolrCloudSharedStoreTestCase {
totCount += lastReplicaCount;
}
- assertEquals(numExpected, totCount);
-
long cloudClientDocs = client.query(new SolrQuery("*:*")).getResults().getNumFound();
- assertEquals(numExpected, cloudClientDocs);
+ assertEquals("Sum of shard count should equal distrib query doc count", totCount, cloudClientDocs);
+ return totCount;
}
@Test
@@ -195,6 +228,113 @@ public class SharedStorageSplitTest extends SolrCloudSharedStoreTestCase {
doSplitShard("c2", true, 2, 2, 2);
}
+ void doLiveSplitShard(String collectionName, boolean sharedStorage, int repFactor, int nThreads) throws Exception {
+ final boolean doSplit = true; // test debugging aid: set to false if you want to check that the test passes if we don't do a split
+ final boolean updateFailureOK = true; // TODO: this should be changed to false after the NPE bug is fixed
+ final CloudSolrClient client = createCollection(collectionName, sharedStorage, repFactor);
+
+ final ConcurrentHashMap<String,Long> model = new ConcurrentHashMap<>(); // what the index should contain
+ final AtomicBoolean doIndex = new AtomicBoolean(true);
+ final AtomicInteger docsIndexed = new AtomicInteger();
+ final AtomicInteger failures = new AtomicInteger();
+ Thread[] indexThreads = new Thread[nThreads];
+ try {
+
+ for (int i=0; i<nThreads; i++) {
+ indexThreads[i] = new Thread(() -> {
+ while (doIndex.get()) {
+ try {
+ // Thread.sleep(10); // cap indexing rate at 100 docs per second per thread
+ int currDoc = docsIndexed.incrementAndGet();
+ String docId = "doc_" + currDoc;
+
+ // Try all docs in the same update request
+ UpdateRequest updateReq = new UpdateRequest();
+ updateReq.add(sdoc("id", docId));
+ UpdateRequest ursp = updateReq.commit(client, collectionName); // commits each time; swap with process() below to skip the per-update commit
+ // UpdateResponse ursp = updateReq.process(client, collectionName);
+
+ if (ursp.getStatus() == 0) {
+ model.put(docId, 1L); // in the future, keep track of a version per document and reuse ids to keep index from growing too large
+ } else {
+ failures.incrementAndGet();
+ if (!updateFailureOK) {
+ assertEquals(0, ursp.getStatus());
+ }
+ }
+ } catch (Exception e) {
+ if (!updateFailureOK) {
+ fail(e.getMessage());
+ break;
+ }
+ failures.incrementAndGet();
+ }
+ }
+ });
+ }
+
+ for (Thread thread : indexThreads) {
+ thread.start();
+ }
+
+ Thread.sleep(100); // wait for a few docs to be indexed before invoking split
+ int docCount = model.size();
+
+ if (doSplit) {
+ CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName)
+ .setShardName("shard1");
+ splitShard.process(client);
+ waitForState("Timed out waiting for sub shards to be active.",
+ collectionName, activeClusterShape(2, 3 * repFactor)); // 2 repFactor for the new split shards, 1 repFactor for old replicas
+ } else {
+ Thread.sleep(10 * 1000);
+ }
+
+ // make sure that docs were able to be indexed during the split
+ assertTrue(model.size() > docCount);
+
+ Thread.sleep(100); // wait for a few more docs to be indexed after split
+
+ } finally {
+ // shut down the indexers
+ doIndex.set(false);
+ for (Thread thread : indexThreads) {
+ thread.join();
+ }
+ }
+
+ client.commit(); // final commit is needed for visibility
+
+ long numDocs = getNumDocs(client, true, repFactor);
+ if (numDocs != model.size()) {
+ SolrDocumentList results = client.query(new SolrQuery("q","*:*", "fl","id", "rows", Integer.toString(model.size()) )).getResults();
+ Map<String,Long> leftover = new HashMap<>(model);
+ for (SolrDocument doc : results) {
+ String id = (String) doc.get("id");
+ leftover.remove(id);
+ }
+ log.error("MISSING DOCUMENTS: " + leftover);
+ }
+
+ assertEquals("Documents are missing!", docsIndexed.get(), numDocs);
+ log.info("Number of documents indexed and queried : " + numDocs + " failures during splitting=" + failures.get());
+ }
+
+
+ // TODO: this test is adapted from SplitShardTest.testLiveSplit and could perhaps
+ // be unified.
+ @Test
+ public void testLiveSplit() throws Exception {
+ // Debugging tips: if this fails, it may be easier to debug by lowering the number of threads to 1 and looping the test
+ // until you get another failure.
+ // You may need to further instrument things like DistributedZkUpdateProcessor to display the cluster state for the collection, etc.
+ // Using more threads increases the chance to hit a concurrency bug, but too many threads can overwhelm single-threaded buffering
+ // replay after the low level index split and result in subShard leaders that can't catch up and
+ // become active (a known issue that still needs to be resolved.)
+ doLiveSplitShard("livesplit1", true, 1, 8);
+ }
+
+
private static Map<String, CountDownLatch> configureTestBlobProcessForNode(JettySolrRunner runner) {
Map<String, CountDownLatch> asyncPullTracker = new HashMap<>();