Posted to commits@lucene.apache.org by ma...@apache.org on 2020/07/30 19:32:12 UTC
[lucene-solr] 01/02: @473 Still pushing dist updates, need to stop and think and make one last pass.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 88fab1006b16cd6d4ffe4d56bc66615251414a0a
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Jul 30 13:11:12 2020 -0500
@473 Still pushing dist updates, need to stop and think and make one last pass.
---
.../org/apache/solr/update/SolrCmdDistributor.java | 5 +-
.../processor/DistributedZkUpdateProcessor.java | 2 +-
.../cloud/monster/LargeSolrCloudStressTest.java | 530 +++++++++++++++++++++
.../client/solrj/impl/AsyncLBHttpSolrClient.java | 9 +-
.../solr/client/solrj/impl/Http2SolrClient.java | 83 ++--
.../solr/client/solrj/impl/LBSolrClient.java | 2 +-
.../org/apache/solr/common/ParWorkExecService.java | 3 -
.../solr/common/util/SolrQueuedThreadPool.java | 18 +-
.../src/java/org/apache/solr/SolrTestCase.java | 2 +-
9 files changed, 591 insertions(+), 63 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index 5adebab..8eef04d 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -83,7 +83,8 @@ public class SolrCmdDistributor implements Closeable {
public void finish() {
assert !finished : "lifecycle sanity check";
- solrClient.waitForOutstandingRequests();
+
+ blockAndDoRetries();
finished = true;
}
@@ -194,7 +195,7 @@ public class SolrCmdDistributor implements Closeable {
Set<CountDownLatch> latches = new HashSet<>(nodes.size());
// we need to do any retries before commit...
- blockAndDoRetries();
+ // blockAndDoRetries();
if (log.isDebugEnabled()) log.debug("Distrib commit to: {} params: {}", nodes, params);
for (Node node : nodes) {
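A note on the two SolrCmdDistributor hunks above: retry handling moves out of the commit path and into finish(), so retries are performed exactly once, at the end of the request, rather than before each distributed commit. A minimal sketch of the caller-side lifecycle this implies (method names come from this class; the construction and helper bodies here are assumptions, not the branch's actual code):

    SolrCmdDistributor distrib = new SolrCmdDistributor(updateShardHandler);
    try {
      distrib.distribAdd(cmd, nodes, params);          // async fan-out to replica nodes
      distrib.distribCommit(commitCmd, nodes, params); // no longer retries first
      distrib.finish();                                // now blocks and runs retries once
      if (!distrib.getErrors().isEmpty()) {
        // surface the errors, as DistributedZkUpdateProcessor does below
      }
    } finally {
      distrib.close();
    }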
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 6c325b7..8672c00 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -1154,7 +1154,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
cmdDistrib.finish();
Set<SolrCmdDistributor.Error> errors = cmdDistrib.getErrors();
if (errors.size() > 0) {
- log.info("There were errors during the request {}", errors);
+ log.warn("There were errors during the request {}", errors);
}
// TODO - we may need to tell about more than one error...
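On that TODO: only one error can currently be reported back to the client. A hedged sketch of rolling all of them into a single exception, assuming SolrCmdDistributor.Error exposes its cause as a public field named e (a sketch, not the branch's actual handling):

    if (!errors.isEmpty()) {
      SolrException ex = new SolrException(SolrException.ErrorCode.SERVER_ERROR,
          "There were " + errors.size() + " errors during the request");
      for (SolrCmdDistributor.Error error : errors) {
        ex.addSuppressed(error.e); // attach each node's failure to the stack trace
      }
      throw ex;
    }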
diff --git a/solr/core/src/test/org/apache/solr/cloud/monster/LargeSolrCloudStressTest.java b/solr/core/src/test/org/apache/solr/cloud/monster/LargeSolrCloudStressTest.java
new file mode 100644
index 0000000..5c929f4
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/monster/LargeSolrCloudStressTest.java
@@ -0,0 +1,530 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.monster;
+
+import java.lang.invoke.MethodHandles;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.TestUtil;
+import org.apache.lucene.util.LuceneTestCase.Nightly;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.cloud.SocketProxy;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
+import org.apache.solr.client.solrj.impl.Http2SolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.RequestStatusState;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.cloud.CloudInspectUtil;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.params.SolrParams;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Super basic testing, no shard restarting or anything.
+ */
+@Slow
+public class LargeSolrCloudStressTest extends SolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final AtomicInteger NAME_COUNTER = new AtomicInteger(1);
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ useFactory(null);
+ System.setProperty("solr.suppressDefaultConfigBootstrap", "false");
+ System.setProperty("distribUpdateSoTimeout", "10000");
+ System.setProperty("socketTimeout", "15000");
+ System.setProperty("connTimeout", "5000");
+ System.setProperty("solr.test.socketTimeout.default", "15000");
+ System.setProperty("solr.connect_timeout.default", "5000");
+ System.setProperty("solr.so_commit_timeout.default", "15000");
+ System.setProperty("solr.httpclient.defaultConnectTimeout", "5000");
+ System.setProperty("solr.httpclient.defaultSoTimeout", "15000");
+
+ System.setProperty("solr.httpclient.retries", "0");
+ System.setProperty("solr.retries.on.forward", "0");
+ System.setProperty("solr.retries.to.followers", "0");
+
+ System.setProperty("solr.waitForState", "10"); // secs
+
+ System.setProperty("solr.default.collection_op_timeout", "15000");
+
+
+ // use a 5 node cluster so that, with a typical 2x2 collection, one node isn't involved;
+ // this helps randomly test the edge case of hitting a node not involved in the collection
+ configureCluster(TEST_NIGHTLY ? 5 : 2).configure();
+ }
+
+ @After
+ public void purgeAllCollections() throws Exception {
+ cluster.getSolrClient().setDefaultCollection(null);
+ }
+
+
+ @AfterClass
+ public static void after() throws Exception {
+ zkClient().printLayout();
+ }
+
+ /**
+ * Creates a new 2x2 collection using a unique name, blocking until its state is fully active,
+ * and sets that collection as the default on the cluster's default CloudSolrClient.
+ *
+ * @return the name of the new collection
+ */
+ public static String createAndSetNewDefaultCollection() throws Exception {
+ final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
+ final String name = "test_collection_" + NAME_COUNTER.getAndIncrement();
+ CollectionAdminRequest.createCollection(name, "_default", 2, 2).setMaxShardsPerNode(10)
+ .process(cloudClient);
+ cloudClient.setDefaultCollection(name);
+ return name;
+ }
+
+ @Test
+ public void testBasicUpdates() throws Exception {
+ final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
+ final String collectionName = createAndSetNewDefaultCollection();
+
+ // add a doc, update it, and delete it
+ addUpdateDelete(collectionName, "doc1");
+ assertEquals(0, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+
+ // add 2 docs in a single request
+ addTwoDocsInOneRequest("doc2", "doc3");
+ assertEquals(2, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+
+ // 2 deletes in a single request...
+ assertEquals(0, (new UpdateRequest().deleteById("doc2").deleteById("doc3"))
+ .process(cloudClient).getStatus());
+ assertEquals(0, cloudClient.commit(collectionName).getStatus());
+
+ assertEquals(0, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+
+ // add a doc that we will then delete later after adding two other docs (all before next commit).
+ assertEquals(0, cloudClient.add(sdoc("id", "doc4", "content_s", "will_delete_later")).getStatus());
+ assertEquals(0, cloudClient.add(sdocs(sdoc("id", "doc5"),
+ sdoc("id", "doc6"))).getStatus());
+ assertEquals(0, cloudClient.deleteById("doc4").getStatus());
+ assertEquals(0, cloudClient.commit(collectionName).getStatus());
+
+ assertEquals(0, cloudClient.query(params("q", "id:doc4")).getResults().getNumFound());
+ assertEquals(1, cloudClient.query(params("q", "id:doc5")).getResults().getNumFound());
+ assertEquals(1, cloudClient.query(params("q", "id:doc6")).getResults().getNumFound());
+ assertEquals(2, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+
+ checkShardConsistency(params("q","*:*", "rows", "9999","_trace","post_doc_5_6"));
+
+ // delete everything....
+ assertEquals(0, cloudClient.deleteByQuery("*:*").getStatus());
+ assertEquals(0, cloudClient.commit(collectionName).getStatus());
+ assertEquals(0, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+
+ checkShardConsistency(params("q","*:*", "rows", "9999","_trace","delAll"));
+
+ }
+
+ @Nightly
+ public void testThatCantForwardToLeaderFails() throws Exception {
+ final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
+ final String collectionName = "test_collection_" + NAME_COUNTER.getAndIncrement();
+ cloudClient.setDefaultCollection(collectionName);
+
+ // get a random node for use in our collection before creating the one we'll partition..
+ final JettySolrRunner otherLeader = cluster.getRandomJetty(random());
+ // pick a (second) random node (which may be the same) for sending updates to
+ // (if it's the same, we're testing routing from another shard; if different, we're testing routing
+ // from a non-collection node)
+ final String indexingUrl = cluster.getRandomJetty(random()).getProxyBaseUrl() + "/" + collectionName;
+
+ // create a new node for the purpose of killing it...
+ final JettySolrRunner leaderToPartition = cluster.startJettySolrRunner();
+ try {
+ cluster.waitForNode(leaderToPartition, DEFAULT_TIMEOUT);
+
+ // HACK: we have to stop the node to enable the proxy, then restart it
+ // (so we can "partition" it later via the proxy)
+ final SocketProxy proxy = new SocketProxy();
+ cluster.stopJettySolrRunner(leaderToPartition);
+ cluster.waitForJettyToStop(leaderToPartition);
+ leaderToPartition.setProxyPort(proxy.getListenPort());
+ cluster.startJettySolrRunner(leaderToPartition);
+ proxy.open(new URI(leaderToPartition.getBaseUrl()));
+ try {
+ log.info("leaderToPartition's Proxy: {}", proxy);
+
+ cluster.waitForNode(leaderToPartition, DEFAULT_TIMEOUT);
+ // create a 2x1 collection using a nodeSet that includes our leaderToPartition...
+ assertEquals(RequestStatusState.COMPLETED,
+ CollectionAdminRequest.createCollection(collectionName, 2, 1)
+ .setCreateNodeSet(leaderToPartition.getNodeName() + "," + otherLeader.getNodeName())
+ .processAndWait(cloudClient, DEFAULT_TIMEOUT));
+
+ cloudClient.waitForState(collectionName, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
+ (n, c) -> DocCollection.isFullyActive(n, c, 2, 1));
+
+ { // HACK: Check the leaderProps for the shard hosted on the node we're going to kill...
+ final Replica leaderProps = cloudClient.getZkStateReader()
+ .getClusterState().getCollection(collectionName)
+ .getLeaderReplicas(leaderToPartition.getNodeName()).get(0);
+
+ // No point in this test if these aren't true...
+ assertNotNull("Sanity check: leaderProps isn't a leader?: " + leaderProps.toString(),
+ leaderProps.getStr(Slice.LEADER));
+ assertTrue("Sanity check: leaderProps isn't using the proxy port?: " + leaderProps.toString(),
+ leaderProps.getCoreUrl().contains(""+proxy.getListenPort()));
+ }
+
+ // create client to send our updates to...
+ try (Http2SolrClient indexClient = getHttpSolrClient(indexingUrl)) {
+
+ // Sanity check: we should be able to send a bunch of updates that work right now...
+ for (int i = 0; i < 100; i++) {
+ final UpdateResponse rsp = indexClient.add
+ (sdoc("id", i, "text_t", TestUtil.randomRealisticUnicodeString(random(), 200)));
+ assertEquals(0, rsp.getStatus());
+ }
+
+ log.info("Closing leaderToPartition's proxy: {}", proxy);
+ proxy.close(); // NOTE: can't use halfClose, won't ensure a guaranteed failure
+
+ final SolrException e = expectThrows(SolrException.class, () -> {
+ // start at 50 so that we have some "updates" to previous docs and some "adds"...
+ for (int i = 50; i < 250; i++) {
+ // Pure random odds of all of these docs belonging to the live shard are 1 in 2**200...
+ // Except we know the hashing algorithm isn't purely random,
+ // So the actual odds are "0" unless the hashing algorithm is changed to suck badly...
+ final UpdateResponse rsp = indexClient.add
+ (sdoc("id", i, "text_t", TestUtil.randomRealisticUnicodeString(random(), 200)));
+ // if the update didn't throw an exception, it better be a success..
+ assertEquals(0, rsp.getStatus());
+ }
+ });
+ assertEquals(500, e.code());
+ }
+ } finally {
+ proxy.close(); // don't leak this port
+ }
+ } finally {
+ cluster.stopJettySolrRunner(leaderToPartition); // don't let this jetty bleed into other tests
+ cluster.waitForJettyToStop(leaderToPartition);
+ }
+ }
+
+ /** NOTE: uses the cluster's CloudSolrClient and assumes default collection has been set */
+ private void addTwoDocsInOneRequest(String docIdA, String docIdB) throws Exception {
+ final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
+
+ assertEquals(0, cloudClient.add(sdocs(sdoc("id", docIdA),
+ sdoc("id", docIdB))).getStatus());
+ assertEquals(0, cloudClient.commit().getStatus());
+
+ assertEquals(2, cloudClient.query(params("q","id:(" + docIdA + " OR " + docIdB + ")")
+ ).getResults().getNumFound());
+
+ checkShardConsistency(params("q","*:*", "rows", "99","_trace","two_docs"));
+ }
+
+ /** NOTE: uses the cluster's CloudSolrClient and assumes default collection has been set */
+ private void addUpdateDelete(String collection, String docId) throws Exception {
+ final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
+
+ // add the doc, confirm we can query it...
+ assertEquals(0, cloudClient.add(sdoc("id", docId, "content_t", "originalcontent")).getStatus());
+ assertEquals(0, cloudClient.commit().getStatus());
+
+ assertEquals(1, cloudClient.query(params("q", "id:" + docId)).getResults().getNumFound());
+ assertEquals(1, cloudClient.query(params("q", "content_t:originalcontent")).getResults().getNumFound());
+ assertEquals(1,
+ cloudClient.query(params("q", "content_t:originalcontent AND id:" + docId))
+ .getResults().getNumFound());
+
+ checkShardConsistency(params("q","id:" + docId, "rows", "99","_trace","original_doc"));
+
+ // update doc
+ assertEquals(0, cloudClient.add(sdoc("id", docId, "content_t", "updatedcontent")).getStatus());
+ assertEquals(0, cloudClient.commit().getStatus());
+
+ // confirm we can query the doc by updated content and not original...
+ assertEquals(0, cloudClient.query(params("q", "content_t:originalcontent")).getResults().getNumFound());
+ assertEquals(1, cloudClient.query(params("q", "content_t:updatedcontent")).getResults().getNumFound());
+ assertEquals(1,
+ cloudClient.query(params("q", "content_t:updatedcontent AND id:" + docId))
+ .getResults().getNumFound());
+
+ // delete the doc, confirm it no longer matches in queries...
+ assertEquals(0, cloudClient.deleteById(docId).getStatus());
+ assertEquals(0, cloudClient.commit(collection).getStatus());
+
+ assertEquals(0, cloudClient.query(params("q", "id:" + docId)).getResults().getNumFound());
+ assertEquals(0, cloudClient.query(params("q", "content_t:updatedcontent")).getResults().getNumFound());
+
+ checkShardConsistency(params("q","id:" + docId, "rows", "99","_trace","del_updated_doc"));
+
+ }
+
+ @Ignore // nocommit debug
+ public void testIndexQueryDeleteHierarchical() throws Exception {
+ final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
+ final String collectionName = createAndSetNewDefaultCollection();
+
+ // index
+ long docId = 42;
+ int topDocsNum = atLeast(TEST_NIGHTLY ? 5 : 2);
+ int childsNum = (TEST_NIGHTLY ? 5 : 2)+random().nextInt(TEST_NIGHTLY ? 5 : 2);
+ for (int i = 0; i < topDocsNum; ++i) {
+ UpdateRequest uReq = new UpdateRequest();
+ SolrInputDocument topDocument = new SolrInputDocument();
+ topDocument.addField("id", docId++);
+ topDocument.addField("type_s", "parent");
+ topDocument.addField(i + "parent_f1_s", "v1");
+ topDocument.addField(i + "parent_f2_s", "v2");
+
+
+ for (int index = 0; index < childsNum; ++index) {
+ docId = addChildren("child", topDocument, index, false, docId);
+ }
+
+ uReq.add(topDocument);
+ assertEquals(i + "/" + docId,
+ 0, uReq.process(cloudClient).getStatus());
+ }
+ assertEquals(0, cloudClient.commit(collectionName).getStatus());
+
+ checkShardConsistency(params("q","*:*", "rows", "9999","_trace","added_all_top_docs_with_kids"));
+
+ // query
+
+ // parents
+ assertEquals(topDocsNum,
+ cloudClient.query(new SolrQuery("type_s:parent")).getResults().getNumFound());
+
+ // children
+ assertEquals(topDocsNum * childsNum,
+ cloudClient.query(new SolrQuery("type_s:child")).getResults().getNumFound());
+
+
+ // grandchildren
+ //
+ // each topDoc has t children, where child i has 2*i grandchildren (i = 0..t-1),
+ // so each topDoc has x = 0 + 2 + 4 + ... + (t-1)*2 grandchildren
+ // x = 2 * (1 + 2 + 3 + ... + (t-1)) => arithmetic sum of the first t-1 integers
+ // x = 2 * ((t-1) * t / 2) = t * (t - 1)
+ assertEquals(topDocsNum * childsNum * (childsNum - 1),
+ cloudClient.query(new SolrQuery("type_s:grand")).getResults().getNumFound());
+
+ //delete
+ assertEquals(0, cloudClient.deleteByQuery("*:*").getStatus());
+ assertEquals(0, cloudClient.commit(collectionName).getStatus());
+ assertEquals(0, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+
+ checkShardConsistency(params("q","*:*", "rows", "9999","_trace","delAll"));
+
+ }
+
+
+ /**
+ * Recursive helper function for building out child and grandchild docs
+ */
+ private long addChildren(String prefix, SolrInputDocument topDocument, int childIndex, boolean lastLevel, long docId) {
+ SolrInputDocument childDocument = new SolrInputDocument();
+ childDocument.addField("id", docId++);
+ childDocument.addField("type_s", prefix);
+ for (int index = 0; index < childIndex; ++index) {
+ childDocument.addField(childIndex + prefix + index + "_s", childIndex + "value"+ index);
+ }
+
+ if (!lastLevel) {
+ for (int i = 0; i < childIndex * 2; ++i) {
+ docId = addChildren("grand", childDocument, i, true, docId);
+ }
+ }
+ topDocument.addChildDocument(childDocument);
+ return docId;
+ }
+
+ @Ignore // nocommit debug
+ public void testIndexingOneDocPerRequestWithHttpSolrClient() throws Exception {
+ final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
+ final String collectionName = createAndSetNewDefaultCollection();
+
+ final int numDocs = atLeast(TEST_NIGHTLY ? 50 : 15);
+ for (int i = 0; i < numDocs; i++) {
+ assertEquals(0, cloudClient.add
+ (sdoc("id", i, "text_t", TestUtil.randomRealisticUnicodeString(random(), 200))).getStatus());
+ }
+ assertEquals(0, cloudClient.commit(collectionName).getStatus());
+ assertEquals(numDocs, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+
+ checkShardConsistency(params("q","*:*", "rows", ""+(1 + numDocs),"_trace","addAll"));
+ }
+
+ // @Ignore // nocommit debug
+ public void testIndexingBatchPerRequestWithHttpSolrClient() throws Exception {
+ final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
+ final String collectionName = createAndSetNewDefaultCollection();
+
+ final int numDocsPerBatch = atLeast(5);
+ final int numBatchesPerThread = atLeast(5);
+ AtomicInteger expectedDocCount = new AtomicInteger();
+
+ final CountDownLatch abort = new CountDownLatch(1);
+ class BatchIndexer implements Runnable {
+ private boolean keepGoing() {
+ return 0 < abort.getCount();
+ }
+
+ final int name;
+ public BatchIndexer(int name) {
+ this.name = name;
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (int batchId = 0; batchId < numBatchesPerThread && keepGoing(); batchId++) {
+ final UpdateRequest req = new UpdateRequest();
+ for (int docId = 0; docId < numDocsPerBatch && keepGoing(); docId++) {
+ expectedDocCount.incrementAndGet();
+ req.add(sdoc("id", "indexer" + name + "_" + batchId + "_" + docId,
+ "test_t", TestUtil.randomRealisticUnicodeString(LuceneTestCase.random(), 200)));
+ }
+ assertEquals(0, req.process(cloudClient).getStatus());
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ abort.countDown();
+ }
+ }
+ };
+
+ final int numThreads = random().nextInt(TEST_NIGHTLY ? 4 : 2) + 1;
+ final List<Future<?>> futures = new ArrayList<>(numThreads);
+ for (int i = 0; i < numThreads; i++) {
+ futures.add(testExecutor.submit(new BatchIndexer(i)));
+ }
+ final int totalDocsExpected = numThreads * numBatchesPerThread * numDocsPerBatch;
+
+
+ for (Future<?> result : futures) {
+ // all we care about is propagating any possible execution exception...
+ final Object ignored = result.get();
+ assertFalse(result.isCancelled());
+ assertTrue(result.isDone());
+ }
+
+ cloudClient.commit(collectionName);
+ assertEquals(expectedDocCount.get(), cloudClient.query(params("q","*:*")).getResults().getNumFound());
+ checkShardConsistency(params("q","*:*", "rows", ""+totalDocsExpected, "_trace","batches_done"));
+ }
+
+
+ public void testConcurrentIndexing() throws Exception {
+ final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
+ final String collectionName = createAndSetNewDefaultCollection();
+
+ final int numDocs = TEST_NIGHTLY ? atLeast(150) : 55;
+ final JettySolrRunner nodeToUpdate = cluster.getRandomJetty(random());
+ try (ConcurrentUpdateSolrClient indexClient
+ = getConcurrentUpdateSolrClient(nodeToUpdate.getBaseUrl() + "/" + collectionName, 10, 2)) {
+
+ for (int i = 0; i < numDocs; i++) {
+ log.info("add doc {}", i);
+ indexClient.add(sdoc("id", i, "text_t",
+ TestUtil.randomRealisticUnicodeString(random(), 200)));
+ }
+ indexClient.blockUntilFinished();
+ assertEquals(0, indexClient.commit().getStatus());
+ indexClient.blockUntilFinished();
+ }
+ assertEquals(numDocs, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+
+ checkShardConsistency(params("q","*:*", "rows", ""+(1 + numDocs),"_trace","addAll"));
+ }
+
+ /**
+ * Inspects the cluster to determine all active shards/replicas for the default collection, then
+ * executes a <code>distrib=false</code> query using the specified params, and compares the resulting
+ * {@link SolrDocumentList}, failing if any replica does not agree with its leader.
+ *
+ * @see #cluster
+ * @see CloudInspectUtil#showDiff
+ */
+ private void checkShardConsistency(final SolrParams params) throws Exception {
+ // TODO: refactor into static in CloudInspectUtil w/ DocCollection param?
+ // TODO: refactor to take in a BiFunction<QueryResponse,QueryResponse,Boolean> ?
+
+ final SolrParams perReplicaParams = SolrParams.wrapDefaults(params("distrib", "false"),
+ params);
+ final DocCollection collection = cluster.getSolrClient().getZkStateReader()
+ .getClusterState().getCollection(cluster.getSolrClient().getDefaultCollection());
+ log.info("Checking shard consistency via: {}", perReplicaParams);
+ for (Map.Entry<String,Slice> entry : collection.getActiveSlicesMap().entrySet()) {
+ final String shardName = entry.getKey();
+ final Slice slice = entry.getValue();
+ log.info("Checking: {} -> {}", shardName, slice);
+ final Replica leader = entry.getValue().getLeader();
+ try (Http2SolrClient leaderClient = getHttpSolrClient(leader.getCoreUrl())) {
+ final SolrDocumentList leaderResults = leaderClient.query(perReplicaParams).getResults();
+ log.debug("Shard {}: Leader results: {}", shardName, leaderResults);
+ for (Replica replica : slice) {
+ try (Http2SolrClient replicaClient = getHttpSolrClient(replica.getCoreUrl())) {
+ final SolrDocumentList replicaResults = replicaClient.query(perReplicaParams).getResults();
+ if (log.isDebugEnabled()) {
+ log.debug("Shard {}: Replica ({}) results: {}", shardName, replica.getCoreName(), replicaResults);
+ }
+ assertEquals("inconsistency w/leader: shard=" + shardName + "core=" + replica.getCoreName(),
+ Collections.emptySet(),
+ CloudInspectUtil.showDiff(leaderResults, replicaResults,
+ shardName + " leader: " + leader.getCoreUrl(),
+ shardName + ": " + replica.getCoreUrl()));
+ }
+ }
+ }
+ }
+ }
+
+}
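The checkShardConsistency helper at the bottom of the new test leans on a generally useful trick: querying a single core with distrib=false, so the response reflects only that replica's local index. A compact standalone usage, assuming a known core URL (the URL below is hypothetical):

    SolrQuery q = new SolrQuery("*:*");
    q.set("distrib", "false"); // bypass distributed search; count only this core's docs
    try (Http2SolrClient core = new Http2SolrClient.Builder(
        "http://127.0.0.1:8983/solr/test_collection_shard1_replica_n1").build()) {
      long localHits = core.query(q).getResults().getNumFound();
    }

Comparing those per-replica results against the shard leader's (as the test does via CloudInspectUtil.showDiff) is what catches a replica that silently dropped updates.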
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/AsyncLBHttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/AsyncLBHttpSolrClient.java
index 3cbc2f6..bd0e90c 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/AsyncLBHttpSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/AsyncLBHttpSolrClient.java
@@ -19,6 +19,7 @@ package org.apache.solr.client.solrj.impl;
import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
import java.io.IOException;
+import java.lang.invoke.MethodHandles;
import java.lang.ref.WeakReference;
import java.net.ConnectException;
import java.net.MalformedURLException;
@@ -56,6 +57,8 @@ import org.apache.solr.common.params.QoSParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
/**
@@ -96,6 +99,10 @@ import org.slf4j.MDC;
* @since solr 1.4
*/
public class AsyncLBHttpSolrClient extends SolrClient {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(MethodHandles.lookup().lookupClass());
+
private static Set<Integer> RETRY_CODES = new HashSet<>(4);
static {
@@ -483,7 +490,7 @@ public class AsyncLBHttpSolrClient extends SolrClient {
}
protected Exception addZombie(Http2SolrClient server, String url, Exception e) {
-
+ log.warn("adding zombie server {} due to exception", url, e);
ServerWrapper wrapper;
wrapper = new ServerWrapper(server, url);
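The logger added above uses the MethodHandles idiom rather than a class literal; MethodHandles.lookup().lookupClass() resolves to the enclosing class at runtime, so the declaration survives copy-paste and class renames unchanged:

    private static final Logger log =
        LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());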
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index b1ef811..f23a4a2 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -102,7 +102,6 @@ import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -138,7 +137,7 @@ public class Http2SolrClient extends SolrClient {
private static final String DEFAULT_PATH = "/select";
private static final List<String> errPath = Arrays.asList("metadata", "error-class");
private final Map<String, String> headers;
- private final SolrHttpClientScheduler scheduler;
+
private final CloseTracker closeTracker;
private volatile HttpClient httpClient;
@@ -168,8 +167,6 @@ public class Http2SolrClient extends SolrClient {
this.serverBaseUrl = serverBaseUrl;
}
- scheduler = new SolrHttpClientScheduler("JettyHttpClientScheduler", true, null, new ThreadGroup("JettyHttpClientScheduler"), 5);
-
this.headers = builder.headers;
if (builder.idleTimeout != null && builder.idleTimeout > 0) idleTimeout = builder.idleTimeout;
@@ -205,12 +202,6 @@ public class Http2SolrClient extends SolrClient {
private HttpClient createHttpClient(Builder builder) {
HttpClient httpClient;
-// BlockingArrayQueue<Runnable> queue = new BlockingArrayQueue<>(256, 256);
-// httpClientExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(Integer.getInteger("solr.http2solrclient.corepool.size", 2),
-// Integer.getInteger("solr.http2solrclient.maxpool.size", 10), Integer.getInteger("solr.http2solrclient.pool.keepalive", 3000),
-// TimeUnit.MILLISECONDS, queue, new SolrNamedThreadFactory("h2sc"));
-
-
SslContextFactory.Client sslContextFactory;
boolean ssl;
if (builder.sslConfig == null) {
@@ -237,11 +228,11 @@ public class Http2SolrClient extends SolrClient {
HTTP2Client http2client = new HTTP2Client();
transport = new HttpClientTransportOverHTTP2(http2client);
httpClient = new HttpClient(transport, sslContextFactory);
- httpClient.setMaxConnectionsPerDestination(10);
+ httpClient.setMaxConnectionsPerDestination(300);
}
httpClientExecutor = new SolrQueuedThreadPool("httpClient");
- httpClientExecutor.setMaxThreads(Math.max(4 , Runtime.getRuntime().availableProcessors()));
- httpClientExecutor.setMinThreads(3);
+ //httpClientExecutor.setMaxThreads(-1);
+ // httpClientExecutor.setMinThreads(3);
httpClient.setIdleTimeout(idleTimeout);
try {
httpClient.setExecutor(httpClientExecutor);
@@ -252,7 +243,6 @@ public class Http2SolrClient extends SolrClient {
httpClient.setUserAgentField(new HttpField(HttpHeader.USER_AGENT, AGENT));
httpClient.setIdleTimeout(idleTimeout);
if (builder.connectionTimeout != null) httpClient.setConnectTimeout(builder.connectionTimeout);
- httpClient.setScheduler(scheduler);
httpClient.start();
} catch (Exception e) {
ParWork.propegateInterrupt(e);
@@ -276,17 +266,11 @@ public class Http2SolrClient extends SolrClient {
}
});
}
- closer.collect(() -> {
- // we wait for async requests, so far devs don't want to give sugar for this
- asyncTracker.waitForCompleteFinal();
- if (httpClientExecutor != null) {
- try {
- httpClientExecutor.waitForStopping();
- } catch (InterruptedException e) {
- ParWork.propegateInterrupt(e);
- }
- }
- });
+// closer.collect(() -> {
+// // we wait for async requests, so far devs don't want to give sugar for this
+// // asyncTracker.waitForCompleteFinal();
+//
+// });
closer.addCollect("httpClientExecutor");
}
assert ObjectReleaseTracker.release(this);
@@ -435,8 +419,8 @@ public class Http2SolrClient extends SolrClient {
? this.parser: solrRequest.getResponseParser();
if (onComplete != null) {
// This async call only suitable for indexing since the response size is limited by 5MB
- asyncTracker.register();
- req.send(new BufferingResponseListener(5 * 1024 * 1024) {
+ req.onRequestQueued(asyncTracker.queuedListener)
+ .onComplete(asyncTracker.completeListener).send(new BufferingResponseListener(5 * 1024 * 1024) {
@Override
public void onComplete(Result result) {
@@ -458,7 +442,7 @@ public class Http2SolrClient extends SolrClient {
onComplete.onFailure(e);
}
} finally {
- asyncTracker.completeListener.onComplete(result);
+ // asyncTracker.completeListener.onComplete(result);
}
}
});
@@ -475,8 +459,7 @@ public class Http2SolrClient extends SolrClient {
}
}
};
- asyncTracker.register();
- req.send(listener);
+ req.onRequestQueued(asyncTracker.queuedListener).send(listener);
Response response = listener.get(idleTimeout, TimeUnit.MILLISECONDS);
InputStream is = listener.getInputStream();
assert ObjectReleaseTracker.track(is);
@@ -886,7 +869,7 @@ public class Http2SolrClient extends SolrClient {
private class AsyncTracker {
// nocommit - look at outstanding max again
- private static final int MAX_OUTSTANDING_REQUESTS = 100;
+ private static final int MAX_OUTSTANDING_REQUESTS = 10;
private final Semaphore available;
@@ -898,15 +881,21 @@ public class Http2SolrClient extends SolrClient {
}
};
// maximum outstanding requests left
- // private final Request.QueuedListener queuedListener;
+ private final Request.QueuedListener queuedListener;
private final Response.CompleteListener completeListener;
AsyncTracker() {
-// queuedListener = request -> {
-// phaser.register();
-// if (log.isDebugEnabled()) log.debug("Request queued registered: {} arrived: {}", phaser.getRegisteredParties(), phaser.getArrivedParties());
-// };
- available = new Semaphore(MAX_OUTSTANDING_REQUESTS, true);
+ available = new Semaphore(MAX_OUTSTANDING_REQUESTS, false);
+ queuedListener = request -> {
+ phaser.register();
+ try {
+ available.acquire();
+ } catch (InterruptedException ignored) {
+ ParWork.propegateInterrupt(ignored);
+ }
+ if (log.isDebugEnabled()) log.debug("Request queued registered: {} arrived: {}", phaser.getRegisteredParties(), phaser.getArrivedParties());
+ };
+
completeListener = result -> {
if (log.isDebugEnabled()) log.debug("Request complete registered: {} arrived: {}", phaser.getRegisteredParties(), phaser.getArrivedParties());
phaser.arriveAndDeregister();
@@ -921,15 +910,19 @@ public class Http2SolrClient extends SolrClient {
public synchronized void waitForComplete() {
if (log.isDebugEnabled()) log.debug("Before wait for outstanding requests registered: {} arrived: {}", phaser.getRegisteredParties(), phaser.getArrivedParties());
-
+ if (phaser.getUnarrivedParties() == 0) {
+ return;
+ }
int arrival = phaser.arriveAndAwaitAdvance();
+ // phaser.awaitAdvance(phaser.arriveAndDeregister());
+
if (log.isDebugEnabled()) log.debug("After wait for outstanding requests registered: {} arrived: {}", phaser.getRegisteredParties(), phaser.getArrivedParties());
}
public void waitForCompleteFinal() {
if (log.isDebugEnabled()) log.debug("Before wait for complete final registered: {} arrived: {}", phaser.getRegisteredParties(), phaser.getArrivedParties());
- int arrival = phaser.arriveAndAwaitAdvance();
+ phaser.awaitAdvance(phaser.arriveAndDeregister());
if (log.isDebugEnabled()) log.debug("After wait for complete final registered: {} arrived: {}", phaser.getRegisteredParties(), phaser.getArrivedParties());
}
@@ -938,12 +931,12 @@ public class Http2SolrClient extends SolrClient {
if (log.isDebugEnabled()) {
log.debug("Registered new party");
}
- phaser.register();
- try {
- available.acquire();
- } catch (InterruptedException ignored) {
- ParWork.propegateInterrupt(ignored);
- }
+ // phaser.register();
+// try {
+// available.acquire();
+// } catch (InterruptedException ignored) {
+// ParWork.propegateInterrupt(ignored);
+// }
}
}
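The Http2SolrClient hunks above rework the AsyncTracker so a phaser party is registered from Jetty's Request.QueuedListener, i.e. only once the request has actually been queued, rather than from a separate register() call that could pair incorrectly with the completion callback; the semaphore (now non-fair, capped at 10) supplies backpressure. A self-contained sketch of the phaser-plus-semaphore idiom, with assumed names and an assumed Phaser(1) construction (the diff shows neither how the phaser is created nor where the semaphore is released):

    import java.util.concurrent.Phaser;
    import java.util.concurrent.Semaphore;

    class InFlightTracker {
      private final Semaphore available = new Semaphore(10, false); // max outstanding
      private final Phaser phaser = new Phaser(1); // party 0 is the waiting thread

      void onQueued() throws InterruptedException { // cf. Request.QueuedListener
        phaser.register();   // one party per in-flight request
        available.acquire(); // block when 10 requests are outstanding
      }

      void onComplete() {    // cf. Response.CompleteListener
        phaser.arriveAndDeregister();
        available.release();
      }

      void waitForCompleteFinal() { // terminal: deregisters the waiting party too
        phaser.awaitAdvance(phaser.arriveAndDeregister());
      }
    }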
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
index 163e755..0e09d43 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
@@ -428,7 +428,7 @@ public abstract class LBSolrClient extends SolrClient {
protected abstract SolrClient getClient(String baseUrl);
private Exception addZombie(String serverStr, Exception e) {
- log.info("add Zombie {}" + serverStr);
+ log.warn("adding zombie server {} due to exception", serverStr, e);
ServerWrapper wrapper = createServerWrapper(serverStr);
wrapper.standard = false;
zombieServers.put(serverStr, wrapper);
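The removed LBSolrClient line was doubly broken: string concatenation glued the "{}" anchor to the value, so SLF4J never substituted it, and the exception that caused the server to be zombied was dropped entirely. The replacement uses parameterized logging, where a trailing Throwable argument is logged with its stack trace:

    // broken: "{}" is part of the concatenated string and is never filled; e is lost
    log.info("add Zombie {}" + serverStr);
    // fixed: serverStr fills the placeholder; the trailing Throwable gets a stack trace
    log.warn("adding zombie server {} due to exception", serverStr, e);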
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
index a8b285f..8a29c28 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
@@ -289,9 +289,6 @@ public class ParWorkExecService implements ExecutorService {
@Override
public void execute(Runnable runnable) {
- if (shutdown || terminated) {
- throw new RejectedExecutionException();
- }
boolean success = checkLoad();
if (success) {
success = available.tryAcquire();
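Dropping the guard above relaxes the usual ExecutorService contract, under which execute() after shutdown() throws RejectedExecutionException; with this change, late submissions fall through to the load check and semaphore instead of failing fast. Any caller that relied on the rejection as a shutdown signal would need its own check, e.g. (a hypothetical caller-side guard):

    if (!executor.isShutdown()) {
      executor.execute(task); // may still race with a concurrent shutdown
    }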
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
index d9f30a2..aa6c493 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
@@ -142,7 +142,7 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
setMinThreads(minThreads);
setMaxThreads(maxThreads);
setIdleTimeout(idleTimeout);
- setStopTimeout(5000);
+// setStopTimeout(5000);
setReservedThreads(reservedThreads);
if (queue == null)
{
@@ -284,14 +284,14 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
private void joinThreads(long stopByNanos) throws InterruptedException
{
- for (Thread thread : _threads)
- {
- long canWait = TimeUnit.NANOSECONDS.toMillis(stopByNanos - System.nanoTime());
- if (LOG.isDebugEnabled())
- LOG.debug("Waiting for {} for {}", thread, canWait);
- if (canWait > 0)
- thread.join(canWait);
- }
+// for (Thread thread : _threads)
+// {
+// long canWait = TimeUnit.NANOSECONDS.toMillis(stopByNanos - System.nanoTime());
+// if (LOG.isDebugEnabled())
+// LOG.debug("Waiting for {} for {}", thread, canWait);
+// if (canWait > 0)
+// thread.join(canWait);
+// }
}
/**
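With joinThreads() emptied out above, stopping the pool no longer waits for worker threads to exit, so stop() can return while workers are still running. A caller that needs the old draining behavior would have to poll the pool itself; a rough sketch, assuming this class keeps QueuedThreadPool's getBusyThreads() accessor:

    pool.stop();
    while (pool.getBusyThreads() > 0) {
      Thread.sleep(50); // wait for in-flight tasks to drain
    }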
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
index c90e7f2..af5db31 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
@@ -448,7 +448,7 @@ public class SolrTestCase extends LuceneTestCase {
if (clazz != null) {
// nocommit - leave this on
- fail("A " + clazz.getName() + " took too long to close: " + tooLongTime + "\n" + times);
+ if (!TEST_NIGHTLY) fail("A " + clazz.getName() + " took too long to close: " + tooLongTime + "\n" + times);
}
}
log.info("@AfterClass end ------------------------------------------------------");