Posted to commits@lucene.apache.org by ma...@apache.org on 2020/07/30 19:32:12 UTC

[lucene-solr] 01/02: @473 Still pushing dist updates, need to stop and think and make one last pass.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 88fab1006b16cd6d4ffe4d56bc66615251414a0a
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Jul 30 13:11:12 2020 -0500

    @473 Still pushing dist updates, need to stop and think and make one last pass.
---
 .../org/apache/solr/update/SolrCmdDistributor.java |   5 +-
 .../processor/DistributedZkUpdateProcessor.java    |   2 +-
 .../cloud/monster/LargeSolrCloudStressTest.java    | 530 +++++++++++++++++++++
 .../client/solrj/impl/AsyncLBHttpSolrClient.java   |   9 +-
 .../solr/client/solrj/impl/Http2SolrClient.java    |  83 ++--
 .../solr/client/solrj/impl/LBSolrClient.java       |   2 +-
 .../org/apache/solr/common/ParWorkExecService.java |   3 -
 .../solr/common/util/SolrQueuedThreadPool.java     |  18 +-
 .../src/java/org/apache/solr/SolrTestCase.java     |   2 +-
 9 files changed, 591 insertions(+), 63 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index 5adebab..8eef04d 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -83,7 +83,8 @@ public class SolrCmdDistributor implements Closeable {
 
   public void finish() {
     assert !finished : "lifecycle sanity check";
-    solrClient.waitForOutstandingRequests();
+
+    blockAndDoRetries();
     finished = true;
   }
   
@@ -194,7 +195,7 @@ public class SolrCmdDistributor implements Closeable {
     Set<CountDownLatch> latches = new HashSet<>(nodes.size());
 
     // we need to do any retries before commit...
-    blockAndDoRetries();
+  //  blockAndDoRetries();
     if (log.isDebugEnabled()) log.debug("Distrib commit to: {} params: {}", nodes, params);
 
     for (Node node : nodes) {
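
The SolrCmdDistributor hunk above moves blockAndDoRetries() out of the distributed commit path and into finish(), so pending retries are flushed once at the end of the request rather than before every commit. A rough, self-contained sketch of that lifecycle, using hypothetical names (RetryingDistributor and its methods stand in for the branch's actual API):

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical sketch: failed sends are remembered, then retried exactly once in finish().
    class RetryingDistributor {
      private final List<Runnable> pendingRetries = new ArrayList<>();
      private boolean finished = false;

      void distribUpdate(Runnable send) {
        try {
          send.run();
        } catch (RuntimeException e) {
          pendingRetries.add(send);     // remember the failed send for a later retry pass
        }
      }

      void distribCommit(Runnable commit) {
        // no retry pass here any more; finish() owns it
        commit.run();
      }

      void finish() {
        assert !finished : "lifecycle sanity check";
        blockAndDoRetries();            // single retry pass at the end of the request
        finished = true;
      }

      private void blockAndDoRetries() {
        pendingRetries.forEach(Runnable::run);
        pendingRetries.clear();
      }
    }
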
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 6c325b7..8672c00 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -1154,7 +1154,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     cmdDistrib.finish();
     Set<SolrCmdDistributor.Error> errors = cmdDistrib.getErrors();
     if (errors.size() > 0) {
-      log.info("There were errors during the request {}", errors);
+      log.warn("There were errors during the request {}", errors);
     }
 
     // TODO - we may need to tell about more than one error...
diff --git a/solr/core/src/test/org/apache/solr/cloud/monster/LargeSolrCloudStressTest.java b/solr/core/src/test/org/apache/solr/cloud/monster/LargeSolrCloudStressTest.java
new file mode 100644
index 0000000..5c929f4
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/monster/LargeSolrCloudStressTest.java
@@ -0,0 +1,530 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.TestUtil;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.cloud.SocketProxy;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
+import org.apache.solr.client.solrj.impl.Http2SolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.RequestStatusState;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.params.SolrParams;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Super basic testing, no shard restarting or anything.
+ */
+@Slow
+
+public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final AtomicInteger NAME_COUNTER = new AtomicInteger(1);
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    useFactory(null);
+    System.setProperty("solr.suppressDefaultConfigBootstrap", "false");
+    System.setProperty("distribUpdateSoTimeout", "10000");
+    System.setProperty("socketTimeout", "15000");
+    System.setProperty("connTimeout", "5000");
+    System.setProperty("solr.test.socketTimeout.default", "15000");
+    System.setProperty("solr.connect_timeout.default", "5000");
+    System.setProperty("solr.so_commit_timeout.default", "15000");
+    System.setProperty("solr.httpclient.defaultConnectTimeout", "5000");
+    System.setProperty("solr.httpclient.defaultSoTimeout", "15000");
+
+    System.setProperty("solr.httpclient.retries", "0");
+    System.setProperty("solr.retries.on.forward", "0");
+    System.setProperty("solr.retries.to.followers", "0");
+
+    System.setProperty("solr.waitForState", "10"); // secs
+
+    System.setProperty("solr.default.collection_op_timeout", "15000");
+
+
+    // use a 5 node cluster so that with a typical 2x2 collection one node isn't involved;
+    // this helps to randomly test edge cases of hitting a node not involved in the collection
+    configureCluster(TEST_NIGHTLY ? 5 : 2).configure();
+  }
+
+  @After
+  public void purgeAllCollections() throws Exception {
+    cluster.getSolrClient().setDefaultCollection(null);
+  }
+
+
+  @AfterClass
+  public static void after() throws Exception {
+    zkClient().printLayout();
+  }
+
+  /**
+   * Creates a new 2x2 collection using a unique name, blocking until its state is fully active, 
+   * and sets that collection as the default on the cluster's default CloudSolrClient.
+   * 
+   * @return the name of the new collection
+   */
+  public static String createAndSetNewDefaultCollection() throws Exception {
+    final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
+    final String name = "test_collection_" + NAME_COUNTER.getAndIncrement();
+    CollectionAdminRequest.createCollection(name, "_default", 2, 2).setMaxShardsPerNode(10)
+                 .process(cloudClient);
+    cloudClient.setDefaultCollection(name);
+    return name;
+  }
+  
+  @Test
+  public void testBasicUpdates() throws Exception {
+    final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
+    final String collectionName = createAndSetNewDefaultCollection();
+    
+    // add a doc, update it, and delete it
+    addUpdateDelete(collectionName, "doc1");
+    assertEquals(0, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+    
+    // add 2 docs in a single request
+    addTwoDocsInOneRequest("doc2", "doc3");
+    assertEquals(2, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+
+    // 2 deletes in a single request...
+    assertEquals(0, (new UpdateRequest().deleteById("doc2").deleteById("doc3"))
+                 .process(cloudClient).getStatus());
+    assertEquals(0, cloudClient.commit(collectionName).getStatus());
+    
+    assertEquals(0, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+    
+    // add a doc that we will then delete later after adding two other docs (all before next commit).
+    assertEquals(0, cloudClient.add(sdoc("id", "doc4", "content_s", "will_delete_later")).getStatus());
+    assertEquals(0, cloudClient.add(sdocs(sdoc("id", "doc5"),
+                                          sdoc("id", "doc6"))).getStatus());
+    assertEquals(0, cloudClient.deleteById("doc4").getStatus());
+    assertEquals(0, cloudClient.commit(collectionName).getStatus());
+
+    assertEquals(0, cloudClient.query(params("q", "id:doc4")).getResults().getNumFound());
+    assertEquals(1, cloudClient.query(params("q", "id:doc5")).getResults().getNumFound());
+    assertEquals(1, cloudClient.query(params("q", "id:doc6")).getResults().getNumFound());
+    assertEquals(2, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+    
+    checkShardConsistency(params("q","*:*", "rows", "9999","_trace","post_doc_5_6"));
+
+    // delete everything....
+    assertEquals(0, cloudClient.deleteByQuery("*:*").getStatus());
+    assertEquals(0, cloudClient.commit(collectionName).getStatus());
+    assertEquals(0, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+
+    checkShardConsistency(params("q","*:*", "rows", "9999","_trace","delAll"));
+    
+  }
+
+  @Nightly
+  public void testThatCantForwardToLeaderFails() throws Exception {
+    final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
+    final String collectionName = "test_collection_" + NAME_COUNTER.getAndIncrement();
+    cloudClient.setDefaultCollection(collectionName);
+    
+    // get a random node for use in our collection before creating the one we'll partition..
+    final JettySolrRunner otherLeader = cluster.getRandomJetty(random());
+    // pick a (second) random node (which may be the same) for sending updates to
+    // (if it's the same, we're testing routing from another shard; if different, we're testing routing
+    // from a non-collection node)
+    final String indexingUrl = cluster.getRandomJetty(random()).getProxyBaseUrl() + "/" + collectionName;
+
+    // create a new node for the purpose of killing it...
+    final JettySolrRunner leaderToPartition = cluster.startJettySolrRunner();
+    try {
+      cluster.waitForNode(leaderToPartition, DEFAULT_TIMEOUT);
+
+      // HACK: we have to stop the node in order to enable the proxy, in order to then restart the node
+      // (in order to then "partition it" later via the proxy)
+      final SocketProxy proxy = new SocketProxy();
+      cluster.stopJettySolrRunner(leaderToPartition);
+      cluster.waitForJettyToStop(leaderToPartition);
+      leaderToPartition.setProxyPort(proxy.getListenPort());
+      cluster.startJettySolrRunner(leaderToPartition);
+      proxy.open(new URI(leaderToPartition.getBaseUrl()));
+      try {
+        log.info("leaderToPartition's Proxy: {}", proxy);
+        
+        cluster.waitForNode(leaderToPartition, DEFAULT_TIMEOUT);
+        // create a 2x1 collection using a nodeSet that includes our leaderToPartition...
+        assertEquals(RequestStatusState.COMPLETED,
+                     CollectionAdminRequest.createCollection(collectionName, 2, 1)
+                     .setCreateNodeSet(leaderToPartition.getNodeName() + "," + otherLeader.getNodeName())
+                     .processAndWait(cloudClient, DEFAULT_TIMEOUT));
+
+        cloudClient.waitForState(collectionName, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
+                                 (n, c) -> DocCollection.isFullyActive(n, c, 2, 1));
+
+        { // HACK: Check the leaderProps for the shard hosted on the node we're going to kill...
+          final Replica leaderProps = cloudClient.getZkStateReader()
+            .getClusterState().getCollection(collectionName)
+            .getLeaderReplicas(leaderToPartition.getNodeName()).get(0);
+          
+          // No point in this test if these aren't true...
+          assertNotNull("Sanity check: leaderProps isn't a leader?: " + leaderProps.toString(),
+                        leaderProps.getStr(Slice.LEADER));
+          assertTrue("Sanity check: leaderProps isn't using the proxy port?: " + leaderProps.toString(),
+                     leaderProps.getCoreUrl().contains(""+proxy.getListenPort()));
+        }
+        
+        // create client to send our updates to...
+        try (Http2SolrClient indexClient = getHttpSolrClient(indexingUrl)) {
+          
+          // Sanity check: we should be able to send a bunch of updates that work right now...
+          for (int i = 0; i < 100; i++) {
+            final UpdateResponse rsp = indexClient.add
+              (sdoc("id", i, "text_t", TestUtil.randomRealisticUnicodeString(random(), 200)));
+            assertEquals(0, rsp.getStatus());
+          }
+
+          log.info("Closing leaderToPartition's proxy: {}", proxy);
+          proxy.close(); // NOTE: can't use halfClose, won't ensure a guaranteed failure
+          
+          final SolrException e = expectThrows(SolrException.class, () -> {
+              // start at 50 so that we have some "updates" to previous docs and some "adds"...
+              for (int i = 50; i < 250; i++) {
+                // Pure random odds of all of these docs belonging to the live shard are 1 in 2**200...
+                // Except we know the hashing algorithm isn't purely random,
+                // So the actual odds are "0" unless the hashing algorithm is changed to suck badly...
+                final UpdateResponse rsp = indexClient.add
+                (sdoc("id", i, "text_t", TestUtil.randomRealisticUnicodeString(random(), 200)));
+                // if the update didn't throw an exception, it better be a success..
+                assertEquals(0, rsp.getStatus());
+              }
+            });
+          assertEquals(500, e.code());
+        }
+      } finally {
+        proxy.close(); // don't leak this port
+      }
+    } finally {
+      cluster.stopJettySolrRunner(leaderToPartition); // don't let this jetty bleed into other tests
+      cluster.waitForJettyToStop(leaderToPartition);
+    }
+  }
+  
+  /**  NOTE: uses the cluster's CloudSolrClient and assumes default collection has been set */
+  private void addTwoDocsInOneRequest(String docIdA, String docIdB) throws Exception {
+    final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
+
+    assertEquals(0, cloudClient.add(sdocs(sdoc("id", docIdA),
+                                          sdoc("id", docIdB))).getStatus());
+    assertEquals(0, cloudClient.commit().getStatus());
+    
+    assertEquals(2, cloudClient.query(params("q","id:(" + docIdA + " OR " + docIdB + ")")
+                                      ).getResults().getNumFound());
+    
+    checkShardConsistency(params("q","*:*", "rows", "99","_trace","two_docs"));
+  }
+
+  /**  NOTE: uses the cluster's CloudSolrClient and assumes default collection has been set */
+  private void addUpdateDelete(String collection, String docId) throws Exception {
+    final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
+
+    // add the doc, confirm we can query it...
+    assertEquals(0, cloudClient.add(sdoc("id", docId, "content_t", "originalcontent")).getStatus());
+    assertEquals(0, cloudClient.commit().getStatus());
+    
+    assertEquals(1, cloudClient.query(params("q", "id:" + docId)).getResults().getNumFound());
+    assertEquals(1, cloudClient.query(params("q", "content_t:originalcontent")).getResults().getNumFound());
+    assertEquals(1,
+                 cloudClient.query(params("q", "content_t:originalcontent AND id:" + docId))
+                 .getResults().getNumFound());
+    
+    checkShardConsistency(params("q","id:" + docId, "rows", "99","_trace","original_doc"));
+    
+    // update doc
+    assertEquals(0, cloudClient.add(sdoc("id", docId, "content_t", "updatedcontent")).getStatus());
+    assertEquals(0, cloudClient.commit().getStatus());
+    
+    // confirm we can query the doc by updated content and not original...
+    assertEquals(0, cloudClient.query(params("q", "content_t:originalcontent")).getResults().getNumFound());
+    assertEquals(1, cloudClient.query(params("q", "content_t:updatedcontent")).getResults().getNumFound());
+    assertEquals(1,
+                 cloudClient.query(params("q", "content_t:updatedcontent AND id:" + docId))
+                 .getResults().getNumFound());
+    
+    // delete the doc, confirm it no longer matches in queries...
+    assertEquals(0, cloudClient.deleteById(docId).getStatus());
+    assertEquals(0, cloudClient.commit(collection).getStatus());
+    
+    assertEquals(0, cloudClient.query(params("q", "id:" + docId)).getResults().getNumFound());
+    assertEquals(0, cloudClient.query(params("q", "content_t:updatedcontent")).getResults().getNumFound());
+    
+    checkShardConsistency(params("q","id:" + docId, "rows", "99","_trace","del_updated_doc"));
+
+  }
+
+  @Ignore // nocommit debug
+  public long testIndexQueryDeleteHierarchical() throws Exception {
+    final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
+    final String collectionName = createAndSetNewDefaultCollection();
+    
+    // index
+    long docId = 42;
+    int topDocsNum = atLeast(TEST_NIGHTLY ? 5 : 2);
+    int childsNum = (TEST_NIGHTLY ? 5 : 2)+random().nextInt(TEST_NIGHTLY ? 5 : 2);
+    for (int i = 0; i < topDocsNum; ++i) {
+      UpdateRequest uReq = new UpdateRequest();
+      SolrInputDocument topDocument = new SolrInputDocument();
+      topDocument.addField("id", docId++);
+      topDocument.addField("type_s", "parent");
+      topDocument.addField(i + "parent_f1_s", "v1");
+      topDocument.addField(i + "parent_f2_s", "v2");
+      
+      
+      for (int index = 0; index < childsNum; ++index) {
+        docId = addChildren("child", topDocument, index, false, docId);
+      }
+      
+      uReq.add(topDocument);
+      assertEquals(i + "/" + docId,
+                   0, uReq.process(cloudClient).getStatus());
+    }
+    assertEquals(0, cloudClient.commit(collectionName).getStatus());
+
+    checkShardConsistency(params("q","*:*", "rows", "9999","_trace","added_all_top_docs_with_kids"));
+    
+    // query
+    
+    // parents
+    assertEquals(topDocsNum,
+                 cloudClient.query(new SolrQuery("type_s:parent")).getResults().getNumFound());
+    
+    // children
+    assertEquals(topDocsNum * childsNum,
+                 cloudClient.query(new SolrQuery("type_s:child")).getResults().getNumFound());
+                 
+    
+    // grandchildren
+    //
+    // each topDoc has t children, and child i has 2*i grandchildren, so per topDoc: x = 0 + 2 + 4 + .. + 2*(t-1)
+    // x = 2 * (1 + 2 + 3 + .. + (t-1)) => arithmetic sum of the first t-1 integers
+    // x = 2 * ((t-1) * t / 2) = t * (t - 1)
+    assertEquals(topDocsNum * childsNum * (childsNum - 1),
+                 cloudClient.query(new SolrQuery("type_s:grand")).getResults().getNumFound());
+    
+    //delete
+    assertEquals(0, cloudClient.deleteByQuery("*:*").getStatus());
+    assertEquals(0, cloudClient.commit(collectionName).getStatus());
+    assertEquals(0, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+
+    checkShardConsistency(params("q","*:*", "rows", "9999","_trace","delAll"));
+    
+    return docId;
+  }
+
+  
+  /**
+   * Recursive helper function for building out child and grandchild docs
+   */
+  private long addChildren(String prefix, SolrInputDocument topDocument, int childIndex, boolean lastLevel, long docId) {
+    SolrInputDocument childDocument = new SolrInputDocument();
+    childDocument.addField("id", docId++);
+    childDocument.addField("type_s", prefix);
+    for (int index = 0; index < childIndex; ++index) {
+      childDocument.addField(childIndex + prefix + index + "_s", childIndex + "value"+ index);
+    }   
+  
+    if (!lastLevel) {
+      for (int i = 0; i < childIndex * 2; ++i) {
+        docId = addChildren("grand", childDocument, i, true, docId);
+      }
+    }
+    topDocument.addChildDocument(childDocument);
+    return docId;
+  }
+  
+  @Ignore // nocommit debug
+  public void testIndexingOneDocPerRequestWithHttpSolrClient() throws Exception {
+    final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
+    final String collectionName = createAndSetNewDefaultCollection();
+    
+    final int numDocs = atLeast(TEST_NIGHTLY ? 50 : 15);
+    for (int i = 0; i < numDocs; i++) {
+      UpdateRequest uReq;
+      uReq = new UpdateRequest();
+      assertEquals(0, cloudClient.add
+                   (sdoc("id", i, "text_t", TestUtil.randomRealisticUnicodeString(random(), 200))).getStatus());
+    }
+    assertEquals(0, cloudClient.commit(collectionName).getStatus());
+    assertEquals(numDocs, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+    
+    checkShardConsistency(params("q","*:*", "rows", ""+(1 + numDocs),"_trace","addAll"));
+  }
+
+ // @Ignore // nocommit debug
+  public void testIndexingBatchPerRequestWithHttpSolrClient() throws Exception {
+    final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
+    final String collectionName = createAndSetNewDefaultCollection();
+
+    final int numDocsPerBatch = atLeast(5);
+    final int numBatchesPerThread = atLeast(5);
+    AtomicInteger expectedDocCount = new AtomicInteger();
+      
+    final CountDownLatch abort = new CountDownLatch(1);
+    class BatchIndexer implements Runnable {
+      private boolean keepGoing() {
+        return 0 < abort.getCount();
+      }
+      
+      final int name;
+      public BatchIndexer(int name) {
+        this.name = name;
+      }
+      
+      @Override
+      public void run() {
+        try {
+          for (int batchId = 0; batchId < numBatchesPerThread && keepGoing(); batchId++) {
+            final UpdateRequest req = new UpdateRequest();
+            for (int docId = 0; docId < numDocsPerBatch && keepGoing(); docId++) {
+              expectedDocCount.incrementAndGet();
+              req.add(sdoc("id", "indexer" + name + "_" + batchId + "_" + docId,
+                           "test_t", TestUtil.randomRealisticUnicodeString(LuceneTestCase.random(), 200)));
+            }
+            assertEquals(0, req.process(cloudClient).getStatus());
+          }
+        } catch (Throwable e) {
+          e.printStackTrace();
+          abort.countDown();
+        }
+      }
+    };
+
+    final int numThreads = random().nextInt(TEST_NIGHTLY ? 4 : 2) + 1;
+    final List<Future<?>> futures = new ArrayList<>(numThreads);
+    for (int i = 0; i < numThreads; i++) {
+      futures.add(testExecutor.submit(new BatchIndexer(i)));
+    }
+    final int totalDocsExpected = numThreads * numBatchesPerThread * numDocsPerBatch;
+
+
+    for (Future result : futures) {
+      result.get();
+      assertFalse(result.isCancelled());
+      assertTrue(result.isDone());
+      // all we care about is propagating any possible execution exception...
+      final Object ignored = result.get();
+    }
+    
+    cloudClient.commit(collectionName);
+    assertEquals(expectedDocCount.get(), cloudClient.query(params("q","*:*")).getResults().getNumFound());
+    checkShardConsistency(params("q","*:*", "rows", ""+totalDocsExpected, "_trace","batches_done"));
+  }
+
+
+  public void testConcurrentIndexing() throws Exception {
+    final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
+    final String collectionName = createAndSetNewDefaultCollection();
+
+    final int numDocs = TEST_NIGHTLY ? atLeast(150) : 55;
+    final JettySolrRunner nodeToUpdate = cluster.getRandomJetty(random());
+    try (ConcurrentUpdateSolrClient indexClient
+         = getConcurrentUpdateSolrClient(nodeToUpdate.getBaseUrl() + "/" + collectionName, 10, 2)) {
+      
+      for (int i = 0; i < numDocs; i++) {
+        log.info("add doc {}", i);
+        indexClient.add(sdoc("id", i, "text_t",
+                             TestUtil.randomRealisticUnicodeString(random(), 200)));
+      }
+      indexClient.blockUntilFinished();
+      assertEquals(0, indexClient.commit().getStatus());
+      indexClient.blockUntilFinished();
+    }
+    assertEquals(numDocs, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+
+    checkShardConsistency(params("q","*:*", "rows", ""+(1 + numDocs),"_trace","addAll"));
+  }
+  
+  /**
+   * Inspects the cluster to determine all active shards/replicas for the default collection, then
+   * executes a <code>distrib=false</code> query using the specified params, and compares the resulting 
+   * {@link SolrDocumentList}, failing if any replica does not agree with its leader.
+   *
+   * @see #cluster
+   * @see CloudInspectUtil#showDiff 
+   */
+  private void checkShardConsistency(final SolrParams params) throws Exception {
+    // TODO: refactor into static in CloudInspectUtil w/ DocCollection param?
+    // TODO: refactor to take in a BiFunction<QueryResponse,QueryResponse,Boolean> ?
+    
+    final SolrParams perReplicaParams = SolrParams.wrapDefaults(params("distrib", "false"),
+                                                                params);
+    final DocCollection collection = cluster.getSolrClient().getZkStateReader()
+      .getClusterState().getCollection(cluster.getSolrClient().getDefaultCollection());
+    log.info("Checking shard consistency via: {}", perReplicaParams);
+    for (Map.Entry<String,Slice> entry : collection.getActiveSlicesMap().entrySet()) {
+      final String shardName = entry.getKey();
+      final Slice slice = entry.getValue();
+      log.info("Checking: {} -> {}", shardName, slice);
+      final Replica leader = entry.getValue().getLeader();
+      try (Http2SolrClient leaderClient = getHttpSolrClient(leader.getCoreUrl())) {
+        final SolrDocumentList leaderResults = leaderClient.query(perReplicaParams).getResults();
+        log.debug("Shard {}: Leader results: {}", shardName, leaderResults);
+        for (Replica replica : slice) {
+          try (Http2SolrClient replicaClient = getHttpSolrClient(replica.getCoreUrl())) {
+            final SolrDocumentList replicaResults = replicaClient.query(perReplicaParams).getResults();
+            if (log.isDebugEnabled()) {
+              log.debug("Shard {}: Replica ({}) results: {}", shardName, replica.getCoreName(), replicaResults);
+            }
+            assertEquals("inconsistency w/leader: shard=" + shardName + "core=" + replica.getCoreName(),
+                         Collections.emptySet(),
+                         CloudInspectUtil.showDiff(leaderResults, replicaResults,
+                                                   shardName + " leader: " + leader.getCoreUrl(),
+                                                   shardName + ": " + replica.getCoreUrl()));
+          }
+        }
+      }
+    }
+  }
+
+}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/AsyncLBHttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/AsyncLBHttpSolrClient.java
index 3cbc2f6..bd0e90c 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/AsyncLBHttpSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/AsyncLBHttpSolrClient.java
@@ -19,6 +19,7 @@ package org.apache.solr.client.solrj.impl;
 import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
 
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 import java.lang.ref.WeakReference;
 import java.net.ConnectException;
 import java.net.MalformedURLException;
@@ -56,6 +57,8 @@ import org.apache.solr.common.params.QoSParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
 /**
@@ -96,6 +99,10 @@ import org.slf4j.MDC;
  * @since solr 1.4
  */
 public class AsyncLBHttpSolrClient extends SolrClient {
+
+  private static final Logger log = LoggerFactory
+      .getLogger(MethodHandles.lookup().lookupClass());
+
   private static Set<Integer> RETRY_CODES = new HashSet<>(4);
 
   static {
@@ -483,7 +490,7 @@ public class AsyncLBHttpSolrClient extends SolrClient {
   }
 
   protected Exception addZombie(Http2SolrClient server, String url, Exception e) {
-
+    log.warn("adding zombie server {} due to exception", url, e);
     ServerWrapper wrapper;
 
     wrapper = new ServerWrapper(server, url);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index b1ef811..f23a4a2 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -102,7 +102,6 @@ import org.eclipse.jetty.http2.client.HTTP2Client;
 import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
 import org.eclipse.jetty.util.Fields;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.Scheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -138,7 +137,7 @@ public class Http2SolrClient extends SolrClient {
   private static final String DEFAULT_PATH = "/select";
   private static final List<String> errPath = Arrays.asList("metadata", "error-class");
   private final Map<String, String> headers;
-  private final SolrHttpClientScheduler scheduler;
+
   private final CloseTracker closeTracker;
 
   private volatile HttpClient httpClient;
@@ -168,8 +167,6 @@ public class Http2SolrClient extends SolrClient {
       this.serverBaseUrl = serverBaseUrl;
     }
 
-    scheduler = new SolrHttpClientScheduler("JettyHttpClientScheduler", true, null, new ThreadGroup("JettyHttpClientScheduler"), 5);
-
     this.headers = builder.headers;
 
     if (builder.idleTimeout != null && builder.idleTimeout > 0) idleTimeout = builder.idleTimeout;
@@ -205,12 +202,6 @@ public class Http2SolrClient extends SolrClient {
   private HttpClient createHttpClient(Builder builder) {
     HttpClient httpClient;
 
-//    BlockingArrayQueue<Runnable> queue = new BlockingArrayQueue<>(256, 256);
-//    httpClientExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(Integer.getInteger("solr.http2solrclient.corepool.size", 2),
-//            Integer.getInteger("solr.http2solrclient.maxpool.size", 10), Integer.getInteger("solr.http2solrclient.pool.keepalive", 3000),
-//            TimeUnit.MILLISECONDS, queue, new SolrNamedThreadFactory("h2sc"));
-
-
     SslContextFactory.Client sslContextFactory;
     boolean ssl;
     if (builder.sslConfig == null) {
@@ -237,11 +228,11 @@ public class Http2SolrClient extends SolrClient {
       HTTP2Client http2client = new HTTP2Client();
       transport = new HttpClientTransportOverHTTP2(http2client);
       httpClient = new HttpClient(transport, sslContextFactory);
-      httpClient.setMaxConnectionsPerDestination(10);
+      httpClient.setMaxConnectionsPerDestination(300);
     }
     httpClientExecutor = new SolrQueuedThreadPool("httpClient");
-    httpClientExecutor.setMaxThreads(Math.max(4 , Runtime.getRuntime().availableProcessors()));
-    httpClientExecutor.setMinThreads(3);
+    //httpClientExecutor.setMaxThreads(-1);
+   // httpClientExecutor.setMinThreads(3);
     httpClient.setIdleTimeout(idleTimeout);
     try {
       httpClient.setExecutor(httpClientExecutor);
@@ -252,7 +243,6 @@ public class Http2SolrClient extends SolrClient {
       httpClient.setUserAgentField(new HttpField(HttpHeader.USER_AGENT, AGENT));
       httpClient.setIdleTimeout(idleTimeout);
       if (builder.connectionTimeout != null) httpClient.setConnectTimeout(builder.connectionTimeout);
-      httpClient.setScheduler(scheduler);
       httpClient.start();
     } catch (Exception e) {
       ParWork.propegateInterrupt(e);
@@ -276,17 +266,11 @@ public class Http2SolrClient extends SolrClient {
           }
         });
       }
-      closer.collect(() -> {
-        // we wait for async requests, so far devs don't want to give sugar for this
-        asyncTracker.waitForCompleteFinal();
-        if (httpClientExecutor != null) {
-          try {
-            httpClientExecutor.waitForStopping();
-          } catch (InterruptedException e) {
-            ParWork.propegateInterrupt(e);
-          }
-        }
-      });
+//      closer.collect(() -> {
+//        // we wait for async requests, so far devs don't want to give sugar for this
+//       // asyncTracker.waitForCompleteFinal();
+//
+//      });
       closer.addCollect("httpClientExecutor");
     }
     assert ObjectReleaseTracker.release(this);
@@ -435,8 +419,8 @@ public class Http2SolrClient extends SolrClient {
         ? this.parser: solrRequest.getResponseParser();
     if (onComplete != null) {
       // This async call only suitable for indexing since the response size is limited by 5MB
-      asyncTracker.register();
-      req.send(new BufferingResponseListener(5 * 1024 * 1024) {
+      req.onRequestQueued(asyncTracker.queuedListener)
+          .onComplete(asyncTracker.completeListener).send(new BufferingResponseListener(5 * 1024 * 1024) {
 
         @Override
         public void onComplete(Result result) {
@@ -458,7 +442,7 @@ public class Http2SolrClient extends SolrClient {
               onComplete.onFailure(e);
             }
           } finally {
-            asyncTracker.completeListener.onComplete(result);
+        //    asyncTracker.completeListener.onComplete(result);
           }
         }
       });
@@ -475,8 +459,7 @@ public class Http2SolrClient extends SolrClient {
             }
           }
         };
-        asyncTracker.register();
-        req.send(listener);
+        req.onRequestQueued(asyncTracker.queuedListener).send(listener);
         Response response = listener.get(idleTimeout, TimeUnit.MILLISECONDS);
         InputStream is = listener.getInputStream();
         assert ObjectReleaseTracker.track(is);
@@ -886,7 +869,7 @@ public class Http2SolrClient extends SolrClient {
   private class AsyncTracker {
 
     // nocommit - look at outstanding max again
-    private static final int MAX_OUTSTANDING_REQUESTS = 100;
+    private static final int MAX_OUTSTANDING_REQUESTS = 10;
 
     private final Semaphore available;
 
@@ -898,15 +881,21 @@ public class Http2SolrClient extends SolrClient {
       }
     };
     // maximum outstanding requests left
-   // private final Request.QueuedListener queuedListener;
+    private final Request.QueuedListener queuedListener;
     private final Response.CompleteListener completeListener;
 
     AsyncTracker() {
-//      queuedListener = request -> {
-//        phaser.register();
-//        if (log.isDebugEnabled()) log.debug("Request queued registered: {} arrived: {}", phaser.getRegisteredParties(), phaser.getArrivedParties());
-//      };
-      available = new Semaphore(MAX_OUTSTANDING_REQUESTS, true);
+      available = new Semaphore(MAX_OUTSTANDING_REQUESTS, false);
+      queuedListener = request -> {
+        phaser.register();
+        try {
+          available.acquire();
+        } catch (InterruptedException ignored) {
+          ParWork.propegateInterrupt(ignored);
+        }
+        if (log.isDebugEnabled()) log.debug("Request queued registered: {} arrived: {}", phaser.getRegisteredParties(), phaser.getArrivedParties());
+      };
+
       completeListener = result -> {
        if (log.isDebugEnabled()) log.debug("Request complete registered: {} arrived: {}", phaser.getRegisteredParties(), phaser.getArrivedParties());
         phaser.arriveAndDeregister();
@@ -921,15 +910,19 @@ public class Http2SolrClient extends SolrClient {
 
     public synchronized void waitForComplete() {
       if (log.isDebugEnabled()) log.debug("Before wait for outstanding requests registered: {} arrived: {}", phaser.getRegisteredParties(), phaser.getArrivedParties());
-
+      if (phaser.getUnarrivedParties() == 0) {
+        return;
+      }
       int arrival = phaser.arriveAndAwaitAdvance();
 
+     // phaser.awaitAdvance(phaser.arriveAndDeregister());
+
       if (log.isDebugEnabled()) log.debug("After wait for outstanding requests registered: {} arrived: {}", phaser.getRegisteredParties(), phaser.getArrivedParties());
     }
 
     public void waitForCompleteFinal() {
       if (log.isDebugEnabled()) log.debug("Before wait for complete final registered: {} arrived: {}", phaser.getRegisteredParties(), phaser.getArrivedParties());
-      int arrival = phaser.arriveAndAwaitAdvance();
+      phaser.awaitAdvance(phaser.arriveAndDeregister());
 
       if (log.isDebugEnabled()) log.debug("After wait for complete final registered: {} arrived: {}", phaser.getRegisteredParties(), phaser.getArrivedParties());
     }
@@ -938,12 +931,12 @@ public class Http2SolrClient extends SolrClient {
       if (log.isDebugEnabled()) {
         log.debug("Registered new party");
       }
-      phaser.register();
-      try {
-        available.acquire();
-      } catch (InterruptedException ignored) {
-        ParWork.propegateInterrupt(ignored);
-      }
+   //   phaser.register();
+//      try {
+//        available.acquire();
+//      } catch (InterruptedException ignored) {
+//        ParWork.propegateInterrupt(ignored);
+//      }
     }
   }
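
The AsyncTracker hunks above move the Phaser registration and Semaphore acquisition out of register() and into the request-queued listener, and waitForCompleteFinal() now arrives-and-deregisters before waiting. A minimal, self-contained sketch of that Phaser-plus-Semaphore throttling pattern (OutstandingRequestTracker and its method names are illustrative, not the client's actual API):

    import java.util.concurrent.Phaser;
    import java.util.concurrent.Semaphore;

    // Sketch: a Phaser counts in-flight requests, a Semaphore caps how many may be outstanding.
    class OutstandingRequestTracker {
      private static final int MAX_OUTSTANDING_REQUESTS = 10;
      private final Semaphore available = new Semaphore(MAX_OUTSTANDING_REQUESTS, false);
      // one party represents the tracker itself so the phaser can always be awaited
      private final Phaser phaser = new Phaser(1);

      // analogous to the Request.QueuedListener in the diff
      void onRequestQueued() throws InterruptedException {
        phaser.register();    // one party per in-flight request
        available.acquire();  // block the caller once the cap is reached
      }

      // analogous to the Response.CompleteListener in the diff
      void onRequestComplete() {
        phaser.arriveAndDeregister();
        available.release();
      }

      // analogous to waitForCompleteFinal(): drop the tracker's party and wait for in-flight requests
      void waitForCompleteFinal() {
        phaser.awaitAdvance(phaser.arriveAndDeregister());
      }
    }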
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
index 163e755..0e09d43 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
@@ -428,7 +428,7 @@ public abstract class LBSolrClient extends SolrClient {
   protected abstract SolrClient getClient(String baseUrl);
 
   private Exception addZombie(String serverStr, Exception e) {
-    log.info("add Zombie {}" + serverStr);
+    log.warn("adding zombie server {} due to exception", serverStr, e);
     ServerWrapper wrapper = createServerWrapper(serverStr);
     wrapper.standard = false;
     zombieServers.put(serverStr, wrapper);
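
The LBSolrClient fix above matters because the old statement concatenated serverStr onto the format string, so the {} placeholder printed literally and the exception was never passed to the logger. With SLF4J, arguments follow the format string and a trailing Throwable is logged with its stack trace; a small illustrative sketch (the class name is hypothetical):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class ZombieLoggingExample {
      private static final Logger log = LoggerFactory.getLogger(ZombieLoggingExample.class);

      void demo(String serverStr, Exception e) {
        // broken: the placeholder is part of a concatenated string, so it prints literally
        // and the exception is dropped
        log.info("add Zombie {}" + serverStr);

        // fixed: serverStr fills the placeholder, and the trailing Throwable gets a stack trace
        log.warn("adding zombie server {} due to exception", serverStr, e);
      }
    }
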
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
index a8b285f..8a29c28 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
@@ -289,9 +289,6 @@ public class ParWorkExecService implements ExecutorService {
 
   @Override
   public void execute(Runnable runnable) {
-    if (shutdown || terminated) {
-      throw new RejectedExecutionException();
-    }
     boolean success = checkLoad();
     if (success) {
       success = available.tryAcquire();
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
index d9f30a2..aa6c493 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
@@ -142,7 +142,7 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
         setMinThreads(minThreads);
         setMaxThreads(maxThreads);
         setIdleTimeout(idleTimeout);
-        setStopTimeout(5000);
+//        setStopTimeout(5000);
         setReservedThreads(reservedThreads);
         if (queue == null)
         {
@@ -284,14 +284,14 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
 
     private void joinThreads(long stopByNanos) throws InterruptedException
     {
-        for (Thread thread : _threads)
-        {
-            long canWait = TimeUnit.NANOSECONDS.toMillis(stopByNanos - System.nanoTime());
-            if (LOG.isDebugEnabled())
-                LOG.debug("Waiting for {} for {}", thread, canWait);
-            if (canWait > 0)
-                thread.join(canWait);
-        }
+//        for (Thread thread : _threads)
+//        {
+//            long canWait = TimeUnit.NANOSECONDS.toMillis(stopByNanos - System.nanoTime());
+//            if (LOG.isDebugEnabled())
+//                LOG.debug("Waiting for {} for {}", thread, canWait);
+//            if (canWait > 0)
+//                thread.join(canWait);
+//        }
     }
 
     /**
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
index c90e7f2..af5db31 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
@@ -448,7 +448,7 @@ public class SolrTestCase extends LuceneTestCase {
 
       if (clazz != null) {
         // nocommit - leave this on
-        fail("A " + clazz.getName() + " took too long to close: " + tooLongTime + "\n" + times);
+        if (!TEST_NIGHTLY) fail("A " + clazz.getName() + " took too long to close: " + tooLongTime + "\n" + times);
       }
     }
     log.info("@AfterClass end ------------------------------------------------------");