Posted to commits@lucene.apache.org by ho...@apache.org on 2019/12/16 22:58:15 UTC

[lucene-solr] branch master updated: SOLR-14081: re-implement FullSolrCloudDistribCmdsTest to extend SolrCloudTestCase

This is an automated email from the ASF dual-hosted git repository.

hossman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new db11e9e  SOLR-14081: re-implement FullSolrCloudDistribCmdsTest to extend SolrCloudTestCase
db11e9e is described below

commit db11e9e9a2c07136399ba002f2bbefe8c611b0a0
Author: Chris Hostetter <ho...@apache.org>
AuthorDate: Mon Dec 16 15:58:06 2019 -0700

    SOLR-14081: re-implement FullSolrCloudDistribCmdsTest to extend SolrCloudTestCase
---
 .../solr/cloud/FullSolrCloudDistribCmdsTest.java   | 946 ++++++++-------------
 1 file changed, 340 insertions(+), 606 deletions(-)

diff --git a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
index d6cbf9b..4bd4b52 100644
--- a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
@@ -16,497 +16,274 @@
  */
 package org.apache.solr.cloud;
 
-import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.lucene.util.TestUtil;
 import org.apache.lucene.util.LuceneTestCase.Slow;
-import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
-import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.SocketProxy;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.RequestStatusState;
 import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.response.CollectionAdminResponse;
-import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.common.SolrDocument;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CollectionParams.CollectionAction;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.Utils;
-import org.apache.zookeeper.CreateMode;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static org.apache.solr.common.params.CommonParams.VERSION_FIELD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Super basic testing, no shard restarting or anything.
  */
 @Slow
-@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
-public class FullSolrCloudDistribCmdsTest extends AbstractFullDistribZkTestBase {
-  
+public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final AtomicInteger NAME_COUNTER = new AtomicInteger(1);
+
   @BeforeClass
-  public static void beforeSuperClass() {
-    schemaString = "schema15.xml";      // we need a string id
+  public static void setupCluster() throws Exception {
+    // use a 5 node cluster so that with a typical 2x2 collection, one node isn't involved
+    // this helps randomly exercise the edge case of hitting a node not involved in the collection
+    configureCluster(5).configure();
   }
-  
-  public FullSolrCloudDistribCmdsTest() {
-    super();
-    sliceCount = 3;
+
+  @After
+  public void purgeAllCollections() throws Exception {
+    cluster.deleteAllCollections();
+    cluster.getSolrClient().setDefaultCollection(null);
   }
 
+  /**
+   * Creates a new 2x2 collection using a unique name, blocking until its state is fully active,
+   * and sets that collection as the default on the cluster's default CloudSolrClient.
+   * 
+   * @return the name of the new collection
+   */
+  public static String createAndSetNewDefaultCollection() throws Exception {
+    final CloudSolrClient cloudClient = cluster.getSolrClient();
+    final String name = "test_collection_" + NAME_COUNTER.getAndIncrement();
+    assertEquals(RequestStatusState.COMPLETED,
+                 CollectionAdminRequest.createCollection(name, "_default", 2, 2)
+                 .processAndWait(cloudClient, DEFAULT_TIMEOUT));
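+    // belt and suspenders: even after the request reports COMPLETED, block until all 4 replicas are active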
+    cloudClient.waitForState(name, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
+                             (n, c) -> DocCollection.isFullyActive(n, c, 2, 2));
+    cloudClient.setDefaultCollection(name);
+    return name;
+  }
+  
   @Test
-  @ShardsFixed(num = 6)
-  // commented 15-Sep-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 2-Aug-2018
-  @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // annotated on: 24-Dec-2018
-  public void test() throws Exception {
-    handle.clear();
-    handle.put("timestamp", SKIPVAL);
-    
-    waitForRecoveriesToFinish(false);
+  public void testBasicUpdates() throws Exception {
+    final CloudSolrClient cloudClient = cluster.getSolrClient();
+    final String collectionName = createAndSetNewDefaultCollection();
     
     // add a doc, update it, and delete it
+    addUpdateDelete("doc1");
+    assertEquals(0, cloudClient.query(params("q","*:*")).getResults().getNumFound());
     
-    QueryResponse results;
-    UpdateRequest uReq;
-    long docId = addUpdateDelete();
-    
-    // add 2 docs in a request
-    SolrInputDocument doc1;
-    SolrInputDocument doc2;
-    docId = addTwoDocsInOneRequest(docId);
-    
-    // two deletes
-    uReq = new UpdateRequest();
-    uReq.deleteById(Long.toString(docId-1));
-    uReq.deleteById(Long.toString(docId-2)).process(cloudClient);
-    controlClient.deleteById(Long.toString(docId-1));
-    controlClient.deleteById(Long.toString(docId-2));
-    
-    commit();
-    
-    results = query(cloudClient);
-    assertEquals(0, results.getResults().getNumFound());
-    
-    results = query(controlClient);
-    assertEquals(0, results.getResults().getNumFound());
-    
-    // add two docs together, a 3rd doc and a delete
-    indexr("id", docId++, t1, "originalcontent");
-    
-    uReq = new UpdateRequest();
-    doc1 = new SolrInputDocument();
-
-    addFields(doc1, "id", docId++);
-    uReq.add(doc1);
-    doc2 = new SolrInputDocument();
-    addFields(doc2, "id", docId++);
-    uReq.add(doc2);
- 
-    uReq.process(cloudClient);
-    uReq.process(controlClient);
-    
-    uReq = new UpdateRequest();
-    uReq.deleteById(Long.toString(docId - 2)).process(cloudClient);
-    controlClient.deleteById(Long.toString(docId - 2));
-    
-    commit();
-    
-    assertDocCounts(VERBOSE);
-    
-    checkShardConsistency();
-    
-    results = query(controlClient);
-    assertEquals(2, results.getResults().getNumFound());
-    
-    results = query(cloudClient);
-    assertEquals(2, results.getResults().getNumFound());
-    
-    docId = testIndexQueryDeleteHierarchical(docId);
-    
-    docId = testIndexingDocPerRequestWithHttpSolrClient(docId);
-    
-    testConcurrentIndexing(docId);
+    // add 2 docs in a single request
+    addTwoDocsInOneRequest("doc2", "doc3");
+    assertEquals(2, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+
+    // 2 deletes in a single request...
+    assertEquals(0, (new UpdateRequest().deleteById("doc2").deleteById("doc3"))
+                 .process(cloudClient).getStatus());
+    assertEquals(0, cloudClient.commit().getStatus());
     
-    // TODO: testOptimisticUpdate(results);
+    assertEquals(0, cloudClient.query(params("q","*:*")).getResults().getNumFound());
     
-    testDeleteByQueryDistrib();
-
-    // See SOLR-7384
-//    testDeleteByIdImplicitRouter();
-//
-//    testDeleteByIdCompositeRouterWithRouterField();
+    // add a doc that we will delete later, after adding two other docs (all before the next commit).
+    assertEquals(0, cloudClient.add(sdoc("id", "doc4", "content_s", "will_delete_later")).getStatus());
+    assertEquals(0, cloudClient.add(sdocs(sdoc("id", "doc5"),
+                                          sdoc("id", "doc6"))).getStatus());
+    assertEquals(0, cloudClient.deleteById("doc4").getStatus());
+    assertEquals(0, cloudClient.commit().getStatus());
 
-    docId = testThatCantForwardToLeaderFails(docId);
-
-
-    docId = testIndexingBatchPerRequestWithHttpSolrClient(docId);
-  }
+    assertEquals(0, cloudClient.query(params("q", "id:doc4")).getResults().getNumFound());
+    assertEquals(1, cloudClient.query(params("q", "id:doc5")).getResults().getNumFound());
+    assertEquals(1, cloudClient.query(params("q", "id:doc6")).getResults().getNumFound());
+    assertEquals(2, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+    
+    checkShardConsistency(params("q","*:*", "rows", "9999","_trace","post_doc_5_6"));
 
-  private void testDeleteByIdImplicitRouter() throws Exception {
-    SolrClient server = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)));
-    CollectionAdminResponse response;
-    Map<String, NamedList<Integer>> coresStatus;
-
-    CollectionAdminRequest.Create createCollectionRequest
-      = CollectionAdminRequest.createCollectionWithImplicitRouter("implicit_collection_without_routerfield",
-                                                                  "conf1","shard1,shard2",2);
-    response = createCollectionRequest.process(server);
-
-    assertEquals(0, response.getStatus());
-    assertTrue(response.isSuccess());
-    coresStatus = response.getCollectionCoresStatus();
-    assertEquals(4, coresStatus.size());
-    for (int i = 0; i < 4; i++) {
-      NamedList<Integer> status = coresStatus.get("implicit_collection_without_routerfield_shard" + (i / 2 + 1) + "_replica" + (i % 2 + 1));
-      assertEquals(0, (int) status.get("status"));
-      assertTrue(status.get("QTime") > 0);
-    }
+    // delete everything....
+    assertEquals(0, cloudClient.deleteByQuery("*:*").getStatus());
+    assertEquals(0, cloudClient.commit().getStatus());
+    assertEquals(0, cloudClient.query(params("q","*:*")).getResults().getNumFound());
 
-    waitForRecoveriesToFinish("implicit_collection_without_routerfield", true);
-
-    SolrClient shard1 = createNewSolrClient("implicit_collection_without_routerfield_shard1_replica1",
-        getBaseUrl((HttpSolrClient) clients.get(0)));
-    SolrClient shard2 = createNewSolrClient("implicit_collection_without_routerfield_shard2_replica1",
-        getBaseUrl((HttpSolrClient) clients.get(0)));
-
-    SolrInputDocument doc = new SolrInputDocument();
-    int docCounts1, docCounts2;
-
-    // Add three documents to shard1
-    doc.clear();
-    doc.addField("id", "1");
-    doc.addField("title", "s1 one");
-    shard1.add(doc);
-    shard1.commit();
-
-    doc.clear();
-    doc.addField("id", "2");
-    doc.addField("title", "s1 two");
-    shard1.add(doc);
-    shard1.commit();
-
-    doc.clear();
-    doc.addField("id", "3");
-    doc.addField("title", "s1 three");
-    shard1.add(doc);
-    shard1.commit();
-
-    docCounts1 = 3; // Three documents in shard1
-
-    // Add two documents to shard2
-    doc.clear();
-    doc.addField("id", "4");
-    doc.addField("title", "s2 four");
-    shard2.add(doc);
-    shard2.commit();
-
-    doc.clear();
-    doc.addField("id", "5");
-    doc.addField("title", "s2 five");
-    shard2.add(doc);
-    shard2.commit();
-
-    docCounts2 = 2; // Two documents in shard2
-
-    // Verify the documents were added to correct shards
-    ModifiableSolrParams query = new ModifiableSolrParams();
-    query.set("q", "*:*");
-    QueryResponse respAll = shard1.query(query);
-    assertEquals(docCounts1 + docCounts2, respAll.getResults().getNumFound());
-
-    query.set("shards", "shard1");
-    QueryResponse resp1 = shard1.query(query);
-    assertEquals(docCounts1, resp1.getResults().getNumFound());
-
-    query.set("shards", "shard2");
-    QueryResponse resp2 = shard2.query(query);
-    assertEquals(docCounts2, resp2.getResults().getNumFound());
-
-
-    // Delete a document in shard2 with update to shard1, with _route_ param
-    // Should delete.
-    UpdateRequest deleteRequest = new UpdateRequest();
-    deleteRequest.deleteById("4", "shard2");
-    shard1.request(deleteRequest);
-    shard1.commit();
-    query.set("shards", "shard2");
-    resp2 = shard2.query(query);
-    assertEquals(--docCounts2, resp2.getResults().getNumFound());
-
-    // Delete a document in shard2 with update to shard1, without _route_ param
-    // Shouldn't delete, since deleteById requests are not broadcast to all shard leaders.
-    deleteRequest = new UpdateRequest();
-    deleteRequest.deleteById("5");
-    shard1.request(deleteRequest);
-    shard1.commit();
-    query.set("shards", "shard2");
-    resp2 = shard2.query(query);
-    assertEquals(docCounts2, resp2.getResults().getNumFound());
-
-    // Multiple deleteById commands in a single request
-    deleteRequest.clear();
-    deleteRequest.deleteById("2", "shard1");
-    deleteRequest.deleteById("3", "shard1");
-    deleteRequest.setCommitWithin(1);
-    query.set("shards", "shard1");
-    shard2.request(deleteRequest);
-    resp1 = shard1.query(query);
-    --docCounts1;
-    --docCounts1;
-    assertEquals(docCounts1, resp1.getResults().getNumFound());
-
-    // Test commitWithin, update to shard2, document deleted in shard1
-    deleteRequest.clear();
-    deleteRequest.deleteById("1", "shard1");
-    deleteRequest.setCommitWithin(1);
-    shard2.request(deleteRequest);
-    Thread.sleep(1000);
-    query.set("shards", "shard1");
-    resp1 = shard1.query(query);
-    assertEquals(--docCounts1, resp1.getResults().getNumFound());
+    checkShardConsistency(params("q","*:*", "rows", "9999","_trace","delAll"));
+    
   }
 
-  private void testDeleteByIdCompositeRouterWithRouterField() throws Exception {
-    SolrClient server = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)));
-    CollectionAdminResponse response;
-    Map<String, NamedList<Integer>> coresStatus;
-
-    response = CollectionAdminRequest.createCollection("compositeid_collection_with_routerfield","conf1",2,2)
-            .setRouterName("compositeId")
-            .setRouterField("routefield_s")
-            .setShards("shard1,shard2")
-            .process(server);
-
-    assertEquals(0, response.getStatus());
-    assertTrue(response.isSuccess());
-    coresStatus = response.getCollectionCoresStatus();
-    assertEquals(4, coresStatus.size());
-    for (int i = 0; i < 4; i++) {
-      NamedList<Integer> status = coresStatus.get("compositeid_collection_with_routerfield_shard" + (i / 2 + 1) + "_replica" + (i % 2 + 1));
-      assertEquals(0, (int) status.get("status"));
-      assertTrue(status.get("QTime") > 0);
-    }
-
-    waitForRecoveriesToFinish("compositeid_collection_with_routerfield", true);
-
-    SolrClient shard1 = createNewSolrClient("compositeid_collection_with_routerfield_shard1_replica1",
-        getBaseUrl((HttpSolrClient) clients.get(0)));
-    SolrClient shard2 = createNewSolrClient("compositeid_collection_with_routerfield_shard2_replica1",
-        getBaseUrl((HttpSolrClient) clients.get(0)));
-
-    SolrInputDocument doc = new SolrInputDocument();
-    int docCounts1 = 0, docCounts2 = 0;
-
-    // Add three documents to shard1
-    doc.clear();
-    doc.addField("id", "1");
-    doc.addField("title", "s1 one");
-    doc.addField("routefield_s", "europe");
-    shard1.add(doc);
-    shard1.commit();
-
-    doc.clear();
-    doc.addField("id", "2");
-    doc.addField("title", "s1 two");
-    doc.addField("routefield_s", "europe");
-    shard1.add(doc);
-    shard1.commit();
-
-    doc.clear();
-    doc.addField("id", "3");
-    doc.addField("title", "s1 three");
-    doc.addField("routefield_s", "europe");
-    shard1.add(doc);
-    shard1.commit();
-
-    docCounts1 = 3; // Three documents in shard1
-
-    // Add two documents to shard2
-    doc.clear();
-    doc.addField("id", "4");
-    doc.addField("title", "s2 four");
-    doc.addField("routefield_s", "africa");
-    shard2.add(doc);
-    //shard2.commit();
-
-    doc.clear();
-    doc.addField("id", "5");
-    doc.addField("title", "s2 five");
-    doc.addField("routefield_s", "africa");
-    shard2.add(doc);
-    shard2.commit();
-
-    docCounts2 = 2; // Two documents in shard2
-
-    // Verify the documents were added to correct shards
-    ModifiableSolrParams query = new ModifiableSolrParams();
-    query.set("q", "*:*");
-    QueryResponse respAll = shard1.query(query);
-    assertEquals(docCounts1 + docCounts2, respAll.getResults().getNumFound());
-
-    query.set("shards", "shard1");
-    QueryResponse resp1 = shard1.query(query);
-    assertEquals(docCounts1, resp1.getResults().getNumFound());
-
-    query.set("shards", "shard2");
-    QueryResponse resp2 = shard2.query(query);
-    assertEquals(docCounts2, resp2.getResults().getNumFound());
-
-    // Delete a document in shard2 with update to shard1, with _route_ param
-    // Should delete.
-    UpdateRequest deleteRequest = new UpdateRequest();
-    deleteRequest.deleteById("4", "africa");
-    deleteRequest.setCommitWithin(1);
-    shard1.request(deleteRequest);
-    shard1.commit();
-
-    query.set("shards", "shard2");
-    resp2 = shard2.query(query);
-    --docCounts2;
-    assertEquals(docCounts2, resp2.getResults().getNumFound());
-
-    // Multiple deleteById commands in a single request
-    deleteRequest.clear();
-    deleteRequest.deleteById("2", "europe");
-    deleteRequest.deleteById("3", "europe");
-    deleteRequest.setCommitWithin(1);
-    query.set("shards", "shard1");
-    shard1.request(deleteRequest);
-    shard1.commit();
-    Thread.sleep(1000);
-    resp1 = shard1.query(query);
-    --docCounts1;
-    --docCounts1;
-    assertEquals(docCounts1, resp1.getResults().getNumFound());
-
-    // Test commitWithin, update to shard2, document deleted in shard1
-    deleteRequest.clear();
-    deleteRequest.deleteById("1", "europe");
-    deleteRequest.setCommitWithin(1);
-    shard2.request(deleteRequest);
-    query.set("shards", "shard1");
-    resp1 = shard1.query(query);
-    --docCounts1;
-    assertEquals(docCounts1, resp1.getResults().getNumFound());
-  }
 
-  private long testThatCantForwardToLeaderFails(long docId) throws Exception {
-    ZkStateReader zkStateReader = cloudClient.getZkStateReader();
-    ZkNodeProps props = zkStateReader.getLeaderRetry(DEFAULT_COLLECTION, "shard1");
-    
-    chaosMonkey.stopShard("shard1");
-    
-    Thread.sleep(1000);
+  public void testThatCantForwardToLeaderFails() throws Exception {
+    final CloudSolrClient cloudClient = cluster.getSolrClient();
+    final String collectionName = "test_collection_" + NAME_COUNTER.getAndIncrement();
+    cloudClient.setDefaultCollection(collectionName);
     
-    // fake that the leader is still advertised
-    String leaderPath = ZkStateReader.getShardLeadersPath(DEFAULT_COLLECTION, "shard1");
-    SolrZkClient zkClient = new SolrZkClient(zkServer.getZkAddress(), 10000);
-    int fails = 0;
+    // get a random node for use in our collection before creating the one we'll partition..
+    final JettySolrRunner otherLeader = cluster.getRandomJetty(random());
+    // pick a (second) random node (which may be the same) for sending updates to
+    // (if it's the same node, we're testing routing from another shard; if different, we're
+    // testing routing from a node that doesn't host the collection)
+    final String indexingUrl = cluster.getRandomJetty(random()).getProxyBaseUrl() + "/" + collectionName;
+
+    // create a new node for the purpose of killing it...
+    final JettySolrRunner leaderToPartition = cluster.startJettySolrRunner();
     try {
-      zkClient.makePath(leaderPath, Utils.toJSON(props),
-          CreateMode.EPHEMERAL, true);
-      for (int i = 0; i < 200; i++) {
-        try {
-          index_specific(shardToJetty.get("shard2").get(0).client.solrClient, id, docId++);
-        } catch (SolrException e) {
-          // expected
-          fails++;
-          break;
-        } catch (SolrServerException e) {
-          // expected
-          fails++;
-          break;
+      cluster.waitForNode(leaderToPartition, DEFAULT_TIMEOUT);
+
+      // HACK: we have to stop the node in order to enable the proxy, then restart the node
+      // (so that we can "partition" it later via the proxy)
+      final SocketProxy proxy = new SocketProxy();
+      cluster.stopJettySolrRunner(leaderToPartition);
+      cluster.waitForJettyToStop(leaderToPartition);
+      leaderToPartition.setProxyPort(proxy.getListenPort());
+      cluster.startJettySolrRunner(leaderToPartition);
+      proxy.open(leaderToPartition.getBaseUrl().toURI());
+      try {
+        log.info("leaderToPartition's Proxy: {}", proxy);
+        
+        cluster.waitForNode(leaderToPartition, DEFAULT_TIMEOUT);
+        // create a 2x1 collection using a nodeSet that includes our leaderToPartition...
+        assertEquals(RequestStatusState.COMPLETED,
+                     CollectionAdminRequest.createCollection(collectionName, 2, 1)
+                     .setCreateNodeSet(leaderToPartition.getNodeName() + "," + otherLeader.getNodeName())
+                     .processAndWait(cloudClient, DEFAULT_TIMEOUT));
+        
+        cloudClient.waitForState(collectionName, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
+                                 (n, c) -> DocCollection.isFullyActive(n, c, 2, 1));
+        
+        { // HACK: Check the leaderProps for the shard hosted on the node we're going to kill...
+          final Replica leaderProps = cloudClient.getZkStateReader()
+            .getClusterState().getCollection(collectionName)
+            .getLeaderReplicas(leaderToPartition.getNodeName()).get(0);
+          
+          // No point in this test if these aren't true...
+          assertNotNull("Sanity check: leaderProps isn't a leader?: " + leaderProps.toString(),
+                        leaderProps.getStr(Slice.LEADER));
+          assertTrue("Sanity check: leaderProps isn't using the proxy port?: " + leaderProps.toString(),
+                     leaderProps.getCoreUrl().contains(""+proxy.getListenPort()));
         }
+        
+        // create client to send our updates to...
+        try (HttpSolrClient indexClient = getHttpSolrClient(indexingUrl)) {
+          
+          // Sanity check: we should be able to send a bunch of updates that work right now...
+          for (int i = 0; i < 100; i++) {
+            final UpdateResponse rsp = indexClient.add
+              (sdoc("id", i, "text_t", TestUtil.randomRealisticUnicodeString(random(), 200)));
+            assertEquals(0, rsp.getStatus());
+          }
+
+          log.info("Closing leaderToPartition's proxy: {}", proxy);
+          proxy.close(); // NOTE: can't use halfClose, it won't ensure a guaranteed failure
+          
+          final SolrException e = expectThrows(SolrException.class, () -> {
+              // start at 50 so that we have some "updates" to previous docs and some "adds"...
+              for (int i = 50; i < 250; i++) {
+                // The pure random odds of all of these docs belonging to the live shard are 1 in 2**200,
+                // but we know the hashing algorithm isn't purely random, so the actual odds are
+                // effectively "0" unless the hashing algorithm is changed to suck badly...
+                final UpdateResponse rsp = indexClient.add
+                (sdoc("id", i, "text_t", TestUtil.randomRealisticUnicodeString(random(), 200)));
+                // if the update didn't throw an exception, it better be a success..
+                assertEquals(0, rsp.getStatus());
+              }
+            });
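+          // the node we sent updates to can't forward them to the partitioned leader, so the request surfaces as a 500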
+          assertEquals(500, e.code());
+        }
+      } finally {
+        proxy.close(); // don't leak this port
       }
     } finally {
-      zkClient.close();
+      cluster.stopJettySolrRunner(leaderToPartition); // don't let this jetty bleed into other tests
+      cluster.waitForJettyToStop(leaderToPartition);
     }
-
-    assertTrue("A whole shard is down - some of these should fail", fails > 0);
-    return docId;
   }
+  
+  /** NOTE: uses the cluster's CloudSolrClient and assumes the default collection has been set */
+  private void addTwoDocsInOneRequest(String docIdA, String docIdB) throws Exception {
+    final CloudSolrClient cloudClient = cluster.getSolrClient();
 
-  private long addTwoDocsInOneRequest(long docId) throws
-      Exception {
-    QueryResponse results;
-    UpdateRequest uReq;
-    uReq = new UpdateRequest();
-    docId = addDoc(docId, uReq);
-    docId = addDoc(docId, uReq);
-    
-    uReq.process(cloudClient);
-    uReq.process(controlClient);
-    
-    commit();
-    
-    checkShardConsistency();
+    assertEquals(0, cloudClient.add(sdocs(sdoc("id", docIdA),
+                                          sdoc("id", docIdB))).getStatus());
+    assertEquals(0, cloudClient.commit().getStatus());
     
-    assertDocCounts(VERBOSE);
+    assertEquals(2, cloudClient.query(params("q","id:(" + docIdA + " OR " + docIdB + ")")
+                                      ).getResults().getNumFound());
     
-    results = query(cloudClient);
-    assertEquals(2, results.getResults().getNumFound());
-    return docId;
+    checkShardConsistency(params("q","*:*", "rows", "99","_trace","two_docs"));
   }
 
-  private long addUpdateDelete() throws Exception,
-      IOException {
-    long docId = 99999999L;
-    indexr("id", docId, t1, "originalcontent");
+  /** NOTE: uses the cluster's CloudSolrClient and assumes the default collection has been set */
+  private void addUpdateDelete(String docId) throws Exception {
+    final CloudSolrClient cloudClient = cluster.getSolrClient();
+
+    // add the doc, confirm we can query it...
+    assertEquals(0, cloudClient.add(sdoc("id", docId, "content_t", "originalcontent")).getStatus());
+    assertEquals(0, cloudClient.commit().getStatus());
     
-    commit();
+    assertEquals(1, cloudClient.query(params("q", "id:" + docId)).getResults().getNumFound());
+    assertEquals(1, cloudClient.query(params("q", "content_t:originalcontent")).getResults().getNumFound());
+    assertEquals(1,
+                 cloudClient.query(params("q", "content_t:originalcontent AND id:" + docId))
+                 .getResults().getNumFound());
     
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    params.add("q", t1 + ":originalcontent");
-    QueryResponse results = clients.get(0).query(params);
-    assertEquals(1, results.getResults().getNumFound());
+    checkShardConsistency(params("q","id:" + docId, "rows", "99","_trace","original_doc"));
     
     // update doc
-    indexr("id", docId, t1, "updatedcontent");
-    
-    commit();
-    
-    assertDocCounts(VERBOSE);
+    assertEquals(0, cloudClient.add(sdoc("id", docId, "content_t", "updatedcontent")).getStatus());
+    assertEquals(0, cloudClient.commit().getStatus());
     
-    results = clients.get(0).query(params);
-    assertEquals(0, results.getResults().getNumFound());
+    // confirm we can query the doc by updated content and not original...
+    assertEquals(0, cloudClient.query(params("q", "content_t:originalcontent")).getResults().getNumFound());
+    assertEquals(1, cloudClient.query(params("q", "content_t:updatedcontent")).getResults().getNumFound());
+    assertEquals(1,
+                 cloudClient.query(params("q", "content_t:updatedcontent AND id:" + docId))
+                 .getResults().getNumFound());
     
-    params.set("q", t1 + ":updatedcontent");
+    // delete the doc, confirm it no longer matches in queries...
+    assertEquals(0, cloudClient.deleteById(docId).getStatus());
+    assertEquals(0, cloudClient.commit().getStatus());
     
-    results = clients.get(0).query(params);
-    assertEquals(1, results.getResults().getNumFound());
+    assertEquals(0, cloudClient.query(params("q", "id:" + docId)).getResults().getNumFound());
+    assertEquals(0, cloudClient.query(params("q", "content_t:updatedcontent")).getResults().getNumFound());
     
-    UpdateRequest uReq = new UpdateRequest();
-    //uReq.setParam(UpdateParams.UPDATE_CHAIN, DISTRIB_UPDATE_CHAIN);
-    uReq.deleteById(Long.toString(docId)).process(clients.get(0));
-    
-    commit();
-    
-    results = clients.get(0).query(params);
-    assertEquals(0, results.getResults().getNumFound());
-    return docId;
-  }
+    checkShardConsistency(params("q","id:" + docId, "rows", "99","_trace","del_updated_doc"));
 
-  private void testDeleteByQueryDistrib() throws Exception {
-    del("*:*");
-    commit();
-    assertEquals(0, query(cloudClient).getResults().getNumFound());
   }
 
-  private long testIndexQueryDeleteHierarchical(long docId) throws Exception {
-    //index
-    int topDocsNum = atLeast(10);
+
+  public long testIndexQueryDeleteHierarchical() throws Exception {
+    final CloudSolrClient cloudClient = cluster.getSolrClient();
+    final String collectionName = createAndSetNewDefaultCollection();
+    
+    // index
+    long docId = 42;
+    int topDocsNum = atLeast(5);
     int childsNum = 5+random().nextInt(5);
     for (int i = 0; i < topDocsNum; ++i) {
       UpdateRequest uReq = new UpdateRequest();
@@ -522,40 +299,46 @@ public class FullSolrCloudDistribCmdsTest extends AbstractFullDistribZkTestBase
       }
       
       uReq.add(topDocument);
-      uReq.process(cloudClient);
-      uReq.process(controlClient);
+      assertEquals(i + "/" + docId,
+                   0, uReq.process(cloudClient).getStatus());
     }
+    assertEquals(0, cloudClient.commit().getStatus());
+
+    checkShardConsistency(params("q","*:*", "rows", "9999","_trace","added_all_top_docs_with_kids"));
     
-    commit();
-    checkShardConsistency();
-    assertDocCounts(VERBOSE);
+    // query
     
-    //query
     // parents
-    SolrQuery query = new SolrQuery("type_s:parent");
-    QueryResponse results = cloudClient.query(query);
-    assertEquals(topDocsNum, results.getResults().getNumFound());
+    assertEquals(topDocsNum,
+                 cloudClient.query(new SolrQuery("type_s:parent")).getResults().getNumFound());
     
-    //childs 
-    query = new SolrQuery("type_s:child");
-    results = cloudClient.query(query);
-    assertEquals(topDocsNum * childsNum, results.getResults().getNumFound());
+    // childs 
+    assertEquals(topDocsNum * childsNum,
+                 cloudClient.query(new SolrQuery("type_s:child")).getResults().getNumFound());
+                 
     
-    //grandchilds
-    query = new SolrQuery("type_s:grand");
-    results = cloudClient.query(query);
+    // grandchilds
+    //
     //each topDoc has t childs, and together those childs have x = 0 + 2 + 4 + .. + (t-1)*2 grands
     //x = 2 * (1 + 2 + 3 + .. + (t-1)) => arithmetic sum of t-1 terms
     //x = 2 * ((t-1) * t / 2) = t * (t - 1)
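+    // e.g. with childsNum = 5, each topDoc contributes 5 * 4 = 20 grandchildren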
-    assertEquals(topDocsNum * childsNum * (childsNum - 1), results.getResults().getNumFound());
+    assertEquals(topDocsNum * childsNum * (childsNum - 1),
+                 cloudClient.query(new SolrQuery("type_s:grand")).getResults().getNumFound());
     
     //delete
-    del("*:*");
-    commit();
+    assertEquals(0, cloudClient.deleteByQuery("*:*").getStatus());
+    assertEquals(0, cloudClient.commit().getStatus());
+    assertEquals(0, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+
+    checkShardConsistency(params("q","*:*", "rows", "9999","_trace","delAll"));
     
     return docId;
   }
+
   
+  /**
+   * Recursive helper function for building out child and grandchild docs
+   */
   private long addChildren(String prefix, SolrInputDocument topDocument, int childIndex, boolean lastLevel, long docId) {
     SolrInputDocument childDocument = new SolrInputDocument();
     childDocument.addField("id", docId++);
@@ -574,188 +357,139 @@ public class FullSolrCloudDistribCmdsTest extends AbstractFullDistribZkTestBase
   }
   
   
-  private long testIndexingDocPerRequestWithHttpSolrClient(long docId) throws Exception {
-    int docs = random().nextInt(TEST_NIGHTLY ? 4013 : 97) + 1;
-    for (int i = 0; i < docs; i++) {
+  public void testIndexingOneDocPerRequestWithHttpSolrClient() throws Exception {
+    final CloudSolrClient cloudClient = cluster.getSolrClient();
+    final String collectionName = createAndSetNewDefaultCollection();
+    
+    final int numDocs = atLeast(50);
+    for (int i = 0; i < numDocs; i++) {
       UpdateRequest uReq;
       uReq = new UpdateRequest();
-      docId = addDoc(docId, uReq);
-      
-      uReq.process(cloudClient);
-      uReq.process(controlClient);
-      
+      assertEquals(0, cloudClient.add
+                   (sdoc("id", i, "text_t", TestUtil.randomRealisticUnicodeString(random(), 200))).getStatus());
     }
-    commit();
-    
-    checkShardConsistency();
-    assertDocCounts(VERBOSE);
+    assertEquals(0, cloudClient.commit().getStatus());
+    assertEquals(numDocs, cloudClient.query(params("q","*:*")).getResults().getNumFound());
     
-    return docId++;
+    checkShardConsistency(params("q","*:*", "rows", ""+(1 + numDocs),"_trace","addAll"));
   }
   
-  private long testIndexingBatchPerRequestWithHttpSolrClient(long docId) throws Exception {
-    
-    // remove collection
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    params.set("action", CollectionAction.DELETE.toString());
-    params.set("name", "collection1");
-    QueryRequest request = new QueryRequest(params);
-    request.setPath("/admin/collections");
-    
-  
-    cloudClient.request(request);
-    
-    controlClient.deleteByQuery("*:*");
-    controlClient.commit();
-    
-    // somtimes we use an oversharded collection
-    createCollection(null, "collection2", 7, 3, 100000, cloudClient, null, "conf1");
-    cloudClient.setDefaultCollection("collection2");
-    waitForRecoveriesToFinish("collection2", false);
-    
-    class IndexThread extends Thread {
-      Integer name;
+  public void testIndexingBatchPerRequestWithHttpSolrClient() throws Exception {
+    final CloudSolrClient cloudClient = cluster.getSolrClient();
+    final String collectionName = createAndSetNewDefaultCollection();
+
+    final int numDocsPerBatch = atLeast(5);
+    final int numBatchesPerThread = atLeast(5);
+      
+    final CountDownLatch abort = new CountDownLatch(1);
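+    // the latch doubles as a cross-thread abort flag: the first indexer to fail counts it down,
+    // and the remaining indexers stop at their next keepGoing() check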
+    class BatchIndexer implements Runnable {
+      private boolean keepGoing() {
+        return 0 < abort.getCount();
+      }
       
-      public IndexThread(Integer name) {
+      final int name;
+      public BatchIndexer(int name) {
         this.name = name;
       }
       
       @Override
       public void run() {
-        int rnds = random().nextInt(TEST_NIGHTLY ? 10 : 3) + 1;
-        for (int i = 0; i < rnds; i++) {
-          UpdateRequest uReq;
-          uReq = new UpdateRequest();
-          int cnt = random().nextInt(TEST_NIGHTLY ? 2000 : 200) + 1;
-          for (int j = 0; j <cnt; j++) {
-            addDoc("thread" + name + "_" + i + "_" + j, uReq);
-          }
-          
-          try {
-            uReq.process(cloudClient);
-            uReq.process(controlClient);
-          } catch (SolrServerException | IOException e) {
-            throw new RuntimeException(e);
+        try {
+          for (int batchId = 0; batchId < numBatchesPerThread && keepGoing(); batchId++) {
+            final UpdateRequest req = new UpdateRequest();
+            for (int docId = 0; docId < numDocsPerBatch && keepGoing(); docId++) {
+              req.add(sdoc("id", "indexer" + name + "_" + batchId + "_" + docId,
+                           "test_t", TestUtil.randomRealisticUnicodeString(random(), 200)));
+            }
+            assertEquals(0, req.process(cloudClient).getStatus());
           }
-
-
+        } catch (Throwable e) {
+          abort.countDown();
+          throw new RuntimeException(e);
         }
       }
     };
-    List<Thread> threads = new ArrayList<>();
-
-    int nthreads = random().nextInt(TEST_NIGHTLY ? 4 : 2) + 1;
-    for (int i = 0; i < nthreads; i++) {
-      IndexThread thread = new IndexThread(i);
-      threads.add(thread);
-      thread.start();
+    final ExecutorService executor = ExecutorUtil.newMDCAwareCachedThreadPool("batchIndexing");
+    final int numThreads = random().nextInt(TEST_NIGHTLY ? 4 : 2) + 1;
+    final List<Future<?>> futures = new ArrayList<>(numThreads);
+    for (int i = 0; i < numThreads; i++) {
+      futures.add(executor.submit(new BatchIndexer(i)));
     }
-    
-    for (Thread thread : threads) {
-      thread.join();
+    final int totalDocsExpected = numThreads * numBatchesPerThread * numDocsPerBatch;
+    ExecutorUtil.shutdownAndAwaitTermination(executor);
+
+    for (Future<?> result : futures) {
+      assertFalse(result.isCancelled());
+      assertTrue(result.isDone());
+      // all we care about is propagating any possible execution exception...
+      final Object ignored = result.get();
     }
     
-    commit();
-    
-    waitForRecoveriesToFinish("collection2", false);
-    
-    printLayout();
-    
-    SolrQuery query = new SolrQuery("*:*");
-    long controlCount = controlClient.query(query).getResults()
-        .getNumFound();
-    long cloudCount = cloudClient.query(query).getResults().getNumFound();
-
-    
-    CloudInspectUtil.compareResults(controlClient, cloudClient);
-    
-    assertEquals("Control does not match cloud", controlCount, cloudCount);
-    System.out.println("DOCS:" + controlCount);
-
-    return docId;
+    assertEquals(0, cloudClient.commit().getStatus());
+    assertEquals(totalDocsExpected, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+    checkShardConsistency(params("q","*:*", "rows", ""+totalDocsExpected, "_trace","batches_done"));
   }
 
-  private long addDoc(long docId, UpdateRequest uReq) {
-    addDoc(Long.toString(docId++), uReq);
-    return docId;
-  }
-  
-  private long addDoc(String docId, UpdateRequest uReq) {
-    SolrInputDocument doc1 = new SolrInputDocument();
-    
-    uReq.add(doc1);
-    addFields(doc1, "id", docId, "text_t", "some text so that it not's negligent work to parse this doc, even though it's still a pretty short doc");
-    return -1;
-  }
-  
-  private long testConcurrentIndexing(long docId) throws Exception {
-    QueryResponse results = query(cloudClient);
-    long beforeCount = results.getResults().getNumFound();
-    int cnt = TEST_NIGHTLY ? 2933 : 313;
-    try (ConcurrentUpdateSolrClient concurrentClient = getConcurrentUpdateSolrClient(
-        ((HttpSolrClient) clients.get(0)).getBaseURL(), 10, 2, 120000)) {
-      for (int i = 0; i < cnt; i++) {
-        index_specific(concurrentClient, id, docId++, "text_t", "some text so that it not's negligent work to parse this doc, even though it's still a pretty short doc");
+  public void testConcurrentIndexing() throws Exception {
+    final CloudSolrClient cloudClient = cluster.getSolrClient();
+    final String collectionName = createAndSetNewDefaultCollection();
+
+    final int numDocs = atLeast(50);
+    final JettySolrRunner nodeToUpdate = cluster.getRandomJetty(random());
+    try (ConcurrentUpdateSolrClient indexClient
+         = getConcurrentUpdateSolrClient(nodeToUpdate.getProxyBaseUrl() + "/" + collectionName, 10, 2)) {
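+      // (args: internal request queue size 10, 2 background threads draining the queue)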
+      
+      for (int i = 0; i < numDocs; i++) {
+        indexClient.add(sdoc("id", i, "text_t",
+                             TestUtil.randomRealisticUnicodeString(random(), 200)));
       }
-      concurrentClient.blockUntilFinished();
+      indexClient.blockUntilFinished();
       
-      commit();
+      assertEquals(0, indexClient.commit().getStatus());
+      assertEquals(numDocs, cloudClient.query(params("q","*:*")).getResults().getNumFound());
 
-      checkShardConsistency();
-      assertDocCounts(VERBOSE);
+      checkShardConsistency(params("q","*:*", "rows", ""+(1 + numDocs),"_trace","addAll"));
     }
-    results = query(cloudClient);
-    assertEquals(beforeCount + cnt, results.getResults().getNumFound());
-    return docId;
   }
   
-  private void testOptimisticUpdate(QueryResponse results) throws Exception {
-    SolrDocument doc = results.getResults().get(0);
-    Long version = (Long) doc.getFieldValue(VERSION_FIELD);
-    Integer theDoc = (Integer) doc.getFieldValue("id");
-    UpdateRequest uReq = new UpdateRequest();
-    SolrInputDocument doc1 = new SolrInputDocument();
-    uReq.setParams(new ModifiableSolrParams());
-    uReq.getParams().set(VERSION_FIELD, Long.toString(version));
-    addFields(doc1, "id", theDoc, t1, "theupdatestuff");
-    uReq.add(doc1);
-    
-    uReq.process(cloudClient);
-    uReq.process(controlClient);
-    
-    commit();
-    
-    // updating the old version should fail...
-    SolrInputDocument doc2 = new SolrInputDocument();
-    uReq = new UpdateRequest();
-    uReq.setParams(new ModifiableSolrParams());
-    uReq.getParams().set(VERSION_FIELD, Long.toString(version));
-    addFields(doc2, "id", theDoc, t1, "thenewupdatestuff");
-    uReq.add(doc2);
-    
-    uReq.process(cloudClient);
-    uReq.process(controlClient);
-    
-    commit();
-    
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    params.add("q", t1 + ":thenewupdatestuff");
-    QueryResponse res = clients.get(0).query(params);
-    assertEquals(0, res.getResults().getNumFound());
-    
-    params = new ModifiableSolrParams();
-    params.add("q", t1 + ":theupdatestuff");
-    res = clients.get(0).query(params);
-    assertEquals(1, res.getResults().getNumFound());
-  }
-
-  private QueryResponse query(SolrClient client) throws SolrServerException, IOException {
-    SolrQuery query = new SolrQuery("*:*");
-    return client.query(query);
-  }
-  
-  protected SolrInputDocument addRandFields(SolrInputDocument sdoc) {
-    return sdoc;
+  /**
+   * Inspects the cluster to determine all active shards/replicas for the default collection, then
+   * executes a <code>distrib=false</code> query using the specified params, and compares the resulting
+   * {@link SolrDocumentList}s, failing if any replica does not agree with its leader.
+   *
+   * @see #cluster
+   * @see CloudInspectUtil#showDiff 
+   */
+  private void checkShardConsistency(final SolrParams params) throws Exception {
+    // TODO: refactor into static in CloudInspectUtil w/ DocCollection param?
+    // TODO: refactor to take in a BiFunction<QueryResponse,QueryResponse,Boolean> ?
+    
+    final SolrParams perReplicaParams = SolrParams.wrapDefaults(params("distrib", "false"),
+                                                                params);
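+    // distrib=false makes each core answer from its own index only (no fan-out), so leader and replica results are directly comparable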
+    final DocCollection collection = cluster.getSolrClient().getZkStateReader()
+      .getClusterState().getCollection(cluster.getSolrClient().getDefaultCollection());
+    log.info("Checking shard consistency via: {}", perReplicaParams);
+    for (Map.Entry<String,Slice> entry : collection.getActiveSlicesMap().entrySet()) {
+      final String shardName = entry.getKey();
+      final Slice slice = entry.getValue();
+      log.info("Checking: {} -> {}", shardName, slice);
+      final Replica leader = entry.getValue().getLeader();
+      try (HttpSolrClient leaderClient = getHttpSolrClient(leader.getCoreUrl())) {
+        final SolrDocumentList leaderResults = leaderClient.query(perReplicaParams).getResults();
+        log.debug("Shard {}: Leader results: {}", shardName, leaderResults);
+        for (Replica replica : slice) {
+          try (HttpSolrClient replicaClient = getHttpSolrClient(replica.getCoreUrl())) {
+            final SolrDocumentList replicaResults = replicaClient.query(perReplicaParams).getResults();
+            log.debug("Shard {}: Replica ({}) results: {}", shardName, replica.getCoreName(), replicaResults);
+            assertEquals("inconsistency w/leader: shard=" + shardName + "core=" + replica.getCoreName(),
+                         Collections.emptySet(),
+                         CloudInspectUtil.showDiff(leaderResults, replicaResults,
+                                                   shardName + " leader: " + leader.getCoreUrl(),
+                                                   shardName + ": " + replica.getCoreUrl()));
+          }
+        }
+      }
+    }
   }
 
 }