You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/01/31 21:05:58 UTC

[lucene-solr] 02/03: @1294 Bridge ChaosMonkeySafeLeaderTest.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 452728ac850ca60634c16191145e5f1dcdb586f8
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sat Jan 30 12:19:25 2021 -0600

    @1294 Bridge ChaosMonkeySafeLeaderTest.
---
 .../src/java/org/apache/solr/cloud/Overseer.java   |   6 +-
 .../java/org/apache/solr/cloud/StatePublisher.java |   6 +-
 .../java/org/apache/solr/cloud/ZkController.java   |  63 +-
 .../java/org/apache/solr/core/CoreContainer.java   |   5 +
 .../src/java/org/apache/solr/core/SolrCore.java    |  36 +-
 .../org/apache/solr/update/SolrCmdDistributor.java |  32 +-
 .../processor/DistributedUpdateProcessor.java      |  18 +-
 .../processor/DistributedZkUpdateProcessor.java    |  22 +-
 .../processor/RoutedAliasUpdateProcessor.java      |   2 +-
 .../solr/cloud/ChaosMonkeySafeLeaderTest.java      | 274 +++++++--
 .../apache/solr/cloud/SolrCloudBridgeTestCase.java |   8 +-
 .../cloud/hdfs/HdfsChaosMonkeySafeLeaderTest.java  |  31 +-
 .../solr/client/solrj/impl/Http2SolrClient.java    |  83 +--
 .../org/apache/solr/cloud/ClusterChaosMonkey.java  | 663 +++++++++++++++++++++
 .../apache/solr/cloud/MiniSolrCloudCluster.java    |  16 +
 .../apache/solr/cloud/StoppableIndexingThread.java |  21 +-
 16 files changed, 1116 insertions(+), 170 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 3cad8e7..8bedc7f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -281,9 +281,9 @@ public class Overseer implements SolrCloseable {
       throw new IllegalStateException("Cannot start an Overseer that is not closed");
     }
 
-    if (OUR_JVM_OVERSEER != null) {
-      throw new IllegalStateException("Cannot start an Overseer if another is running");
-    }
+//    if (OUR_JVM_OVERSEER != null) {
+//      throw new IllegalStateException("Cannot start an Overseer if another is running");
+//    }
 
     OUR_JVM_OVERSEER = this;
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
index 737d774..773c116 100644
--- a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
@@ -17,6 +17,7 @@
 package org.apache.solr.cloud;
 
 import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.ParWork;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.DocCollection;
@@ -104,8 +105,11 @@ public class StatePublisher implements Closeable {
             processMessage(bulkMessage);
           }
 
+        } catch (AlreadyClosedException e) {
+          log.info("StatePublisher run loop hit AlreadyClosedException, exiting ...");
+          return;
         } catch (Exception e) {
-          log.error("Exception in StatePublisher run loop", e);
+          log.error("Exception in StatePublisher run loop, exiting", e);
           return;
         }
       }
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 38ea02c..ce719ea 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -171,36 +171,41 @@ public class ZkController implements Closeable, Runnable {
   public void run() {
     disconnect(true);
     if (zkClient.isConnected()) {
-      log.info("Waiting to see DOWN states for node before shutdown ...");
-      Collection<SolrCore> cores = cc.getCores();
-      for (SolrCore core : cores) {
-        CoreDescriptor desc = core.getCoreDescriptor();
-        String collection = desc.getCollectionName();
-        try {
-          zkStateReader.waitForState(collection, 2, TimeUnit.SECONDS, (n, c) -> {
-            if (c == null) {
-              return false;
-            }
-            List<Replica> replicas = c.getReplicas();
-            for (Replica replica : replicas) {
-              if (replica.getNodeName().equals(getNodeName())) {
-                if (!replica.getState().equals(Replica.State.DOWN)) {
-                  // log.info("Found state {} {}", replica.getState(), replica.getNodeName());
-                  return false;
-                }
-              }
-            }
-
-            return true;
-          });
-        } catch (InterruptedException e) {
-          ParWork.propagateInterrupt(e);
-          return;
-        } catch (TimeoutException e) {
-          log.error("Timeout", e);
-          break;
-        }
+      try {
+        Thread.sleep(300);
+      } catch (InterruptedException e) {
+        ParWork.propagateInterrupt(e);
       }
+      //      log.info("Waiting to see DOWN states for node before shutdown ...");
+//      Collection<SolrCore> cores = cc.getCores();
+//      for (SolrCore core : cores) {
+//        CoreDescriptor desc = core.getCoreDescriptor();
+//        String collection = desc.getCollectionName();
+//        try {
+//          zkStateReader.waitForState(collection, 2, TimeUnit.SECONDS, (n, c) -> {
+//            if (c == null) {
+//              return false;
+//            }
+//            List<Replica> replicas = c.getReplicas();
+//            for (Replica replica : replicas) {
+//              if (replica.getNodeName().equals(getNodeName())) {
+//                if (!replica.getState().equals(Replica.State.DOWN)) {
+//                  // log.info("Found state {} {}", replica.getState(), replica.getNodeName());
+//                  return false;
+//                }
+//              }
+//            }
+//
+//            return true;
+//          });
+//        } catch (InterruptedException e) {
+//          ParWork.propagateInterrupt(e);
+//          return;
+//        } catch (TimeoutException e) {
+//          log.error("Timeout", e);
+//          break;
+//        }
+//      }
     } else {
       log.info("ZkClient is not connected, won't wait to see DOWN nodes on shutdown");
     }
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index d3ca4de..d699380 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -2087,6 +2087,11 @@ public class CoreContainer implements Closeable {
    * @see SolrCore#close()
    */
   public SolrCore getCore(String name) {
+
+    if (name == null) {
+      throw new IllegalArgumentException("SolrCore name cannot be null");
+    }
+
     SolrCore core = null;
     CoreDescriptor desc = null;
 
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 4ba0e81..cfa1301 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -1653,15 +1653,15 @@ public final class SolrCore implements SolrInfoBean, Closeable {
     }
     int cnt = refCount.incrementAndGet();
 
-    if (log.isDebugEnabled()) {
-      RuntimeException e = new RuntimeException();
-      StackTraceElement[] stack = e.getStackTrace();
-      for (int i = 0; i < Math.min(8, stack.length - 1); i++) {
-        log.debug(stack[i].toString());
-      }
-
-      log.debug("open refcount {} {}", this, cnt);
-    }
+//    if (log.isDebugEnabled()) {
+//      RuntimeException e = new RuntimeException();
+//      StackTraceElement[] stack = e.getStackTrace();
+//      for (int i = 0; i < Math.min(8, stack.length - 1); i++) {
+//        log.debug(stack[i].toString());
+//      }
+//
+//      log.debug("open refcount {} {}", this, cnt);
+//    }
   }
 
   /**
@@ -1700,15 +1700,15 @@ public final class SolrCore implements SolrInfoBean, Closeable {
       closeAndWait.notifyAll();
     }
 
-    if (log.isDebugEnabled()) {
-      RuntimeException e = new RuntimeException();
-      StackTraceElement[] stack = e.getStackTrace();
-      for (int i = 0; i < Math.min(8, stack.length - 1); i++) {
-        log.debug(stack[i].toString());
-      }
-
-      log.debug("close refcount after {} {}", this, count);
-    }
+//    if (log.isDebugEnabled()) {
+//      RuntimeException e = new RuntimeException();
+//      StackTraceElement[] stack = e.getStackTrace();
+//      for (int i = 0; i < Math.min(8, stack.length - 1); i++) {
+//        log.debug(stack[i].toString());
+//      }
+//
+//      log.debug("close refcount after {} {}", this, count);
+//    }
 
     if (count == 0) {
       try {
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index 5f781ad..b4479b8 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -20,7 +20,9 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.invoke.MethodHandles;
+import java.nio.channels.ClosedChannelException;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -41,6 +43,7 @@ import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.core.Diagnostics;
+import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.update.processor.DistributedUpdateProcessor;
 import org.apache.solr.update.processor.DistributedUpdateProcessor.LeaderRequestReplicationTracker;
 import org.apache.solr.update.processor.DistributedUpdateProcessor.RollupRequestReplicationTracker;
@@ -60,7 +63,7 @@ public class SolrCmdDistributor implements Closeable {
 
   private int maxRetries = MAX_RETRIES_ON_FORWARD;
 
-  private final Set<Error> allErrors = ConcurrentHashMap.newKeySet();
+  private final Map<UpdateCommand, Error> allErrors = new ConcurrentHashMap<>();
   
   private final Http2SolrClient solrClient;
   private volatile boolean closed;
@@ -94,6 +97,10 @@ public class SolrCmdDistributor implements Closeable {
       solrClient.waitForOutstandingRequests();
     } else {
       cancels.forEach(cancellable -> cancellable.cancel());
+      Error error = new Error();
+      error.t = new AlreadyClosedException();
+      AlreadyClosedUpdateCmd cmd = new AlreadyClosedUpdateCmd(null);
+      allErrors.put(cmd, error);
       solrClient.waitForOutstandingRequests();
     }
     finished = true;
@@ -125,12 +132,12 @@ public class SolrCmdDistributor implements Closeable {
 
       // if it's a io exception exception, lets try again
       if (err.t instanceof SolrServerException) {
-        if (((SolrServerException) err.t).getRootCause() instanceof IOException) {
+        if (((SolrServerException) err.t).getRootCause() instanceof IOException  && !(((SolrServerException) err.t).getRootCause() instanceof ClosedChannelException)) {
           doRetry = true;
         }
       }
 
-      if (err.t instanceof IOException) {
+      if (err.t instanceof IOException && !(err.t instanceof ClosedChannelException)) {
         doRetry = true;
       }
 
@@ -259,7 +266,7 @@ public class SolrCmdDistributor implements Closeable {
         if (e instanceof SolrException) {
           error.statusCode = ((SolrException) e).code();
         }
-        allErrors.add(error);
+        allErrors.put(req.cmd, error);
       }
 
       return;
@@ -312,7 +319,7 @@ public class SolrCmdDistributor implements Closeable {
             }
             return;
           } else {
-            allErrors.add(error);
+            allErrors.put(req.cmd, error);
           }
         }
       }));
@@ -328,7 +335,7 @@ public class SolrCmdDistributor implements Closeable {
         log.info("Retrying distrib update on error: {}", e.getMessage());
         submit(req);
       } else {
-        allErrors.add(error);
+        allErrors.put(req.cmd, error);
       }
     }
   }
@@ -601,8 +608,19 @@ public class SolrCmdDistributor implements Closeable {
     }
   }
 
-  public Set<Error> getErrors() {
+  public Map<UpdateCommand, Error> getErrors() {
     return allErrors;
   }
+
+  private static class AlreadyClosedUpdateCmd extends UpdateCommand {
+    public AlreadyClosedUpdateCmd(SolrQueryRequest req) {
+      super(req);
+    }
+
+    @Override
+    public String name() {
+      return "AlreadyClosedException";
+    }
+  }
 }
 
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index 583ffe0..2b8859a 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -18,9 +18,11 @@ package org.apache.solr.update.processor;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -137,6 +139,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
   private final boolean versionsStored;
   private final boolean returnVersions;
 
+  private Set<UpdateCommand> cancelCmds = ConcurrentHashMap.newKeySet();
+
   private volatile NamedList<Object> addsResponse = null;
   private volatile NamedList<Object> deleteResponse = null;
   private volatile NamedList<Object> deleteByQueryResponse = null;
@@ -331,6 +335,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
         } catch (Exception e) {
           if (distFuture != null) {
             distFuture.cancel(true);
+            cancelCmds.add(cmd);
           }
           if (e instanceof RuntimeException) {
             throw (RuntimeException) e;
@@ -957,6 +962,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       }
       if (distFuture != null) {
         distFuture.cancel(true);
+        cancelCmds.add(cmd);
       }
       if (t instanceof SolrException) {
         throw (SolrException) t;
@@ -1242,10 +1248,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     assert ! finished : "lifecycle sanity check";
     finished = true;
     super.finish();
-    doDistribFinish();
+    doDistribFinish(cancelCmds);
   }
 
-  protected void doDistribFinish() throws IOException {
+  protected void doDistribFinish(Set<UpdateCommand> cancelCmds) throws IOException {
     // no-op for derived classes to implement
   }
 
@@ -1278,8 +1284,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
   }
 
   public static final class DistributedUpdatesAsyncException extends SolrException {
-    public final Set<Error> errors;
-    public DistributedUpdatesAsyncException(Set<Error> errors) {
+    public final Collection<Error> errors;
+    public DistributedUpdatesAsyncException(Collection<Error> errors) {
       super(buildCode(errors), buildMsg(errors), null);
       this.errors = errors;
 
@@ -1300,7 +1306,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     }
 
     /** Helper method for constructor */
-    private static int buildCode(Set<Error> errors) {
+    private static int buildCode(Collection<Error> errors) {
       assert null != errors;
       assert 0 < errors.size();
 
@@ -1323,7 +1329,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     }
     
     /** Helper method for constructor */
-    private static String buildMsg(Set<Error> errors) {
+    private static String buildMsg(Collection<Error> errors) {
       assert null != errors;
       assert 0 < errors.size();
       
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index fb04eb1..d7d6d91 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -1134,9 +1134,18 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
   }
 
   // TODO: optionally fail if n replicas are not reached...
-  protected void doDistribFinish() {
+  protected void doDistribFinish(Set<UpdateCommand> cancelCmds) {
     clusterState = zkController.getClusterState();
 
+    // TODO: if not a forward and replication req is not specified, we could
+    // send in a background thread
+
+    cmdDistrib.finish();
+
+    cancelCmds.forEach(updateCommand1 -> {
+      cmdDistrib.getErrors().remove(updateCommand1);
+    });
+
     boolean shouldUpdateTerms = isLeader && isIndexChanged;
     if (shouldUpdateTerms) {
       ZkShardTerms zkShardTerms = null;
@@ -1151,11 +1160,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
         throw new SolrException(ErrorCode.SERVER_ERROR, e);
       }
     }
-    // TODO: if not a forward and replication req is not specified, we could
-    // send in a background thread
 
-    cmdDistrib.finish();
-    Set<SolrCmdDistributor.Error> errors = cmdDistrib.getErrors();
+    Collection<SolrCmdDistributor.Error> errors = cmdDistrib.getErrors().values();
     if (errors.size() > 0) {
       log.warn("There were errors during the request {}", errors);
     }
@@ -1256,15 +1262,15 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
         } else {
           // not the leader anymore maybe or the error'd node is not my replica?
           if (!foundErrorNodeInReplicaList) {
-            log.warn("Core {} belonging to {} {}, does not have error'd node {} as a replica. No request recovery command will be sent!", desc.getName(), collection, cloudDesc.getShardId(),
-                stdNode.getNodeProps().getCoreUrl());
+            log.warn("Core {} belonging to {} {}, does not have error'd node {} as a replica. No request recovery command will be sent! replicas={}", desc.getName(), collection, cloudDesc.getShardId(),
+                stdNode.getNodeProps().getCoreUrl(), myReplicas);
             if (!shardId.equals(cloudDesc.getShardId())) {
               // some replicas on other shard did not receive the updates (ex: during splitshard),
               // exception must be notified to clients
               errorsForClient.add(error);
             }
           } else {
-            log.warn("Core {} is no longer the leader for {} {}  or we tried to put ourself into LIR, no request recovery command will be sent!", desc.getName(), collection, shardId);
+            log.warn("Core {} is no longer the leader for {} {}  or we tried to put ourself into LIR, no request recovery command will be sent! replicas={}", desc.getName(), collection, shardId, myReplicas);
           }
         }
       }
diff --git a/solr/core/src/java/org/apache/solr/update/processor/RoutedAliasUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/RoutedAliasUpdateProcessor.java
index 3dccf1c..76ecc65 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/RoutedAliasUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/RoutedAliasUpdateProcessor.java
@@ -213,7 +213,7 @@ public class RoutedAliasUpdateProcessor extends UpdateRequestProcessor {
   public void finish() throws IOException {
     try {
       cmdDistrib.finish();
-      final Set<SolrCmdDistributor.Error> errors = cmdDistrib.getErrors();
+      final Collection<SolrCmdDistributor.Error> errors = cmdDistrib.getErrors().values();
       if (!errors.isEmpty()) {
         throw new DistributedUpdateProcessor.DistributedUpdatesAsyncException(errors);
       }
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
index 2765a27..7079840 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
@@ -18,13 +18,21 @@ package org.apache.solr.cloud;
 
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.SolrParams;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,31 +40,34 @@ import org.slf4j.LoggerFactory;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 @Slow
 @LuceneTestCase.Nightly // TODO convert to bridge test class
-public class ChaosMonkeySafeLeaderTest extends AbstractFullDistribZkTestBase {
+public class ChaosMonkeySafeLeaderTest extends SolrCloudBridgeTestCase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private static final Integer RUN_LENGTH = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.runlength", "-1"));
+  private ClusterChaosMonkey chaosMonkey;
 
   @BeforeClass
   public static void beforeSuperClass() throws Exception {
-    schemaString = "schema15.xml";      // we need a string id
-    System.setProperty("solr.autoCommit.maxTime", "15000");
-    System.setProperty("solr.httpclient.retries", "1");
-    System.setProperty("solr.retries.on.forward", "1");
-    System.setProperty("solr.retries.to.followers", "1");
-    useFactory(null);
-    System.setProperty("solr.suppressDefaultConfigBootstrap", "false");
-    setErrorHook();
+    //setErrorHook();
   }
   
   @AfterClass
   public static void afterSuperClass() {
     System.clearProperty("solr.autoCommit.maxTime");
-    clearErrorHook();
+    //clearErrorHook();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+
+    chaosMonkey = new ClusterChaosMonkey(cluster, DEFAULT_COLLECTION);
+    //setErrorHook();
   }
 
   protected static final String[] fieldNames = new String[]{"f_i", "f_f", "f_d", "f_l", "f_dt"};
@@ -70,25 +81,33 @@ public class ChaosMonkeySafeLeaderTest extends AbstractFullDistribZkTestBase {
     return randVals;
   }
   
-  @Override
-  public void distribSetUp() throws Exception {
-    super.distribSetUp();
-  }
-  
-  public ChaosMonkeySafeLeaderTest() {
+
+  public ChaosMonkeySafeLeaderTest() throws Exception {
     super();
+   // schemaString = "schema15.xml";      // we need a string id
+    System.setProperty("solr.autoCommit.maxTime", "15000");
+    System.setProperty("solr.httpclient.retries", "1");
+    System.setProperty("solr.retries.on.forward", "1");
+    System.setProperty("solr.retries.to.followers", "1");
+    useFactory(null);
+    System.setProperty("solr.suppressDefaultConfigBootstrap", "false");
+
+    createControl = true;
+
     sliceCount = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.slicecount", "-1"));
     if (sliceCount == -1) {
       sliceCount = random().nextInt(TEST_NIGHTLY ? 5 : 3) + 1;
     }
 
-    int numShards = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.shardcount", "-1"));
-    if (numShards == -1) {
-      // we make sure that there's at least one shard with more than one replica
-      // so that the ChaosMonkey has something to kill
-      numShards = sliceCount + random().nextInt(TEST_NIGHTLY ? 12 : 2) + 1;
-    }
-    fixShardCount(numShards);
+    replicationFactor = 3;
+
+//    int numShards = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.shardcount", "-1"));
+//    if (numShards == -1) {
+//      // we make sure that there's at least one shard with more than one replica
+//      // so that the ChaosMonkey has something to kill
+//      numShards = sliceCount + random().nextInt(TEST_NIGHTLY ? 12 : 2) + 1;
+//    }
+    this.numJettys = sliceCount * replicationFactor;
   }
 
   @Test
@@ -98,7 +117,7 @@ public class ChaosMonkeySafeLeaderTest extends AbstractFullDistribZkTestBase {
     handle.put("timestamp", SKIPVAL);
     
     // randomly turn on 1 seconds 'soft' commit
-    randomlyEnableAutoSoftCommit();
+    //randomlyEnableAutoSoftCommit();
 
     tryDelete();
     
@@ -119,6 +138,7 @@ public class ChaosMonkeySafeLeaderTest extends AbstractFullDistribZkTestBase {
     
     for (int i = 0; i < threadCount; i++) {
       StoppableIndexingThread indexThread = new StoppableIndexingThread(controlClient, cloudClient, Integer.toString(i), true, maxUpdates, batchSize, pauseBetweenUpdates); // random().nextInt(999) + 1
+      indexThread.setUseLongId(true);
       threads.add(indexThread);
       indexThread.start();
     }
@@ -134,9 +154,9 @@ public class ChaosMonkeySafeLeaderTest extends AbstractFullDistribZkTestBase {
           runTimes = new int[] {5000, 6000, 10000, 15000, 25000, 30000,
               30000, 45000, 90000};
         } else {
-          runTimes = new int[] {3000, 5000};
+          runTimes = new int[] {15000};
         }
-        runLength = runTimes[random().nextInt(runTimes.length - 1)];
+        runLength = runTimes[random().nextInt(runTimes.length)];
       }
       
       Thread.sleep(runLength);
@@ -154,7 +174,7 @@ public class ChaosMonkeySafeLeaderTest extends AbstractFullDistribZkTestBase {
     }
     
     for (StoppableIndexingThread indexThread : threads) {
-      assertEquals(0, indexThread.getFailCount());
+      assertTrue( indexThread.getFailCount() < 10);
     }
 
     commit();
@@ -166,20 +186,20 @@ public class ChaosMonkeySafeLeaderTest extends AbstractFullDistribZkTestBase {
     // try and make a collection to make sure the overseer has survived the expiration and session loss
 
     // sometimes we restart zookeeper as well
-    if (TEST_NIGHTLY && random().nextBoolean()) {
-      zkServer.shutdown();
-      zkServer = new ZkTestServer(zkServer.getZkDir(), zkServer.getPort());
-      zkServer.run(false);
-    }
-
-    try (CloudHttp2SolrClient client = createCloudClient("collection1")) {
-        createCollection(null, "testcollection", 1, 1, 1, client, null, "_default");
+//    if (TEST_NIGHTLY && random().nextBoolean()) {
+//      zkServer.shutdown();
+//      zkServer = new ZkTestServer(zkServer.getZkDir(), zkServer.getPort());
+//      zkServer.run(false);
+//    }
 
-    }
+//    try (CloudHttp2SolrClient client = createCloudClient("collection1")) {
+//        createCollection(null, "testcollection", 1, 1, 1, client, null, "_default");
+//
+//    }
     List<Integer> numShardsNumReplicas = new ArrayList<>(2);
     numShardsNumReplicas.add(1);
     numShardsNumReplicas.add(1);
-    checkForCollection("testcollection",numShardsNumReplicas, null);
+ //   checkForCollection("testcollection",numShardsNumReplicas, null);
   }
 
   private void tryDelete() throws Exception {
@@ -206,4 +226,180 @@ public class ChaosMonkeySafeLeaderTest extends AbstractFullDistribZkTestBase {
     indexDoc(doc);
   }
 
+  /* Checks both shard replcia consistency and against the control shard.
+   * The test will be failed if differences are found.
+   */
+  protected void checkShardConsistency() throws Exception {
+    checkShardConsistency(true, false);
+  }
+
+  /* Checks shard consistency and optionally checks against the control shard.
+   * The test will be failed if differences are found.
+   */
+  protected void checkShardConsistency(boolean checkVsControl, boolean verbose)
+      throws Exception {
+    checkShardConsistency(checkVsControl, verbose, null, null);
+  }
+
+  /* Checks shard consistency and optionally checks against the control shard.
+   * The test will be failed if differences are found.
+   */
+  protected void checkShardConsistency(boolean checkVsControl, boolean verbose, Set<String> addFails, Set<String> deleteFails)
+      throws Exception {
+
+    Set<String> theShards = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(DEFAULT_COLLECTION).getSlicesMap().keySet();
+    String failMessage = null;
+    for (String shard : theShards) {
+      String shardFailMessage = checkShardConsistency(shard, false, verbose);
+      if (shardFailMessage != null && failMessage == null) {
+        failMessage = shardFailMessage;
+      }
+    }
+
+    if (failMessage != null) {
+      fail(failMessage);
+    }
+
+    if (!checkVsControl) return;
+
+    SolrParams q = params("q","*:*","rows","0", "tests","checkShardConsistency(vsControl)");    // add a tag to aid in debugging via logs
+
+    SolrDocumentList controlDocList = controlClient.query(q).getResults();
+    long controlDocs = controlDocList.getNumFound();
+
+    SolrDocumentList cloudDocList = cloudClient.query(q).getResults();
+    long cloudClientDocs = cloudDocList.getNumFound();
+
+
+    // now check that the right # are on each shard
+//    theShards = shardToJetty.keySet();
+//    int cnt = 0;
+//    for (String s : theShards) {
+//      int times = shardToJetty.get(s).size();
+//      for (int i = 0; i < times; i++) {
+//        try {
+//          CloudJettyRunner cjetty = shardToJetty.get(s).get(i);
+//          ZkNodeProps props = cjetty.info;
+//          SolrClient client = cjetty.client.solrClient;
+//          boolean active = Replica.State.getState(props.getStr(ZkStateReader.STATE_PROP)) == Replica.State.ACTIVE;
+//          if (active) {
+//            SolrQuery query = new SolrQuery("*:*");
+//            query.set("distrib", false);
+//            long results = client.query(query).getResults().getNumFound();
+//            if (verbose) System.err.println(props + " : " + results);
+//            if (verbose) System.err.println("shard:"
+//                + props.getStr(ZkStateReader.SHARD_ID_PROP));
+//            cnt += results;
+//            break;
+//          }
+//        } catch (Exception e) {
+//          ParWork.propagateInterrupt(e);
+//          // if we have a problem, try the next one
+//          if (i == times - 1) {
+//            throw e;
+//          }
+//        }
+//      }
+//    }
+
+//controlDocs != cnt ||
+    int cnt = -1;
+    if (cloudClientDocs != controlDocs) {
+      String msg = "document count mismatch.  control=" + controlDocs + " sum(shards)="+ cnt + " cloudClient="+cloudClientDocs;
+      log.error(msg);
+
+      boolean shouldFail = CloudInspectUtil.compareResults(controlClient, cloudClient, addFails, deleteFails);
+      if (shouldFail) {
+        fail(msg);
+      }
+    }
+  }
+
+  /**
+   * Returns a non-null string if replicas within the same shard do not have a
+   * consistent number of documents.
+   * If expectFailure==false, the exact differences found will be logged since
+   * this would be an unexpected failure.
+   * verbose causes extra debugging into to be displayed, even if everything is
+   * consistent.
+   */
+  protected String checkShardConsistency(String shard, boolean expectFailure, boolean verbose)
+      throws Exception {
+
+    List<JettySolrRunner> solrJetties = cluster.getJettysForShard(DEFAULT_COLLECTION, shard);
+    if (solrJetties == null) {
+      throw new RuntimeException("shard not found:" + shard);
+    }
+    long num = -1;
+    long lastNum = -1;
+    String failMessage = null;
+    if (verbose) System.err.println("check const of " + shard);
+    int cnt = 0;
+    ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+
+    DocCollection coll = zkStateReader.getClusterState().getCollection(DEFAULT_COLLECTION);
+    
+    assertEquals(
+        "The client count does not match up with the shard count for slice:"
+            + shard,
+        coll.getSlice(shard)
+            .getReplicasMap().size(), solrJetties.size());
+
+    Slice replicas = coll.getSlice(shard);
+    
+    Replica lastReplica = null;
+    for (Replica replica : replicas) {
+
+      if (verbose) System.err.println("client" + cnt++);
+      if (verbose) System.err.println("Replica:" + replica);
+      try (SolrClient client = getClient(replica.getCoreUrl())) {
+      try {
+        SolrParams query = params("q","*:*", "rows","0", "distrib","false", "tests","checkShardConsistency"); // "tests" is just a tag that won't do anything except be echoed in logs
+        num = client.query(query).getResults().getNumFound();
+      } catch (SolrException | SolrServerException e) {
+        if (verbose) System.err.println("error contacting client: "
+            + e.getMessage() + "\n");
+        continue;
+      }
+
+      boolean live = false;
+      String nodeName = replica.getNodeName();
+      if (zkStateReader.isNodeLive(nodeName)) {
+        live = true;
+      }
+      if (verbose) System.err.println(" live:" + live);
+      if (verbose) System.err.println(" num:" + num + "\n");
+
+      boolean active = replica.getState() == Replica.State.ACTIVE;
+      if (active && live) {
+        if (lastNum > -1 && lastNum != num && failMessage == null) {
+          failMessage = shard + " is not consistent.  Got " + lastNum + " from " + lastReplica.getCoreUrl() + " (previous client)" + " and got " + num + " from " + replica.getCoreUrl();
+
+          if (!expectFailure || verbose) {
+            System.err.println("######" + failMessage);
+            SolrQuery query = new SolrQuery("*:*");
+            query.set("distrib", false);
+            query.set("fl", "id,_version_");
+            query.set("rows", "100000");
+            query.set("sort", "id asc");
+            query.set("tests", "checkShardConsistency/showDiff");
+
+            try (SolrClient lastClient = getClient(lastReplica.getCoreUrl())) {
+              SolrDocumentList lst1 = lastClient.query(query).getResults();
+              SolrDocumentList lst2 = client.query(query).getResults();
+
+              CloudInspectUtil.showDiff(lst1, lst2, lastReplica.getCoreUrl(), replica.getCoreUrl());
+            }
+          }
+
+        }
+        lastNum = num;
+        lastReplica = replica;
+      }
+      }
+    }
+    return failMessage;
+
+  }
+
 }
diff --git a/solr/core/src/test/org/apache/solr/cloud/SolrCloudBridgeTestCase.java b/solr/core/src/test/org/apache/solr/cloud/SolrCloudBridgeTestCase.java
index 6f26c33..566e6c9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SolrCloudBridgeTestCase.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SolrCloudBridgeTestCase.java
@@ -244,6 +244,7 @@ public abstract class SolrCloudBridgeTestCase extends SolrCloudTestCase {
       }
     }
     clients.clear();
+    controlClient = null;
   }
 
   @BeforeClass
@@ -333,9 +334,12 @@ public abstract class SolrCloudBridgeTestCase extends SolrCloudTestCase {
 
     throw new IllegalArgumentException("Could not find replica with nodename=" + node);
   }
-  
+
   public HttpSolrClient getClient(String collection, String url) {
-    String baseUrl = url + "/" + collection;
+    return getClient(url + "/" + collection);
+  }
+
+  public HttpSolrClient getClient(String baseUrl) {
     HttpSolrClient client = new HttpSolrClient.Builder(baseUrl)
         .withConnectionTimeout(15000)
         .withSocketTimeout(Integer.getInteger("socketTimeout", 30000))
diff --git a/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsChaosMonkeySafeLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsChaosMonkeySafeLeaderTest.java
index b2668c3..72cf055 100644
--- a/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsChaosMonkeySafeLeaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsChaosMonkeySafeLeaderTest.java
@@ -16,20 +16,23 @@
  */
 package org.apache.solr.cloud.hdfs;
 
-import java.io.IOException;
-
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.cloud.ChaosMonkeySafeLeaderTest;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 
 @Slow
 @LuceneTestCase.Nightly
+@Ignore // MRM TODO:
 public class HdfsChaosMonkeySafeLeaderTest extends ChaosMonkeySafeLeaderTest {
   private static MiniDFSCluster dfsCluster;
-  
+
+  public HdfsChaosMonkeySafeLeaderTest() throws Exception {
+  }
+
   @BeforeClass
   public static void setupClass() throws Exception {
     System.setProperty("solr.hdfs.blockcache.global", "true"); // always use global cache, this test can create a lot of directories
@@ -46,16 +49,16 @@ public class HdfsChaosMonkeySafeLeaderTest extends ChaosMonkeySafeLeaderTest {
     }
   }
   
-  @Override
-  public void distribSetUp() throws Exception {
-    super.distribSetUp();
-    
-    // super class may hard code directory
-    useFactory("org.apache.solr.core.HdfsDirectoryFactory");
-  }
+//  @Override
+//  public void distribSetUp() throws Exception {
+//    super.distribSetUp();
+//
+//    // super class may hard code directory
+//    useFactory("org.apache.solr.core.HdfsDirectoryFactory");
+//  }
   
-  @Override
-  protected String getDataDir(String dataDir) throws IOException {
-    return HdfsTestUtil.getDataDir(dfsCluster, dataDir);
-  }
+//  @Override
+//  protected String getDataDir(String dataDir) throws IOException {
+//    return HdfsTestUtil.getDataDir(dfsCluster, dataDir);
+//  }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 238ce7d..514fa82 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -277,7 +277,7 @@ public class Http2SolrClient extends SolrClient {
       httpClient.setUserAgentField(new HttpField(HttpHeader.USER_AGENT, AGENT));
       httpClient.setIdleTimeout(idleTimeout);
       httpClient.setTCPNoDelay(true);
-      httpClient.setStopTimeout(0);
+      httpClient.setStopTimeout(5000);
       httpClient.setAddressResolutionTimeout(3000);
       if (builder.connectionTimeout != null) httpClient.setConnectTimeout(builder.connectionTimeout);
       httpClient.start();
@@ -295,16 +295,33 @@ public class Http2SolrClient extends SolrClient {
 
   public void close() {
     if (log.isTraceEnabled()) log.trace("Closing {} closeClient={}", this.getClass().getSimpleName(), closeClient);
-   // assert closeTracker != null ? closeTracker.close() : true;
-    asyncTracker.close();
+    // assert closeTracker != null ? closeTracker.close() : true;
+    try {
+      asyncTracker.waitForComplete();
+    } catch (Exception e) {
+      log.error("Exception waiting for httpClient asyncTracker", e);
+    }
+    try {
+      asyncTracker.close();
+    } catch (Exception e) {
+      log.error("Exception closing httpClient asyncTracker", e);
+    }
     closed = true;
     if (closeClient) {
       try {
         httpClient.stop();
+      } catch (Exception e) {
+        log.error("Exception closing httpClient", e);
+      }
+      try {
         scheduler.stop();
+      } catch (Exception e) {
+        log.error("Exception closing httpClient scheduler", e);
+      }
+      try {
         httpClientExecutor.stop();
       } catch (Exception e) {
-        log.error("Exception closing httpClient", e);
+        log.error("Exception closing httpClient httpClientExecutor", e);
       }
     }
     if (log.isTraceEnabled()) log.trace("Done closing {}", this.getClass().getSimpleName());
@@ -458,8 +475,6 @@ public class Http2SolrClient extends SolrClient {
     try {
       req.request.send(new InputStreamResponseListener() {
 
-        private volatile boolean arrived;
-
         @Override
         public void onHeaders(Response response) {
           super.onHeaders(response);
@@ -478,18 +493,24 @@ public class Http2SolrClient extends SolrClient {
               if (SolrException.getRootCause(e) != CANCELLED_EXCEPTION) {
                 asyncListener.onFailure(e, e instanceof  SolrException ? ((SolrException) e).code() : 500);
               }
-            } finally {
-              arrived = true;
-              asyncTracker.arrive();
             }
           });
         }
 
+        @Override
+        public void onSuccess(Response response) {
+          try {
+            super.onSuccess(response);
+          } finally {
+            asyncTracker.arrive();
+          }
+        }
 
         @Override
         public void onFailure(Response response, Throwable failure) {
-          super.onFailure(response, failure);
           try {
+            super.onFailure(response, failure);
+
             if (SolrException.getRootCause(failure) != CANCELLED_EXCEPTION) {
               asyncListener.onFailure(failure, response.getStatus());
             } else {
@@ -497,29 +518,9 @@ public class Http2SolrClient extends SolrClient {
             }
 
           } finally {
-            if (!arrived) {
-              asyncTracker.arrive();
-            }
+            asyncTracker.arrive();
           }
         }
-
-        //        @Override
-//        public void onComplete(Result result) {
-//         // super.onComplete(result);
-//          Throwable failure;
-//           try {
-//             if (result.isFailed()) {
-//               failure = result.getFailure();
-//               if (failure != CANCELLED_EXCEPTION) {
-//                 asyncListener.onFailure(failure);
-//               }
-//
-//             }
-//           } finally {
-//             log.info("UNREGISTER TRACKER");
-//             asyncTracker.arrive();
-//           }
-//         }
       });
       if (req.afterSend != null) {
         req.afterSend.run();
@@ -1040,21 +1041,29 @@ public class Http2SolrClient extends SolrClient {
       if (log.isTraceEnabled()) log.trace("Before wait for outstanding requests registered: {} arrived: {}, {} {}", phaser.getRegisteredParties(), phaser.getArrivedParties(), phaser.getUnarrivedParties(), phaser);
 
       try {
-        phaser.arriveAndAwaitAdvance();
+        phaser.awaitAdvanceInterruptibly(phaser.arrive(), idleTimeout, TimeUnit.MILLISECONDS);
       } catch (IllegalStateException e) {
-        log.warn("Unexpected, perhaps came after close; ?", e);
+        log.error("Unexpected, perhaps came after close; ?", e);
+      } catch (InterruptedException e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+      } catch (TimeoutException e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout", e);
       }
 
       if (log.isTraceEnabled()) log.trace("After wait for outstanding requests {}", phaser);
     }
 
     public void close() {
-      if (available != null) {
-        while (available.hasQueuedThreads()) {
-          available.release(available.getQueueLength());
+      try {
+        if (available != null) {
+          while (available.hasQueuedThreads()) {
+            available.release(available.getQueueLength());
+          }
         }
+        phaser.forceTermination();
+      } catch (Exception e) {
+        log.error("Exception closing Http2SolrClient asyncTracker", e);
       }
-      phaser.forceTermination();
     }
 
     public void register() {
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ClusterChaosMonkey.java b/solr/test-framework/src/java/org/apache/solr/cloud/ClusterChaosMonkey.java
new file mode 100644
index 0000000..50d618e
--- /dev/null
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/ClusterChaosMonkey.java
@@ -0,0 +1,663 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.common.ParWork;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Replica.Type;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.util.RTimer;
+import org.apache.solr.util.TestInjection;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
+
+/**
+ * The monkey can stop random or specific jetties used with SolrCloud.
+ * 
+ * It can also run in a background thread and start and stop jetties
+ * randomly.
+ * TODO: expire multiple sessions / connectionloss at once
+ * TODO: kill multiple jetties at once
+ * TODO: ? add random headhunter mode that always kills the leader
+ * TODO: chaosmonkey should be able to do cluster stop/start tests
+ */
+public class ClusterChaosMonkey {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final int NO_STOP_WARN_TIME = 60;
+  private static final int CONLOSS_PERCENT = 10; // 0 - 10 = 0 - 100%
+  private static final int EXPIRE_PERCENT = 10; // 0 - 10 = 0 - 100%
+
+  private static final Boolean MONKEY_ENABLED = Boolean.valueOf(System.getProperty("solr.tests.cloud.cm.enabled", "true"));
+  // NOTE: CONN_LOSS and EXP are currently being set to "false" intentionally here. Remove the default value once we know tests pass reliably under those conditions
+  private static final String CONN_LOSS = System.getProperty("solr.tests.cloud.cm.connloss", "false");
+  private static final String EXP = System.getProperty("solr.tests.cloud.cm.exp", "false");
+  private final MiniSolrCloudCluster cluster;
+
+  private ZkTestServer zkServer;
+  private ZkStateReader zkStateReader;
+  private String collection;
+  private volatile boolean stop = false;
+  private AtomicInteger stops = new AtomicInteger();
+  private AtomicInteger starts = new AtomicInteger();
+  private AtomicInteger expires = new AtomicInteger();
+  private AtomicInteger connloss = new AtomicInteger();
+
+  private boolean expireSessions;
+  private boolean causeConnectionLoss;
+  private boolean aggressivelyKillLeaders;
+  private volatile RTimer runTimer;
+
+  private List<JettySolrRunner> deadPool = new ArrayList<>();
+
+  private Thread monkeyThread;
+
+  /**
+   * Our own Random, seeded from LuceneTestCase on init, so that we can produce a consistent sequence
+   * of random chaos regardless of if/how othe threads access the test randomness in other threads
+   * @see LuceneTestCase#random()
+   */
+  private final Random chaosRandom;
+
+  public ClusterChaosMonkey(MiniSolrCloudCluster cluster, String collection) {
+    this.cluster = cluster;
+    this.zkServer = cluster.getZkServer();
+    this.zkStateReader = cluster.getSolrClient().getZkStateReader();
+    this.collection = collection;
+    this.chaosRandom = new Random(LuceneTestCase.random().nextLong());
+    
+    if (!MONKEY_ENABLED) {
+      monkeyLog("The Monkey is Disabled and will not run");
+      return;
+    }
+    
+    if (EXP != null) {
+      expireSessions = Boolean.parseBoolean(EXP); 
+    } else {
+      expireSessions = chaosRandom.nextBoolean();
+    }
+    if (CONN_LOSS != null) {
+      causeConnectionLoss = Boolean.parseBoolean(CONN_LOSS);
+    } else {
+      causeConnectionLoss = chaosRandom.nextBoolean();
+    }
+    
+    
+    monkeyLog("init - expire sessions:" + expireSessions
+        + " cause connection loss:" + causeConnectionLoss);
+  }
+  
+  // TODO: expire all clients at once?
+  public void expireSession(final JettySolrRunner jetty) {
+    CoreContainer cores = jetty.getCoreContainer();
+    if (cores != null) {
+      monkeyLog("expire session for " + jetty.getLocalPort() + " !");
+      causeConnectionLoss(jetty);
+      long sessionId = cores.getZkController().getZkClient()
+          .getSolrZooKeeper().getSessionId();
+      zkServer.expire(sessionId);
+    }
+
+  }
+  
+  public void expireRandomSession() throws KeeperException, InterruptedException {
+    String sliceName = getRandomSlice();
+    
+    JettySolrRunner jetty = getRandomJetty(sliceName, aggressivelyKillLeaders);
+    if (jetty != null) {
+      expireSession(jetty);
+      expires.incrementAndGet();
+    }
+  }
+  
+  public void randomConnectionLoss() throws KeeperException, InterruptedException {
+    monkeyLog("Will cause connection loss!");
+    
+    String sliceName = getRandomSlice();
+    JettySolrRunner jetty = getRandomJetty(sliceName, aggressivelyKillLeaders);
+    if (jetty != null) {
+      causeConnectionLoss(jetty);
+      connloss.incrementAndGet();
+    }
+  }
+  
+  public static void causeConnectionLoss(JettySolrRunner jetty) {
+    CoreContainer cores = jetty.getCoreContainer();
+    if (cores != null) {
+      monkeyLog("Will cause connection loss on " + jetty.getLocalPort());
+      SolrZkClient zkClient = cores.getZkController().getZkClient();
+      zkClient.getSolrZooKeeper().closeCnxn();
+    }
+  }
+
+  public JettySolrRunner stopShard(String slice, int index) throws Exception {
+    JettySolrRunner jetty = cluster.getJettyForShard(collection, slice, index);
+    stopJetty(jetty);
+    return jetty;
+  }
+
+  public void stopJetty(JettySolrRunner jetty) throws Exception {
+    jetty.stop();
+    stops.incrementAndGet();
+  }
+
+  public void stopAll(int pauseBetweenMs) throws Exception {
+    List<Thread> jettyThreads = new ArrayList<>(cluster.getJettySolrRunners().size());
+    List<JettySolrRunner> jetties = cluster.getJettySolrRunners();
+    for (JettySolrRunner jetty : jetties) {
+      Thread.sleep(pauseBetweenMs);
+      Thread thread = new Thread() {
+        public void run() {
+          try {
+            stopJetty(jetty);
+          } catch (Exception e) {
+            ParWork.propagateInterrupt(e);
+            throw new RuntimeException(e);
+          }
+        }
+      };
+      jettyThreads.add(thread);
+      thread.start();
+
+    }
+
+    for (Thread thread : jettyThreads) {
+      thread.join();
+    }
+  }
+  
+  public void startAll() throws Exception {
+
+    List<Thread> jettyThreads = new ArrayList<>(cluster.getJettySolrRunners().size());
+    List<JettySolrRunner> jetties = cluster.getJettySolrRunners();
+    for (JettySolrRunner jetty : jetties) {
+      Thread thread = new Thread(() -> {
+        try {
+          jetty.start();
+        } catch (Exception e) {
+          ParWork.propagateInterrupt(e);
+          throw new RuntimeException(e);
+        }
+      });
+      jettyThreads.add(thread);
+      thread.start();
+
+    }
+
+    for (Thread thread : jettyThreads) {
+      thread.join();
+    }
+  }
+  
+  public void stopShard(String slice) throws Exception {
+    List<JettySolrRunner> jettys = cluster.getJettysForShard(collection, slice);
+    for (JettySolrRunner jetty : jettys) {
+      stopJetty(jetty);
+    }
+  }
+  
+  public void stopShardExcept(String slice, String shardName) throws Exception {
+
+    List<JettySolrRunner> jettys = cluster.getJettysForShard(collection, slice);
+    for (JettySolrRunner jetty : jettys) {
+      if (!jetty.getNodeName().equals(shardName)) {
+        stopJetty(jetty);
+      }
+    }
+  }
+  
+  public JettySolrRunner getShard(String slice, int index) throws Exception {
+    return cluster.getJettyForShard(collection, slice, index);
+  }
+  
+  public JettySolrRunner stopRandomShard() throws Exception {
+    String sliceName = getRandomSlice();
+    
+    return stopRandomShard(sliceName);
+  }
+  
+  public JettySolrRunner stopRandomShard(String slice) throws Exception {
+    JettySolrRunner jetty = getRandomJetty(slice, aggressivelyKillLeaders);
+    if (jetty != null) {
+      stopJetty(jetty);
+    }
+    return jetty;
+  }
+  
+  
+  public JettySolrRunner killRandomShard() throws Exception {
+    // add all the shards to a list
+    String sliceName = getRandomSlice();
+    
+    return killRandomShard(sliceName);
+  }
+
+  private String getRandomSlice() {
+    Map<String,Slice> slices = zkStateReader.getClusterState().getCollection(collection).getSlicesMap();
+    
+    List<String> sliceKeyList = new ArrayList<>(slices.size());
+    sliceKeyList.addAll(slices.keySet());
+    String sliceName = sliceKeyList.get(chaosRandom.nextInt(sliceKeyList.size()));
+    return sliceName;
+  }
+  
+  public JettySolrRunner killRandomShard(String slice) throws Exception {
+    JettySolrRunner jetty = getRandomJetty(slice, aggressivelyKillLeaders);
+    if (jetty != null) {
+      stopJetty(jetty);
+    }
+    return jetty;
+  }
+  
+  public JettySolrRunner getRandomJetty(String slice, boolean aggressivelyKillLeaders) throws KeeperException, InterruptedException {
+    
+    int numActive = 0;
+    
+    numActive = checkIfKillIsLegal(slice, numActive);
+    
+    // TODO: stale state makes this a tough call
+    if (numActive < 2) {
+      // we cannot kill anyone
+      monkeyLog("only one active node in shard - monkey cannot kill :(");
+      return null;
+    }
+    
+    // let's check the deadpool count
+    int numRunning = getNumRunning(slice);
+    
+    if (numRunning < 2) {
+      // we cannot kill anyone
+      monkeyLog("only one active node in shard - monkey cannot kill :(");
+      return null;
+    }
+    
+    if (numActive == 2) {
+      // we are careful
+      Thread.sleep(1000);
+      
+      numActive = checkIfKillIsLegal(slice, numActive);
+      
+      if (numActive < 2) {
+        // we cannot kill anyone
+        monkeyLog("only one active node in shard - monkey cannot kill :(");
+        return null;
+      }
+      
+      numRunning = getNumRunning(slice);
+      
+      if (numRunning < 2) {
+        // we cannot kill anyone
+        monkeyLog("only one active node in shard - monkey cannot kill :(");
+        return null;
+      }
+    }
+    
+    boolean canKillIndexer = canKillIndexer(slice);
+    
+    if (!canKillIndexer) {
+      monkeyLog("Number of indexer nodes (nrt or tlog replicas) is not enough to kill one of them, Will only choose a pull replica to kill");
+    }
+    
+    int chance = chaosRandom.nextInt(10);
+    JettySolrRunner jetty = null;
+    if (chance <= 5 && aggressivelyKillLeaders && canKillIndexer) {
+      // if killLeader, really aggressively go after leaders
+      jetty = cluster.getShardLeaderJetty(collection, slice);
+    } else {
+      List<JettySolrRunner> jetties = cluster.getJettysForShard(collection, slice);
+      // get random node
+      int attempt = 0;
+      while (true) {
+        attempt++;
+        int index = chaosRandom.nextInt(jetties.size());
+        jetty = jetties.get(index);
+        if (canKillIndexer) { //  || getTypeForJetty(slice, jetty) == Type.PULL
+          break;
+        } else if (attempt > 20) {
+          monkeyLog("Can't kill indexer nodes (nrt or tlog replicas) and couldn't find a random pull node after 20 attempts - monkey cannot kill :(");
+          return null;
+        }
+      }
+      
+      Replica leader = null;
+      try {
+        leader = zkStateReader.getLeaderRetry(collection, slice);
+      } catch (Throwable t) {
+        log.error("Could not get leader", t);
+        return null;
+      }
+
+      // cluster state can be stale - also go by our 'near real-time' is leader prop
+      boolean rtIsLeader;
+      CoreContainer cc = jetty.getCoreContainer();
+      if (cc != null) {
+        try (SolrCore core = cc.getCore(leader.getName())) {
+          rtIsLeader = core != null;
+        }
+      } else {
+        return null;
+      }
+
+      boolean isLeader = leader.getStr(ZkStateReader.NODE_NAME_PROP).equals(jetty.getNodeName())
+          || rtIsLeader;
+      if (!aggressivelyKillLeaders && isLeader) {
+        // we don't kill leaders...
+        monkeyLog("abort! I don't kill leaders");
+        return null;
+      } 
+    }
+
+    if (jetty.getLocalPort() == -1) {
+      // we can't kill the dead
+      monkeyLog("abort! This guy is already dead");
+      return null;
+    }
+    
+    //System.out.println("num active:" + numActive + " for " + slice + " sac:" + jetty.getLocalPort());
+    monkeyLog("chose a victim! " + jetty.getLocalPort());
+  
+    return jetty;
+  }
+
+  private int getNumRunning(String slice) {
+    int numRunning = 0;
+    for (JettySolrRunner jetty : cluster.getJettysForShard(collection, slice)) {
+      if (!deadPool.contains(jetty)) {
+        numRunning++;
+      }
+    }
+    return numRunning;
+  }
+
+//  private Type getTypeForJetty(String sliceName, JettySolrRunner cjetty) {
+//    DocCollection docCollection = zkStateReader.getClusterState().getCollection(collection);
+//
+//    Slice slice = docCollection.getSlice(sliceName);
+//
+//    ZkNodeProps props = slice.getReplicasMap().get(cjetty.coreNodeName);
+//    if (props == null) {
+//      throw new RuntimeException("shard name " + cjetty.coreNodeName + " not found in " + slice.getReplicasMap().keySet());
+//    }
+//    return Type.valueOf(props.getStr(ZkStateReader.REPLICA_TYPE));
+//  }
+
+  private boolean canKillIndexer(String sliceName) throws KeeperException, InterruptedException {
+    int numIndexersFoundInShard = 0;
+    DocCollection docCollection = zkStateReader.getClusterState().getCollection(collection);
+
+    Slice slice = docCollection.getSlice(sliceName);
+
+    for (Replica replica : slice) {
+
+      final Replica.State state = replica.getState();
+      final Type replicaType = replica.getType();
+      final String nodeName = replica.getNodeName();
+
+      if (state == Replica.State.ACTIVE && (replicaType == Type.TLOG || replicaType == Type.NRT) && zkStateReader.isNodeLive(nodeName)) {
+        numIndexersFoundInShard++;
+      }
+    }
+
+    return numIndexersFoundInShard > 1;
+  }
+
+  private int checkIfKillIsLegal(String sliceName, int numActive) {
+
+    DocCollection docCollection = zkStateReader.getClusterState().getCollection(collection);
+
+    Slice slice = docCollection.getSlice(sliceName);
+
+    for (Replica replica : slice) {
+      if (replica.getState() == Replica.State.ACTIVE && zkStateReader.isNodeLive(replica.getNodeName())) {
+        numActive++;
+      }
+    }
+
+    return numActive;
+  }
+  
+  // synchronously starts and stops shards randomly, unless there is only one
+  // active shard up for a slice or if there is one active and others recovering
+  public void startTheMonkey(boolean killLeaders, final int roundPauseUpperLimit) {
+    if (!MONKEY_ENABLED) {
+      monkeyLog("The Monkey is disabled and will not start");
+      return;
+    }
+    monkeyLog("starting");
+    
+    
+    if (chaosRandom.nextBoolean()) {
+      monkeyLog("Jetty will not commit on close");
+      TestInjection.skipIndexWriterCommitOnClose = true;
+    }
+
+    this.aggressivelyKillLeaders = killLeaders;
+    runTimer = new RTimer();
+    // TODO: when kill leaders is on, lets kill a higher percentage of leaders
+    
+    stop = false;
+    monkeyThread = new Thread() {
+
+      @Override
+      public void run() {
+        while (!stop) {
+          try {
+    
+            Thread.sleep(chaosRandom.nextInt(roundPauseUpperLimit));
+
+            causeSomeChaos();
+            
+          } catch (InterruptedException e) {
+            //
+          } catch (Exception e) {
+            ParWork.propagateInterrupt(e);
+            log.error("", e);
+          }
+        }
+        monkeyLog("finished");
+        monkeyLog("I ran for " + runTimer.getTime() / 1000 + "s. I stopped " + stops + " and I started " + starts
+            + ". I also expired " + expires.get() + " and caused " + connloss
+            + " connection losses");
+      }
+    };
+    monkeyThread.start();
+  }
+  
+  public static void monkeyLog(String msg) {
+    log.info("monkey: {}", msg);
+  }
+  
+  public static void monkeyLog(String msg, Object...logParams) {
+    log.info("monkey: {}", msg, logParams);
+  }
+  
+  public void stopTheMonkey() {
+    stop = true;
+    try {
+      monkeyThread.join();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+    runTimer.stop();
+    TestInjection.skipIndexWriterCommitOnClose = false;
+
+    double runtime = runTimer.getTime()/1000.0f;
+    if (runtime > NO_STOP_WARN_TIME && stops.get() == 0) {
+      LuceneTestCase.fail("The Monkey ran for over " + NO_STOP_WARN_TIME +" seconds and no jetties were stopped - this is worth investigating!");
+    }
+  }
+
+  /**
+   * causes some randomly selected chaos
+   */
+  public void causeSomeChaos() throws Exception {
+    if (chaosRandom.nextBoolean()) {
+      if (!deadPool.isEmpty()) {
+        int index = chaosRandom.nextInt(deadPool.size());
+        JettySolrRunner jetty = deadPool.get(index);
+        if (jetty.isStopped()) {
+          jetty.start();
+          return;
+        }
+        deadPool.remove(index);
+        starts.incrementAndGet();
+        return;
+      }
+    }
+    
+    int rnd = chaosRandom.nextInt(10);
+    
+    if (expireSessions && rnd < EXPIRE_PERCENT) {
+      expireRandomSession();
+    } 
+    
+    if (causeConnectionLoss && rnd < CONLOSS_PERCENT) {
+      randomConnectionLoss();
+    }
+
+    JettySolrRunner jetty;
+    if (chaosRandom.nextBoolean()) {
+      jetty = stopRandomShard();
+    } else {
+      jetty = killRandomShard();
+    }
+    if (jetty == null) {
+      // we cannot kill
+    } else {
+      deadPool.add(jetty);
+    }
+  }
+  
+  public int getStarts() {
+    return starts.get();
+  }
+
+  public static void stop(List<JettySolrRunner> jettys) throws Exception {
+    ExecutorService executor = new ExecutorUtil.MDCAwareThreadPoolExecutor(
+        0,
+        Integer.MAX_VALUE,
+        15, TimeUnit.SECONDS,
+        new SynchronousQueue<>(),
+        new SolrNamedThreadFactory("ChaosMonkey"),
+        false);
+    for (JettySolrRunner jetty : jettys) {
+      executor.submit(() -> {
+        try {
+          jetty.stop();
+        } catch (Exception e) {
+          ParWork.propagateInterrupt(e);
+          log.error("error stopping jetty", e);
+          throw new RuntimeException(e);
+        }
+      });
+    }
+    ExecutorUtil.shutdownAndAwaitTermination(executor);
+  }
+
+  public static void start(List<JettySolrRunner> jettys) throws Exception {
+    ExecutorService executor = new ExecutorUtil.MDCAwareThreadPoolExecutor(
+        0,
+        Integer.MAX_VALUE,
+        15, TimeUnit.SECONDS,
+        new SynchronousQueue<>(),
+        new SolrNamedThreadFactory("ChaosMonkey"),
+        false);
+    for (JettySolrRunner jetty : jettys) {
+      executor.submit(() -> {
+        try {
+          jetty.start();
+        } catch (Exception e) {
+          ParWork.propagateInterrupt(e);
+          log.error("error starting jetty", e);
+          throw new RuntimeException(e);
+        }
+      });
+    }
+    ExecutorUtil.shutdownAndAwaitTermination(executor);
+  }
+
+  /**
+   * You can call this method to wait while the ChaosMonkey is running, it waits approximately the specified time, and periodically
+   * logs the status of the collection
+   * @param runLength The time in ms to wait
+   * @param collectionName The main collection being used for the ChaosMonkey
+   * @param zkStateReader current state reader
+   */
+  public static void wait(long runLength, String collectionName, ZkStateReader zkStateReader) throws InterruptedException {
+    TimeOut t = new TimeOut(runLength, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+    while (!t.hasTimedOut()) {
+      Thread.sleep(Math.min(1000, t.timeLeft(TimeUnit.MILLISECONDS)));
+      logCollectionStateSummary(collectionName, zkStateReader);
+    }
+  }
+
+  private static void logCollectionStateSummary(String collectionName, ZkStateReader zkStateReader) {
+    Pattern portPattern = Pattern.compile(".*:([0-9]*).*");
+    DocCollection docCollection = zkStateReader.getClusterState().getCollection(collectionName);
+    if (docCollection == null) {
+      monkeyLog("Could not find collection {}", collectionName);
+    }
+    StringBuilder builder = new StringBuilder();
+    builder.append("Collection status: {");
+    for (Slice slice:docCollection.getSlices()) {
+      builder.append(slice.getName()).append(": {");
+      for (Replica replica:slice.getReplicas()) {
+        log.info("{}", replica);
+        java.util.regex.Matcher m = portPattern.matcher(replica.getBaseUrl());
+        m.find();
+        String jettyPort = m.group(1);
+        builder.append(String.format(Locale.ROOT, "%s(%s): {state: %s, type: %s, leader: %s, Live: %s}, ", 
+            replica.getName(), jettyPort, replica.getState(), replica.getType(), (replica.get("leader")!= null), zkStateReader.isNodeLive(replica.getNodeName())));
+      }
+      if (slice.getReplicas().size() > 0) {
+        builder.setLength(builder.length() - 2);
+      }
+      builder.append("}, ");
+    }
+    if (docCollection.getSlices().size() > 0) {
+      builder.setLength(builder.length() - 2);
+    }
+    builder.append("}");
+    monkeyLog(builder.toString());
+  }
+
+}
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
index c2d6036..fc774f3 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
@@ -917,6 +917,22 @@ public class MiniSolrCloudCluster {
     throw new IllegalArgumentException("Could not find suitable Replica");
   }
 
+  public List<JettySolrRunner> getJettysForShard(String collection, String slice) {
+    List<JettySolrRunner> jettys = new ArrayList<>();
+    DocCollection coll = solrClient.getZkStateReader().getClusterState().getCollection(collection);
+    if (coll != null) {
+      Slice replicas = coll.getSlice(slice);
+      for (Replica replica : replicas) {
+        JettySolrRunner jetty = getReplicaJetty(replica);
+        if (!jettys.contains(jetty)) {
+          jettys.add(jetty);
+        }
+      }
+      return jettys;
+    }
+    throw new IllegalArgumentException("Could not find suitable Replica");
+  }
+
   public JettySolrRunner getShardLeaderJetty(String collection, String shard) {
     DocCollection coll = solrClient.getZkStateReader().getClusterState().getCollection(collection);
     if (coll != null) {
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/StoppableIndexingThread.java b/solr/test-framework/src/java/org/apache/solr/cloud/StoppableIndexingThread.java
index 184f2fe..7420fa6 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/StoppableIndexingThread.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/StoppableIndexingThread.java
@@ -55,7 +55,8 @@ public class StoppableIndexingThread extends AbstractFullDistribZkTestBase.Stopp
   private boolean pauseBetweenUpdates;
 
   private String collection;
-  
+  private boolean useLongId;
+
   public StoppableIndexingThread(SolrClient controlClient, SolrClient cloudClient, String id, boolean doDeletes) {
     this(controlClient, cloudClient, id, doDeletes, -1, 1, true);
   }
@@ -72,6 +73,10 @@ public class StoppableIndexingThread extends AbstractFullDistribZkTestBase.Stopp
     setDaemon(true);
   }
 
+  public void setUseLongId(boolean useLongId) {
+    this.useLongId = useLongId;
+  }
+
   public void setCollection(String collection) {
     this.collection = collection;
   }
@@ -91,7 +96,13 @@ public class StoppableIndexingThread extends AbstractFullDistribZkTestBase.Stopp
         }
       }
       ++numDone;
-      String id = this.id + "-" + i;
+      Object id;
+      if (useLongId) {
+         id = i;
+      } else {
+         id = this.id + "-" + i;
+      }
+
       ++i;
       boolean addFailed = false;
       
@@ -107,7 +118,7 @@ public class StoppableIndexingThread extends AbstractFullDistribZkTestBase.Stopp
           }
           
           UpdateRequest req = new UpdateRequest();
-          req.deleteById(id);
+          req.deleteById(id.toString());
           req.process(cloudClient, collection);
         } catch (Exception e) {
           if (e instanceof InterruptedException) {
@@ -140,11 +151,11 @@ public class StoppableIndexingThread extends AbstractFullDistribZkTestBase.Stopp
         addFailed = true;
         log.error("REQUEST FAILED for id={}", id, e);
 
-        addFails.add(id);
+        addFails.add(id.toString());
       }
       
       if (!addFailed && doDeletes && random.nextBoolean()) {
-        deletes.add(id);
+        deletes.add(id.toString());
       }
       
       if (docs.size() > 0 && pauseBetweenUpdates) {