Posted to commits@lucene.apache.org by ma...@apache.org on 2020/10/31 16:07:22 UTC

[lucene-solr] branch reference_impl_dev updated: @1100 Harden.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl_dev by this push:
     new f02c615  @1100 Harden.
f02c615 is described below

commit f02c61538cbee3a4de1fc9fa195f98deb5663114
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sat Oct 31 11:06:47 2020 -0500

    @1100 Harden.
---
 .../src/java/org/apache/solr/cloud/Overseer.java   | 104 ++-----------------
 .../apache/solr/cloud/OverseerTaskProcessor.java   |   8 +-
 .../org/apache/solr/cloud/OverseerTaskQueue.java   |   3 +-
 .../org/apache/solr/cloud/ZkDistributedQueue.java  |  20 +++-
 .../java/org/apache/solr/core/CoreContainer.java   |  18 +---
 .../src/java/org/apache/solr/core/CoreSorter.java  |   4 +
 .../src/java/org/apache/solr/core/SolrCore.java    |  12 ++-
 .../org/apache/solr/cloud/DeleteReplicaTest.java   |   2 +-
 .../org/apache/solr/cloud/DeleteShardTest.java     |   4 -
 .../org/apache/solr/cloud/MoveReplicaTest.java     |   2 -
 .../test/org/apache/solr/cloud/OverseerTest.java   | 112 ---------------------
 .../org/apache/solr/cloud/TestPrepRecovery.java    |   6 +-
 .../org/apache/solr/request/SimpleFacetsTest.java  |   2 +
 .../apache/solr/schema/CurrencyFieldTypeTest.java  |   2 +
 .../org/apache/solr/util/OrderedExecutorTest.java  |  24 +++--
 .../org/apache/solr/common/cloud/SolrZkClient.java |   4 +-
 .../apache/solr/common/cloud/SolrZooKeeper.java    |  59 ++++++-----
 .../apache/solr/client/solrj/io/sql/JdbcTest.java  |   1 +
 .../java/org/apache/solr/cloud/ZkTestServer.java   |  12 +--
 19 files changed, 103 insertions(+), 296 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index c5582cd..45f4ebe 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -200,17 +200,12 @@ public class Overseer implements SolrCloseable {
     private final String myId;
     //queue where everybody can throw tasks
     private final ZkDistributedQueue stateUpdateQueue;
-    //TODO remove in 9.0, we do not push message into this queue anymore
-    //Internal queue where overseer stores events that have not yet been published into cloudstate
-    //If Overseer dies while extracting the main queue a new overseer will start from this queue
-    private final ZkDistributedQueue workQueue;
 
     private volatile boolean isClosed = false;
 
     public ClusterStateUpdater(final ZkStateReader reader, final String myId, Stats zkStats) {
       this.zkClient = reader.getZkClient();
       this.stateUpdateQueue = getStateUpdateQueue(zkStats);
-      this.workQueue = getInternalWorkQueue(zkClient, zkStats);
       this.myId = myId;
       this.reader = reader;
     }
@@ -226,7 +221,7 @@ public class Overseer implements SolrCloseable {
 
       log.info("Starting to work on the main queue : {}", LeaderElector.getNodeName(myId));
 
-        ZkStateWriter zkStateWriter = null;
+        ZkStateWriter zkStateWriter = new ZkStateWriter(reader, stats);
         try {
           reader.forciblyRefreshAllClusterStateSlow();
         } catch (KeeperException e) {
@@ -239,83 +234,15 @@ public class Overseer implements SolrCloseable {
         ClusterState clusterState = reader.getClusterState();
         assert clusterState != null;
 
-        // we write updates in batch, but if an exception is thrown when writing new clusterstate,
-        // we do not sure which message is bad message, therefore we will re-process node one by one
-        int fallbackQueueSize = Integer.MAX_VALUE;
-        ZkDistributedQueue fallbackQueue = workQueue;
+
         while (!checkClosed()) {
           if (log.isDebugEnabled()) log.debug("Start of Overseer loop ...");
 
-          if (zkStateWriter == null) {
-            try {
-              zkStateWriter = new ZkStateWriter(reader, stats);
-            //  clusterState = reader.getClusterState();
-              // if there were any errors while processing
-              // the state queue, items would have been left in the
-              // work queue so let's process those first
-              byte[] data = fallbackQueue.peek(null);
-              // TODO: can we do this with a builk call instead?
-              while (fallbackQueueSize > 0 && data != null) {
-                final ZkNodeProps message = ZkNodeProps.load(data);
-                if (log.isDebugEnabled()) log.debug("processMessage: fallbackQueueSize: {}, message = {}", fallbackQueue.getZkStats().getQueueLength(), message);
-                // force flush to ZK after each message because there is no fallback if workQueue items
-                // are removed from workQueue but fail to be written to ZK
-                try {
-                  processQueueItem(message, reader.getClusterState(), zkStateWriter, false, null);
-                } catch (InterruptedException | AlreadyClosedException e) {
-                  ParWork.propagateInterrupt(e);
-                  return;
-                } catch (KeeperException.SessionExpiredException e) {
-                  log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e);
-                  return;
-                } catch (Exception e) {
-                  SolrException exp = new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-                  try {
-                    if (isBadMessage(e)) {
-                      log.warn(
-                              "Exception when process message = {}, consider as bad message and poll out from the queue",
-                              message);
-                      fallbackQueue.poll(null);
-                    }
-                  } catch (InterruptedException e1) {
-                    ParWork.propagateInterrupt(e);
-                    return;
-                  } catch (Exception e1) {
-                    exp.addSuppressed(e1);
-                  }
-                  throw exp;
-                }
-                fallbackQueue.poll(null); // poll-ing removes the element we got by peek-ing
-                data = fallbackQueue.peek(null);
-                fallbackQueueSize--;
-              }
-              // force flush at the end of the loop, if there are no pending updates, this is a no op call
-              clusterState = zkStateWriter.writePendingUpdates(clusterState, null);
-              // the workQueue is empty now, use stateUpdateQueue as fallback queue
-              fallbackQueue = stateUpdateQueue;
-              fallbackQueueSize = 0;
-            } catch (KeeperException.SessionExpiredException e) {
-              log.error("run()", e);
-
-              log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e);
-              return;
-            } catch (InterruptedException | AlreadyClosedException e) {
-              ParWork.propagateInterrupt(e, true);
-              return;
-            } catch (Exception e) {
-              log.error("Unexpected error in Overseer state update loop", e);
-              return;
-//              if (!isClosed()) {
-//                continue;
-//              }
-            }
-          }
-
           LinkedList<Pair<String, byte[]>> queue = null;
           try {
             // We do not need to filter any nodes here cause all processed nodes are removed once we flush clusterstate
 
-            long wait = 5000;
+            long wait = 1000;
             queue = new LinkedList<>(stateUpdateQueue.peekElements(1000, wait, (x) -> x.startsWith(OverseerTaskQueue.RESPONSE_PREFIX)));
           } catch (AlreadyClosedException e) {
             if (isClosed()) {
@@ -373,9 +300,7 @@ public class Overseer implements SolrCloseable {
               if (log.isDebugEnabled()) log.debug("going to peekElements processedNodes={}", processedNodes);
               queue = new LinkedList<>(stateUpdateQueue.peekElements(10, wait, node -> processedNodes.contains(node) || node.startsWith(OverseerTaskQueue.RESPONSE_PREFIX)));
             }
-            fallbackQueueSize = processedNodes.size();
-            // we should force write all pending updates because the next iteration might sleep until there
-            // are more items in the main queue
+
             clusterState = zkStateWriter.writePendingUpdates(clusterState,  () -> {
               if (log.isDebugEnabled()) log.debug("clear processedNodes={}", processedNodes);
               stateUpdateQueue.remove(processedNodes);
@@ -1042,24 +967,9 @@ public class Overseer implements SolrCloseable {
     });
   }
 
-  /**
-   * Internal overseer work queue. This should not be used outside of Overseer.
-   * <p>
-   * This queue is used to store overseer operations that have been removed from the
-   * state update queue but are being executed as part of a batch. Once
-   * the result of the batch is persisted to zookeeper, these items are removed from the
-   * work queue. If the overseer dies while processing a batch then a new overseer always
-   * operates from the work queue first and only then starts processing operations from the
-   * state update queue.
-   * This method will create the /overseer znode in ZooKeeper if it does not exist already.
-   *
-   * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
-   * @param zkStats  a {@link Stats} object which tracks statistics for all zookeeper operations performed by this queue
-   * @return a {@link ZkDistributedQueue} object
-   */
-  static ZkDistributedQueue getInternalWorkQueue(final SolrZkClient zkClient, Stats zkStats) {
-    return new ZkDistributedQueue(zkClient, "/overseer/queue-work", zkStats);
-  }
+//  static ZkDistributedQueue getInternalWorkQueue(final SolrZkClient zkClient, Stats zkStats) {
+//    return new ZkDistributedQueue(zkClient, "/overseer/queue-work", zkStats);
+//  }
 
   /* Internal map for failed tasks, not to be used outside of the Overseer */
   static DistributedMap getRunningMap(final SolrZkClient zkClient) throws KeeperException {
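
(For context on the hunk above: with the fallback work queue gone, the Overseer peeks a batch from the state update queue, applies it, and removes the processed nodes only after the cluster state write succeeds, so a crash before the flush simply reprocesses the same nodes. A minimal sketch of that peek/apply/remove-on-flush loop, using hypothetical StateQueue/StateWriter stand-ins rather than the real ZkDistributedQueue and ZkStateWriter APIs:)

    import java.util.List;

    // Hypothetical stand-ins; names are illustrative, not the Solr API.
    interface StateQueue {
      List<String> peekElements(int max, long waitMillis) throws InterruptedException;
      void remove(List<String> nodes) throws InterruptedException;
    }

    interface StateWriter {
      void enqueue(String message);
      // Flush pending updates, then run the callback once the write is durable.
      void writePendingUpdates(Runnable onWritten);
    }

    final class OverseerLoopSketch {
      void run(StateQueue queue, StateWriter writer) throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
          // Peek (do not remove) a batch; a short wait keeps the loop responsive.
          List<String> batch = queue.peekElements(1000, 1000);
          for (String message : batch) {
            writer.enqueue(message);
          }
          // Removal is deferred into the write callback, mirroring the
          // stateUpdateQueue.remove(processedNodes) call in the diff.
          writer.writePendingUpdates(() -> {
            try {
              queue.remove(batch);
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          });
        }
      }
    }
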
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
index 5c0f33c..0b6498d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
@@ -197,7 +197,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
 
         while (runningTasksSize() > MAX_PARALLEL_TASKS) {
           synchronized (waitLock) {
-            waitLock.wait(1000);//wait for 1000 ms or till a task is complete
+            waitLock.wait(250);
           }
           waited = true;
         }
@@ -208,14 +208,12 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
         if (log.isDebugEnabled()) log.debug("Add {} blocked tasks to process", blockedTasks.size());
         heads.addAll(blockedTasks.values());
         blockedTasks.clear(); // clear it now; may get refilled below.
-        //If we have enough items in the blocked tasks already, it makes
-        // no sense to read more items from the work queue. it makes sense
-        // to clear out at least a few items in the queue before we read more items
+
         if (heads.size() < MAX_BLOCKED_TASKS) {
           //instead of reading MAX_PARALLEL_TASKS items always, we should only fetch as much as we can execute
           int toFetch = Math.min(MAX_BLOCKED_TASKS - heads.size(), MAX_PARALLEL_TASKS - runningTasksSize());
           if (log.isDebugEnabled()) log.debug("PeekTopN for {} items", toFetch);
-          List<QueueEvent> newTasks = workQueue.peekTopN(toFetch, excludedTasks, 5000);
+          List<QueueEvent> newTasks = workQueue.peekTopN(toFetch, excludedTasks, 10);
           if (log.isDebugEnabled()) log.debug("Got {} tasks from work-queue : [{}]", newTasks.size(), newTasks);
           heads.addAll(newTasks);
         }
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index 38c019e..eabc34f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -139,7 +139,7 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
       try {
         zookeeper.setData(responsePath, event.getBytes(), true);
       } catch (KeeperException.NoNodeException ignored) {
-        // this will often not exist or have been removed
+        // this will often not exist or have been removed - nocommit - debug this response stuff
         if (log.isDebugEnabled()) log.debug("Response ZK path: {} doesn't exist.", responsePath);
       }
       try {
@@ -150,7 +150,6 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
       updateLock.lockInterruptibly();
       try {
         knownChildren.remove(event.getId());
-        knownChildren.put(responseId, event.getBytes());
       } finally {
         if (updateLock.isHeldByCurrentThread()) {
           updateLock.unlock();
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
index 06941c5..a413df8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
@@ -204,7 +204,7 @@ public class ZkDistributedQueue implements DistributedQueue {
       return result;
     }
 
-    ChildWatcher watcher = new ChildWatcher();
+    ChildWatcher watcher = new ChildWatcher(acceptFilter);
     TreeMap<String,byte[]> foundChildren = fetchZkChildren(watcher, null);
 
     if (foundChildren.size() > 0) {
@@ -318,7 +318,7 @@ public class ZkDistributedQueue implements DistributedQueue {
         return result;
       }
 
-      ChildWatcher watcher = new ChildWatcher();
+      ChildWatcher watcher = new ChildWatcher(acceptFilter);
       TreeMap<String,byte[]> foundChildren = fetchZkChildren(watcher, acceptFilter);
 
       TimeOut timeout = new TimeOut(waitNanos, TimeUnit.NANOSECONDS, TimeSource.NANO_TIME);
@@ -555,7 +555,7 @@ public class ZkDistributedQueue implements DistributedQueue {
   public Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
     if (log.isDebugEnabled()) log.debug("peekElements {} {}", max, acceptFilter);
     List<Pair<String,byte[]>> result = null;
-    ChildWatcher watcher = new ChildWatcher();
+    ChildWatcher watcher = new ChildWatcher(acceptFilter);
     TreeMap<String,byte[]> foundChildren = fetchZkChildren(watcher, acceptFilter);
     long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
     TimeOut timeout = new TimeOut(waitNanos, TimeUnit.NANOSECONDS, TimeSource.NANO_TIME);
@@ -793,6 +793,12 @@ public class ZkDistributedQueue implements DistributedQueue {
 
   @VisibleForTesting class ChildWatcher implements Watcher {
 
+    private final Predicate<String> acceptFilter;
+
+    public ChildWatcher(Predicate<String> acceptFilter) {
+      this.acceptFilter = acceptFilter;
+    }
+
     @Override
     public void process(WatchedEvent event) {
       // session events are not change events, and do not remove the watcher; except for Expired
@@ -805,8 +811,12 @@ public class ZkDistributedQueue implements DistributedQueue {
       if (event.getType() == Event.EventType.NodeChildrenChanged) {
         updateLock.lock();
         try {
-          fetchZkChildren(null, null);
-          changed.signalAll();
+          TreeMap<String,byte[]> found = fetchZkChildren(null, acceptFilter);
+          if (!found.isEmpty()) {
+            changed.signalAll();
+          } else {
+            fetchZkChildren(this, acceptFilter);
+          }
         } catch (KeeperException | InterruptedException e) {
           log.error("", e);
         } finally {
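
(The ChildWatcher change above is about avoiding spurious wakeups: on a children-changed event the watcher refetches through the same acceptFilter and only signals waiters when a matching child actually exists, re-arming itself otherwise. A compact sketch of that filtered re-arm pattern against the stock ZooKeeper client; field and lock names here are illustrative, and passing the watcher to getChildren is what re-arms the watch:)

    import java.util.List;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.function.Predicate;

    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;

    final class FilteredChildWatcher implements Watcher {
      private final ZooKeeper zk;
      private final String dir;
      private final Predicate<String> acceptFilter;
      private final ReentrantLock lock;
      private final Condition changed;

      FilteredChildWatcher(ZooKeeper zk, String dir, Predicate<String> acceptFilter,
                           ReentrantLock lock, Condition changed) {
        this.zk = zk;
        this.dir = dir;
        this.acceptFilter = acceptFilter;
        this.lock = lock;
        this.changed = changed;
      }

      @Override
      public void process(WatchedEvent event) {
        if (event.getType() != Event.EventType.NodeChildrenChanged) {
          return; // session events are not change events
        }
        lock.lock();
        try {
          // Passing `this` re-registers the watch in the same call.
          List<String> children = zk.getChildren(dir, this);
          if (children.stream().anyMatch(acceptFilter)) {
            changed.signalAll(); // a relevant child exists: wake waiters
          }
          // otherwise stay armed and wait for the next event
        } catch (KeeperException e) {
          // log and give up; the next explicit fetch re-arms the watch
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          lock.unlock();
        }
      }
    }
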
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index b77b20c..cb7ed71 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -352,8 +353,9 @@ public class CoreContainer implements Closeable {
 
     this.asyncSolrCoreLoad = asyncSolrCoreLoad;
 
-    this.replayUpdatesExecutor = new OrderedExecutor( cfg.getReplayUpdatesThreads(),
-        ParWork.getExecutorService(cfg.getReplayUpdatesThreads()));
+    this.replayUpdatesExecutor = new OrderedExecutor(cfg.getReplayUpdatesThreads(),
+        ParWork.getParExecutorService("replayUpdatesExecutor", cfg.getReplayUpdatesThreads(), cfg.getReplayUpdatesThreads(),
+            0, new LinkedBlockingQueue<>(cfg.getReplayUpdatesThreads())));
 
     metricManager = new SolrMetricManager(loader, cfg.getMetricsConfig());
     String registryName = SolrMetricManager.getRegistryName(SolrInfoBean.Group.node);
@@ -394,18 +396,6 @@ public class CoreContainer implements Closeable {
 
     solrCoreLoadExecutor = new PerThreadExecService(ParWork.getRootSharedExecutor(), Math.max(32, Runtime.getRuntime().availableProcessors()),
         false, false);
-    //    if (solrCoreLoadExecutor == null) {
-    //      synchronized (CoreContainer.class) {
-    //        if (solrCoreLoadExecutor == null) {
-    ////          solrCoreLoadExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0, Math.max(3, Runtime.getRuntime().availableProcessors() / 2),
-    ////                  2, TimeUnit.SECOND,
-    ////                  new BlockingArrayQueue<>(100, 10),
-    ////                  new SolrNamedThreadFactory("SolrCoreLoader"));
-    //          solrCoreLoadExecutor = new ParWorkExecutor("SolrCoreLoader", Math.max(3, Runtime.getRuntime().availableProcessors() / 2));
-    //        }
-    //      }
-    // }
-
   }
 
   @SuppressWarnings({"unchecked"})
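
(The replayUpdatesExecutor change above swaps an unbounded queue for a LinkedBlockingQueue sized to the replay thread count, so pending work is capped. ParWork.getParExecutorService is Solr-internal; a rough plain java.util.concurrent equivalent, with CallerRunsPolicy as one possible overflow policy — the actual rejection behavior of the ParWork pool may differ:)

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    final class BoundedExecutorSketch {
      static ThreadPoolExecutor create(int threads) {
        return new ThreadPoolExecutor(
            threads, threads,                           // fixed-size pool
            0L, TimeUnit.MILLISECONDS,                  // no keep-alive for a fixed pool
            new LinkedBlockingQueue<>(threads),         // bounded: at most `threads` pending tasks
            new ThreadPoolExecutor.CallerRunsPolicy()); // backpressure: caller runs overflow tasks
      }
    }
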
diff --git a/solr/core/src/java/org/apache/solr/core/CoreSorter.java b/solr/core/src/java/org/apache/solr/core/CoreSorter.java
index 9797b93..618855f 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreSorter.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreSorter.java
@@ -94,6 +94,10 @@ public final class CoreSorter implements Comparator<CoreDescriptor> {
   private final Map<String, CountsForEachShard> shardsVsReplicaCounts = new HashMap<>();
 
   CoreSorter init(ZkController zkController, Collection<CoreDescriptor> coreDescriptors) {
+
+    assert zkController != null;
+    assert zkController.getCoreContainer() != null;
+    assert zkController.getCoreContainer().getNodeConfig() != null;
     String myNodeName = zkController.getCoreContainer().getNodeConfig().getNodeName();
     ClusterState state = zkController.getCoreContainer().getZkController().getClusterState();
     for (CoreDescriptor coreDescriptor : coreDescriptors) {
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index d694b2d..3328138 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -28,6 +28,7 @@ import java.io.Writer;
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Constructor;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -35,6 +36,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -3107,16 +3109,18 @@ public final class SolrCore implements SolrInfoBean, Closeable {
   public static void deleteUnloadedCore(CoreDescriptor cd, boolean deleteDataDir, boolean deleteInstanceDir) {
     if (deleteDataDir) {
       log.info("Removing SolrCore dataDir on unload {}", cd.getInstanceDir().resolve(cd.getDataDir()));
-      File dataDir = cd.getInstanceDir().resolve(cd.getDataDir()).toFile();
+      Path dataDir = cd.getInstanceDir().resolve(cd.getDataDir());
       try {
-        FileUtils.deleteDirectory(dataDir);
+
+        Files.walk(dataDir).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
+
       } catch (Exception e) {
-        log.error("Failed to delete data dir for unloaded core: {} dir: {}", cd.getName(), dataDir.getAbsolutePath(), e);
+        log.error("Failed to delete data dir for unloaded core: {} dir: {}", cd.getName(), dataDir, e);
       }
     }
     if (deleteInstanceDir) {
       try {
-        FileUtils.deleteDirectory(cd.getInstanceDir().toFile());
+        Files.walk(cd.getInstanceDir()).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
       } catch (Exception e) {
         log.error("Failed to delete instance dir for unloaded core: {} dir: {}", cd.getName(), cd.getInstanceDir(), e);
       }
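
(The two deletions above replace Commons IO's FileUtils.deleteDirectory with the standard NIO walk-and-delete idiom: sorting the walked paths in reverse order deletes children before their parents. A standalone version; the try-with-resources closes the underlying directory stream, which the one-liners in the diff leave open:)

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Comparator;
    import java.util.stream.Stream;

    final class RecursiveDelete {
      static void delete(Path dir) throws IOException {
        try (Stream<Path> paths = Files.walk(dir)) {
          paths.sorted(Comparator.reverseOrder())   // deepest paths first
               .forEach(p -> p.toFile().delete());  // children before parents
        }
      }
    }
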
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
index f85408e..6c3716b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
@@ -391,7 +391,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
     final String collectionName = "deleteReplicaOnIndexing";
     CollectionAdminRequest.createCollection(collectionName, "conf", 1, 2)
         .process(cluster.getSolrClient());
-    cluster.waitForActiveCollection(collectionName, 10, TimeUnit.SECONDS, 1, 2);
+
     AtomicBoolean closed = new AtomicBoolean(false);
     List<Future> futures = new ArrayList<>(TEST_NIGHTLY ? 50 : 5);
     Thread[] threads = new Thread[TEST_NIGHTLY ? 50 : 5];
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
index f8dcac0..62ce22e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
@@ -86,10 +86,6 @@ public class DeleteShardTest extends SolrCloudTestCase {
     });
 
     CollectionAdminRequest.deleteShard(collection, "shard2").process(cluster.getSolrClient());
-    waitForState("Expected 'shard2' to be removed", collection, (n, c) -> {
-      return c != null && c.getSlice("shard2") == null;
-    });
-
   }
 
   protected void setSliceState(String collection, String slice, State state) throws Exception {
diff --git a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
index de5f808..145c32c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
@@ -187,8 +187,6 @@ public class MoveReplicaTest extends SolrCloudTestCase {
     moveReplica.setInPlaceMove(inPlaceMove);
     moveReplica.process(cloudClient);
 
-    cluster.waitForActiveCollection(coll, create.getNumShards(), create.getNumShards() * (create.getNumNrtReplicas() + create.getNumPullReplicas() + create.getNumTlogReplicas()));
-
     assertEquals(100, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound());
 
     checkNumOfCores(cloudClient, replica.getNodeName(), coll, sourceNumCores);
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index 0f7e2e0..adf885d 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -813,49 +813,6 @@ public class OverseerTest extends SolrTestCaseJ4 {
     }
   }
 
-  @Test
-  public void testExceptionWhenFlushClusterState() throws Exception {
-
-    SolrZkClient overseerClient = null;
-    ZkStateReader reader = null;
-
-    try {
-
-      ZkController.createClusterZkNodes(zkClient);
-
-      reader = new ZkStateReader(zkClient);
-      reader.createClusterStateWatchersAndUpdate();
-
-      // We did not create /collections/collection1 -> this message will cause exception when Overseer tries to flush
-      // the collection state
-      ZkNodeProps badMessage = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
-          "name", "collection1",
-          ZkStateReader.REPLICATION_FACTOR, "1",
-          ZkStateReader.NUM_SHARDS_PROP, "1",
-          "createNodeSet", "");
-      ZkDistributedQueue workQueue = Overseer.getInternalWorkQueue(zkClient, new Stats());
-      workQueue.offer(Utils.toJSON(badMessage));
-      overseerClient = electNewOverseer(server.getZkAddress());
-
-      ZkDistributedQueue q = getOpenOverseer().getStateUpdateQueue();
-      q.offer(Utils.toJSON(badMessage));
-
-      TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
-      while(!timeOut.hasTimedOut()) {
-        if (q.peek(null) == null) {
-          break;
-        }
-        Thread.sleep(50);
-      }
-
-      assertTrue(showQpeek(workQueue), workQueue.peek(null) == null);
-      assertTrue(showQpeek(q),  q.peek(null) == null);
-    } finally {
-      close(overseerClient);
-      close(reader);
-    }
-  }
-
   private String showQpeek(ZkDistributedQueue q) throws KeeperException, InterruptedException {
     if (q == null) {
       return "";
@@ -1184,75 +1141,6 @@ public class OverseerTest extends SolrTestCaseJ4 {
     }
   }
 
-
-  @Test
-  public void testReplay() throws Exception{
-
-    SolrZkClient overseerClient = null;
-    ZkStateReader reader = null;
-
-    try {
-
-      ZkController.createClusterZkNodes(zkClient);
-
-      reader = new ZkStateReader(zkClient);
-      reader.createClusterStateWatchersAndUpdate();
-      //prepopulate work queue with some items to emulate previous overseer died before persisting state
-      DistributedQueue queue = Overseer.getInternalWorkQueue(zkClient, new Stats());
-
-      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + COLLECTION, false, true);
-
-      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
-          "name", COLLECTION,
-          ZkStateReader.REPLICATION_FACTOR, "1",
-          ZkStateReader.NUM_SHARDS_PROP, "1",
-          "createNodeSet", "");
-      queue.offer(Utils.toJSON(m));
-      m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
-          ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
-          ZkStateReader.NODE_NAME_PROP, "node1",
-          ZkStateReader.SHARD_ID_PROP, "shard1",
-          ZkStateReader.COLLECTION_PROP, COLLECTION,
-          ZkStateReader.CORE_NAME_PROP, "core1",
-          ZkStateReader.ROLES_PROP, "",
-          ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString());
-      queue.offer(Utils.toJSON(m));
-      m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
-          ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
-          ZkStateReader.NODE_NAME_PROP, "node1",
-          ZkStateReader.SHARD_ID_PROP, "shard1",
-          ZkStateReader.COLLECTION_PROP, COLLECTION,
-          ZkStateReader.CORE_NAME_PROP, "core2",
-          ZkStateReader.ROLES_PROP, "",
-          ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString());
-      queue.offer(Utils.toJSON(m));
-
-      overseerClient = electNewOverseer(server.getZkAddress());
-
-      //submit to proper queue
-      queue = overseers.get(0).getStateUpdateQueue();
-      m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
-          ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
-          ZkStateReader.NODE_NAME_PROP, "node1",
-          ZkStateReader.SHARD_ID_PROP, "shard1",
-          ZkStateReader.COLLECTION_PROP, COLLECTION,
-          ZkStateReader.CORE_NAME_PROP, "core3",
-          ZkStateReader.ROLES_PROP, "",
-          ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString());
-      queue.offer(Utils.toJSON(m));
-
-      reader.waitForState(COLLECTION, 1000, TimeUnit.MILLISECONDS,
-          (liveNodes, collectionState) -> collectionState != null && collectionState.getSlice("shard1") != null
-              && collectionState.getSlice("shard1").getReplicas().size() == 3);
-
-      assertNotNull(reader.getClusterState().getCollection(COLLECTION).getSlice("shard1"));
-      assertEquals(3, reader.getClusterState().getCollection(COLLECTION).getSlice("shard1").getReplicasMap().size());
-    } finally {
-      close(overseerClient);
-      close(reader);
-    }
-  }
-
   @Test
   public void testExternalClusterStateChangeBehavior() throws Exception {
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestPrepRecovery.java b/solr/core/src/test/org/apache/solr/cloud/TestPrepRecovery.java
index 8f90b77..6160b9f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestPrepRecovery.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestPrepRecovery.java
@@ -70,6 +70,8 @@ public class TestPrepRecovery extends SolrCloudTestCase {
         .setNode(newNodeName)
         .process(solrClient);
 
+    cluster.waitForActiveCollection(collectionName, 1, 3);
+
     // now delete the leader
     Replica leader = solrClient.getZkStateReader().getLeaderRetry(collectionName, "shard1");
     CollectionAdminRequest.deleteReplica(collectionName, "shard1", leader.getName())
@@ -80,12 +82,10 @@ public class TestPrepRecovery extends SolrCloudTestCase {
     CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
         .setNode(newNodeName)
         .process(solrClient);
-
-    // in the absence of the fixes made in SOLR-10914, this statement will timeout after 90s
-    cluster.waitForActiveCollection(collectionName, 1, 3);
   }
 
   @Test
+  @Nightly
   public void testLeaderNotResponding() throws Exception {
     CloudHttp2SolrClient solrClient = cluster.getSolrClient();
 
diff --git a/solr/core/src/test/org/apache/solr/request/SimpleFacetsTest.java b/solr/core/src/test/org/apache/solr/request/SimpleFacetsTest.java
index b7cdbb2..9e851e9 100644
--- a/solr/core/src/test/org/apache/solr/request/SimpleFacetsTest.java
+++ b/solr/core/src/test/org/apache/solr/request/SimpleFacetsTest.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 
+import org.apache.lucene.util.LuceneTestCase;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
@@ -47,6 +48,7 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.common.util.Utils.fromJSONString;
 
+@LuceneTestCase.Nightly
 public class SimpleFacetsTest extends SolrTestCaseJ4 {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
diff --git a/solr/core/src/test/org/apache/solr/schema/CurrencyFieldTypeTest.java b/solr/core/src/test/org/apache/solr/schema/CurrencyFieldTypeTest.java
index 8da34d5..7b9c723 100644
--- a/solr/core/src/test/org/apache/solr/schema/CurrencyFieldTypeTest.java
+++ b/solr/core/src/test/org/apache/solr/schema/CurrencyFieldTypeTest.java
@@ -25,6 +25,7 @@ import java.util.Set;
 
 import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
 import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.util.LuceneTestCase;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.SolrParams;
@@ -37,6 +38,7 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 /** Tests CurrencyField and CurrencyFieldType. */
+@LuceneTestCase.Nightly
 public class CurrencyFieldTypeTest extends SolrTestCaseJ4 {
   private final String fieldName;
   private final Class<? extends ExchangeRateProvider> expectedProviderClass;
diff --git a/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java b/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
index 389df5b..f47dfa0 100644
--- a/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
+++ b/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
@@ -32,12 +32,14 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.solr.SolrTestCase;
 import org.apache.solr.common.ParWork;
+import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.OrderedExecutor;
 import org.junit.Test;
 
@@ -50,8 +52,9 @@ public class OrderedExecutorTest extends SolrTestCase {
   @Test
   public void testExecutionInOrder() {
     IntBox intBox = new IntBox();
-    OrderedExecutor orderedExecutor = new OrderedExecutor(TEST_NIGHTLY ? 10 : 3,
-        ParWork.getExecutorService(TEST_NIGHTLY ? 10 : 3));
+    OrderedExecutor orderedExecutor = new OrderedExecutor(10,
+        ParWork.getParExecutorService("replayUpdatesExecutor", 10, 10,
+            0, new LinkedBlockingQueue<>(10)));
 
     for (int i = 0; i < 100; i++) {
       orderedExecutor.submit(1, () -> intBox.value.incrementAndGet());
@@ -62,8 +65,9 @@ public class OrderedExecutorTest extends SolrTestCase {
 
   @Test
   public void testLockWhenQueueIsFull() throws ExecutionException {
-    final OrderedExecutor orderedExecutor = new OrderedExecutor
-      (TEST_NIGHTLY ? 10 : 3, ParWork.getExecutorService(TEST_NIGHTLY ? 10 : 3));
+    OrderedExecutor orderedExecutor = new OrderedExecutor(10,
+        ParWork.getParExecutorService("replayUpdatesExecutor", 10, 10,
+            0, new LinkedBlockingQueue<>(10)));
     
     try {
       // AAA and BBB events will both depend on the use of the same lockId
@@ -108,8 +112,9 @@ public class OrderedExecutorTest extends SolrTestCase {
   public void testRunInParallel() throws ExecutionException, InterruptedException {
     final int parallelism = atLeast(3);
 
-    final OrderedExecutor orderedExecutor = new OrderedExecutor
-      (parallelism, ParWork.getExecutorService(parallelism));
+    OrderedExecutor orderedExecutor = new OrderedExecutor(parallelism,
+        ParWork.getParExecutorService("replayUpdatesExecutor", parallelism, parallelism,
+            0, new LinkedBlockingQueue<>(parallelism)));
 
     try {
       // distinct lockIds should be able to be used in parallel, up to the size of the executor,
@@ -121,7 +126,7 @@ public class OrderedExecutorTest extends SolrTestCase {
       List<Future> futures = new ArrayList<>();
       for (int i = 0; i < parallelism; i++) {
         final int lockId = i;
-        futures.add(testExecutor.submit(() -> {
+        futures.add(ParWork.getRootSharedExecutor().submit(() -> {
             orderedExecutor.submit(lockId, () -> {
                 try {
                   log.info("Worker #{} starting", lockId);
@@ -212,8 +217,9 @@ public class OrderedExecutorTest extends SolrTestCase {
       base.put(i, i);
       run.put(i, i);
     }
-    OrderedExecutor orderedExecutor = new OrderedExecutor(TEST_NIGHTLY ? 10 : 3,
-        ParWork.getExecutorService(TEST_NIGHTLY ? 10 : 3, true, true));
+    OrderedExecutor orderedExecutor = new OrderedExecutor(10,
+        ParWork.getParExecutorService("replayUpdatesExecutor", 10, 10,
+            0, new LinkedBlockingQueue<>(10)));
     try {
       for (int i = 0; i < (TEST_NIGHTLY ? 1000 : 55); i++) {
         int key = random().nextInt(N);
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 88c9bbc..3298204 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -742,9 +742,9 @@ public class SolrZkClient implements Closeable {
           if (keCode == KeeperException.Code.NONODE) {
             if (log.isDebugEnabled()) log.debug("No node found for {}", path1);
           }
+        } else {
+          dataMap.put(path1, data);
         }
-
-        dataMap.put(path1, data);
         latch.countDown();
       }, null);
     }
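
(The SolrZkClient fix above moves the dataMap.put inside the success branch, so error results such as NONODE no longer insert bogus entries, while the latch is still counted down on every callback so the caller cannot hang. The general shape of that async fan-out with the stock ZooKeeper DataCallback API; method and map names here are illustrative:)

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;

    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooKeeper;

    final class AsyncGetDataSketch {
      static Map<String, byte[]> fetchAll(ZooKeeper zk, List<String> paths)
          throws InterruptedException {
        Map<String, byte[]> dataMap = new ConcurrentHashMap<>();
        CountDownLatch latch = new CountDownLatch(paths.size());
        for (String path : paths) {
          zk.getData(path, false, (rc, p, ctx, data, stat) -> {
            if (KeeperException.Code.get(rc) == KeeperException.Code.OK && data != null) {
              dataMap.put(p, data); // only record successful, non-null reads
            } // NONODE and other errors fall through: no entry for this path
            latch.countDown();      // always count down so the caller never hangs
          }, null);
        }
        latch.await();
        return dataMap;
      }
    }
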
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java
index 5e7d5d7..faf1d61 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java
@@ -17,6 +17,7 @@
 package org.apache.solr.common.cloud;
 
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -32,21 +33,20 @@ import org.apache.solr.common.util.SuppressForbidden;
 import org.apache.zookeeper.ClientCnxn;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 // we use this class to expose nasty stuff for tests
 @SuppressWarnings({"try"})
 public class SolrZooKeeper extends ZooKeeper {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   final Set<Thread> spawnedThreads = ConcurrentHashMap.newKeySet();
   private CloseTracker closeTracker;
 
-  // for test debug
-  //static Map<SolrZooKeeper,Exception> clients = new ConcurrentHashMap<SolrZooKeeper,Exception>();
-
-  public SolrZooKeeper(String connectString, int sessionTimeout,
-      Watcher watcher) throws IOException {
+  public SolrZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {
     super(connectString, sessionTimeout, watcher);
     assert (closeTracker = new CloseTracker()) != null;
-    //clients.put(this, new RuntimeException());
   }
 
   public ClientCnxn getConnection() {
@@ -62,8 +62,7 @@ public class SolrZooKeeper extends ZooKeeper {
       @Override
       public void run() {
         try {
-          AccessController.doPrivileged(
-              (PrivilegedAction<Void>) this::closeZookeeperChannel);
+          AccessController.doPrivileged((PrivilegedAction<Void>) this::closeZookeeperChannel);
         } finally {
           spawnedThreads.remove(this);
         }
@@ -75,13 +74,11 @@ public class SolrZooKeeper extends ZooKeeper {
 
         synchronized (cnxn) {
           try {
-            final Field sendThreadFld = cnxn.getClass()
-                .getDeclaredField("sendThread");
+            final Field sendThreadFld = cnxn.getClass().getDeclaredField("sendThread");
             sendThreadFld.setAccessible(true);
             Object sendThread = sendThreadFld.get(cnxn);
             if (sendThread != null) {
-              Method method = sendThread.getClass()
-                  .getDeclaredMethod("testableCloseSocket");
+              Method method = sendThread.getClass().getDeclaredMethod("testableCloseSocket");
               method.setAccessible(true);
               try {
                 method.invoke(sendThread);
@@ -91,8 +88,7 @@ public class SolrZooKeeper extends ZooKeeper {
             }
           } catch (Exception e) {
             ParWork.propagateInterrupt(e);
-            throw new RuntimeException("Closing Zookeeper send channel failed.",
-                e);
+            throw new RuntimeException("Closing Zookeeper send channel failed.", e);
           }
         }
         return null; // Void
@@ -105,23 +101,26 @@ public class SolrZooKeeper extends ZooKeeper {
   @Override
   public void close() {
     assert closeTracker.close();
-    try (ParWork closer = new ParWork(this)) {
-      closer.collect("zookeeper", ()->{
-        try {
-          SolrZooKeeper.super.close();
-        } catch (InterruptedException e) {
-          ParWork.propagateInterrupt(e);
-        }
-      });
-//      closer.collect("keep send thread from sleeping", ()->{
-//       // ZooKeeperExposed zk = new ZooKeeperExposed(this, cnxn);
-//
-//      //  zk.interruptSendThread();
-//       // zk.interruptEventThread();
-//      });
+    //    try (ParWork closer = new ParWork(this)) {
+    //      closer.collect("zookeeper", ()->{
+    //        try {
+    //          SolrZooKeeper.super.close();
+    //        } catch (InterruptedException e) {
+    //          ParWork.propagateInterrupt(e);
+    //        }
+    //      });
+    ////      closer.collect("keep send thread from sleeping", ()->{
+    ////       // ZooKeeperExposed zk = new ZooKeeperExposed(this, cnxn);
+    ////
+    ////      //  zk.interruptSendThread();
+    ////       // zk.interruptEventThread();
+    ////      });
+    try {
+      SolrZooKeeper.super.close();
+    } catch (InterruptedException e) {
+      ParWork.propagateInterrupt(e, true);
     }
-
-
   }
 
 }
+
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/sql/JdbcTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/sql/JdbcTest.java
index 8ea1562..d684930 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/sql/JdbcTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/sql/JdbcTest.java
@@ -54,6 +54,7 @@ import org.junit.Test;
 
 @Slow
 @LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40", "Lucene41", "Lucene42", "Lucene45"})
+@LuceneTestCase.Nightly
 public class JdbcTest extends SolrCloudTestCase {
 
   private static final String COLLECTIONORALIAS = "collection1";
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
index 6330a91..8650193 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
@@ -627,7 +627,6 @@ public class ZkTestServer implements Closeable {
       }
     }
 
-   // zooThread.interrupt();
     timer.cancel();
     ParWork.close(chRootClient);
 
@@ -635,13 +634,14 @@ public class ZkTestServer implements Closeable {
 
     startupWait = new CountDownLatch(1);
     if (zooThread != null) {
+      // zooThread.interrupt();
+      zooThread.join(10000);
+      if (zooThread.isAlive()) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Zookeeper thread still running");
+      }
       assert ObjectReleaseTracker.release(zooThread);
     }
-   // zooThread.interrupt();
-    zooThread.join(10000);
-    if (zooThread.isAlive()) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Zookeeper thread still running");
-    }
+
     zooThread = null;
     assert ObjectReleaseTracker.release(this);
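
(The ZkTestServer change above moves the bounded join inside the null check and fails fast if the ZooKeeper thread outlives the 10-second grace period, instead of dereferencing a possibly-null zooThread. The generic shape of that bounded-join teardown:)

    final class BoundedJoinSketch {
      /** Wait up to timeoutMillis for t to die; fail loudly instead of leaking it. */
      static void joinOrFail(Thread t, long timeoutMillis) throws InterruptedException {
        if (t == null) {
          return;
        }
        t.join(timeoutMillis); // bounded wait for the thread to exit
        if (t.isAlive()) {
          throw new IllegalStateException(
              t.getName() + " still running after " + timeoutMillis + "ms");
        }
      }
    }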