Posted to commits@lucene.apache.org by ma...@apache.org on 2021/02/10 01:43:47 UTC

[lucene-solr] 03/09: @1330 Finishing up dist updates.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit ac03d338c5e9a5d1b9d64c8c961944b638ccab65
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Feb 4 01:41:19 2021 -0600

    @1330 Finishing up dist updates.
    
    # Conflicts:
    #	solr/core/src/java/org/apache/solr/core/CoreContainer.java
    #	solr/core/src/test/org/apache/solr/cloud/TestTolerantUpdateProcessorRandomCloud.java
---
 .../extraction/ExtractingDocumentLoader.java       |   2 +-
 .../src/java/org/apache/solr/cloud/Overseer.java   |   9 +-
 .../org/apache/solr/cloud/OverseerTaskQueue.java   |   3 +-
 .../cloud/api/collections/DeleteCollectionCmd.java |  12 +-
 .../apache/solr/cloud/overseer/ZkStateWriter.java  |   4 +-
 .../java/org/apache/solr/core/CoreContainer.java   |  19 +-
 .../src/java/org/apache/solr/core/SolrCore.java    |   6 +-
 .../apache/solr/handler/loader/CSVLoaderBase.java  |   2 +-
 .../apache/solr/handler/loader/JavabinLoader.java  |   8 +-
 .../org/apache/solr/request/SolrRequestInfo.java   |   3 +
 .../org/apache/solr/update/AddUpdateCommand.java   |   8 +-
 .../org/apache/solr/update/DocumentBuilder.java    |   1 +
 .../src/java/org/apache/solr/update/PeerSync.java  |  21 +-
 .../org/apache/solr/update/SolrCmdDistributor.java |   1 -
 .../org/apache/solr/update/TimedVersionBucket.java |   3 +-
 .../src/java/org/apache/solr/update/UpdateLog.java | 145 +++++---
 .../java/org/apache/solr/update/VersionBucket.java |  41 ++-
 .../processor/DistributedUpdateProcessor.java      | 409 +++++++++++----------
 .../processor/DistributedZkUpdateProcessor.java    |  28 +-
 .../processor/RunUpdateProcessorFactory.java       |   2 +-
 .../apache/solr/cloud/CollectionsAPISolrJTest.java |   2 +
 .../solr/update/TestExceedMaxTermLength.java       |   2 +-
 .../processor/DistributedUpdateProcessorTest.java  |  13 +-
 .../src/java/org/apache/solr/common/ParWork.java   |   2 +-
 .../apache/solr/common/ToleratedUpdateError.java   |   2 +-
 .../apache/solr/common/cloud/ZkStateReader.java    |   2 +-
 .../solr/common/util/SolrQueuedThreadPool.java     |   1 -
 solr/solrj/src/test-files/log4j2.xml               |   1 +
 .../src/java/org/apache/solr/SolrTestCase.java     |   3 +-
 29 files changed, 452 insertions(+), 303 deletions(-)

diff --git a/solr/contrib/extraction/src/java/org/apache/solr/handler/extraction/ExtractingDocumentLoader.java b/solr/contrib/extraction/src/java/org/apache/solr/handler/extraction/ExtractingDocumentLoader.java
index 2f73d2b..585d9bd 100644
--- a/solr/contrib/extraction/src/java/org/apache/solr/handler/extraction/ExtractingDocumentLoader.java
+++ b/solr/contrib/extraction/src/java/org/apache/solr/handler/extraction/ExtractingDocumentLoader.java
@@ -127,13 +127,13 @@ public class ExtractingDocumentLoader extends ContentStreamLoader {
    */
   void doAdd(SolrContentHandler handler, AddUpdateCommand template)
           throws IOException {
-    template.solrDoc = handler.newDocument();
     processor.processAdd(template);
   }
 
   void addDoc(SolrContentHandler handler) throws IOException {
     templateAdd.clear();
     templateAdd.setReq(req);
+    templateAdd.solrDoc = handler.newDocument();
     templateAdd.overwrite = overwrite;
     templateAdd.commitWithin = commitWithin;
     doAdd(handler, templateAdd);
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 23a91cd..b1a4b61 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -56,6 +56,7 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.eclipse.jetty.util.BlockingArrayQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -294,15 +295,15 @@ public class Overseer implements SolrCloseable {
 //     stateManagmentExecutor = ParWork.getParExecutorService("stateManagmentExecutor",
 //        1, 1, 3000, new SynchronousQueue());
      taskExecutor = (ParWorkExecutor) ParWork.getParExecutorService("overseerTaskExecutor",
-         4, SysStats.PROC_COUNT, 1000, new LinkedBlockingQueue<>(1024));
+         4, SysStats.PROC_COUNT, 1000, new BlockingArrayQueue<>(32, 64));
     for (int i = 0; i < 4; i++) {
-      taskExecutor.submit(() -> {});
+      taskExecutor.prestartCoreThread();
     }
 
     zkWriterExecutor = (ParWorkExecutor) ParWork.getParExecutorService("overseerZkWriterExecutor",
-        4, SysStats.PROC_COUNT, 1000, new LinkedBlockingQueue<>(1024));
+        4, SysStats.PROC_COUNT, 1000, new BlockingArrayQueue<>(64, 128));
     for (int i = 0; i < 4; i++) {
-      zkWriterExecutor.submit(() -> {});
+      zkWriterExecutor.prestartCoreThread();
     }
 
     if (overseerOnlyClient == null && !closeAndDone && !initedHttpClient) {
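
The executor changes above replace the old warm-up trick of submitting empty
tasks with ThreadPoolExecutor's built-in hook. A minimal sketch of the
pattern, assuming ParWork.getParExecutorService hands back a plain
java.util.concurrent.ThreadPoolExecutor (pool sizes here are illustrative):

    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    import org.eclipse.jetty.util.BlockingArrayQueue;

    class PrestartSketch {
      public static void main(String[] args) {
        // BlockingArrayQueue(32, 64): room for 32 queued tasks up front,
        // growing the backing array by 64 slots as demand requires, in
        // place of a fixed LinkedBlockingQueue bound of 1024.
        ThreadPoolExecutor taskExecutor = new ThreadPoolExecutor(
            4, 16, 1000, TimeUnit.MILLISECONDS,
            new BlockingArrayQueue<>(32, 64));

        // prestartCoreThread() starts one idle core thread per call and
        // returns false once all core threads exist -- no dummy task needed.
        for (int i = 0; i < 4; i++) {
          taskExecutor.prestartCoreThread();
        }
        taskExecutor.shutdown();
      }
    }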
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index dc7d389..ebe1d30 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -264,7 +264,7 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
   public QueueEvent offer(byte[] data, long timeout) throws KeeperException,
       InterruptedException {
     if (log.isDebugEnabled()) log.debug("offer operation to the Overseeer queue {}", Utils.fromJSON(data));
-    log.info("offer operation to the Overseeer queue {}", Utils.fromJSON(data));
+
     if (shuttingDown.get()) {
       throw new SolrException(SolrException.ErrorCode.CONFLICT,"Solr is shutting down, no more overseer tasks may be offered");
     }
@@ -276,7 +276,6 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
       String watchID = createResponseNode();
 
       if (log.isDebugEnabled()) log.debug("watchId for response node {}, setting a watch ... ", watchID);
-      log.info("watchId for response node {}, setting a watch ... ", watchID);
 
       watcher = new LatchWatcher(Watcher.Event.EventType.NodeDataChanged, watchID, zookeeper);
       Stat stat = zookeeper.exists(watchID, watcher, true);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index b3fefb7..af19a82 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -192,12 +192,20 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
         // make sure it's gone again after cores have been removed
         try {
           ocmh.overseer.getCoreContainer().getZkController().removeCollectionTerms(collection);
-          zkStateReader.getZkClient().clean(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection);
-
+        } catch (Exception e) {
+          log.error("Exception while trying to remove collection terms", e);
+        }
+        try {
+          // the znode may have been recreated concurrently; retry until it stays gone
+          while (zkStateReader.getZkClient().exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection)) {
+            zkStateReader.getZkClient().clean(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection);
+          }
         } catch (Exception e) {
           log.error("Exception while trying to remove collection zknode", e);
         }
 
+
         AddReplicaCmd.Response response = new AddReplicaCmd.Response();
         return response;
       }
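
Splitting the two cleanup steps keeps a failure in removeCollectionTerms from
skipping the znode cleanup, and the loop covers the race where the znode is
recreated mid-delete. The retry reduced to its essentials, with zkClient
standing in for zkStateReader.getZkClient():

    String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
    // clean() deletes the node and its children; if a concurrent writer
    // recreates the node, exists() turns true again and the clean is
    // repeated until the node stays gone.
    while (zkClient.exists(path)) {
      zkClient.clean(path);
    }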
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index efc6e98..e23a8c1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -190,10 +190,10 @@ public class ZkStateWriter {
 
             for (Map.Entry<String,Object> entry : message.getProperties().entrySet()) {
               if (OverseerAction.DOWNNODE.equals(OverseerAction.get(entry.getKey()))) {
-                log.info("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(entry.getKey()));
+                if (log.isDebugEnabled()) log.debug("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(entry.getKey()));
                 nodeOperation(entry, Replica.State.getShortState(Replica.State.DOWN));
               } else if (OverseerAction.RECOVERYNODE.equals(OverseerAction.get(entry.getKey()))) {
-                log.info("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(entry.getKey()));
+                if (log.isDebugEnabled()) log.debug("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(entry.getKey()));
                 nodeOperation(entry, Replica.State.getShortState(Replica.State.RECOVERING));
               }
             }
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 6aa6fdb..8d3a844 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -135,14 +135,13 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReentrantLock;
@@ -192,10 +191,16 @@ public class CoreContainer implements Closeable {
 
   private volatile UpdateShardHandler updateShardHandler;
 
-  public volatile ExecutorService solrCoreExecutor;
+  public final ThreadPoolExecutor solrCoreExecutor = (ThreadPoolExecutor) ParWork.getParExecutorService("Core",
+      4, Math.max(6, SysStats.PROC_COUNT * 2), 1000, new BlockingArrayQueue<>(256, 256));
 
-  public final ExecutorService coreContainerExecutor = ParWork.getParExecutorService("Core",
-      2, Math.max(6, SysStats.PROC_COUNT), 1000, new ArrayBlockingQueue(256, false));
+  public final ThreadPoolExecutor coreContainerExecutor = (ThreadPoolExecutor) ParWork.getParExecutorService("Core",
+      8, SysStats.PROC_COUNT, 1000, new BlockingArrayQueue<>(256, 256));
+
+  {
+    solrCoreExecutor.prestartAllCoreThreads();
+    coreContainerExecutor.prestartAllCoreThreads();
+  }
 
   private final OrderedExecutor replayUpdatesExecutor;
 
@@ -371,7 +376,7 @@ public class CoreContainer implements Closeable {
 
     this.replayUpdatesExecutor = new OrderedExecutor(cfg.getReplayUpdatesThreads(),
         ParWork.getParExecutorService("replayUpdatesExecutor", cfg.getReplayUpdatesThreads(), cfg.getReplayUpdatesThreads(),
-            3000, new LinkedBlockingQueue<>(cfg.getReplayUpdatesThreads())));
+            1000, new LinkedBlockingQueue<>(cfg.getReplayUpdatesThreads())));
 
     metricManager = new SolrMetricManager(loader, cfg.getMetricsConfig());
     String registryName = SolrMetricManager.getRegistryName(SolrInfoBean.Group.node);
@@ -410,8 +415,6 @@ public class CoreContainer implements Closeable {
 
     containerProperties.putAll(cfg.getSolrProperties());
 
-    solrCoreExecutor = ParWork.getParExecutorService("Core",
-        4, Math.max(6, SysStats.PROC_COUNT * 2), 1000, new BlockingArrayQueue(64, 16));
   }
 
   @SuppressWarnings({"unchecked"})
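
In CoreContainer the executors become final fields warmed by an instance
initializer, replacing a volatile field that was only assigned late in the
constructor, so the pools exist and are started before any other construction
logic can touch them. A condensed sketch of that construction-order guarantee
(names and sizes illustrative):

    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    import org.eclipse.jetty.util.BlockingArrayQueue;

    class ContainerSketch {
      final ThreadPoolExecutor coreExecutor = new ThreadPoolExecutor(
          4, Math.max(6, Runtime.getRuntime().availableProcessors() * 2),
          1000, TimeUnit.MILLISECONDS, new BlockingArrayQueue<>(256, 256));

      // instance initializers run after the field initializers above and
      // before any constructor body, so the pool is live before anything
      // in the constructor can submit to it
      {
        coreExecutor.prestartAllCoreThreads();
      }
    }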
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 7529e0a..55ec855 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -1031,18 +1031,16 @@ public final class SolrCore implements SolrInfoBean, Closeable {
         updateHandler.getSolrCoreState().increfSolrCoreState();
       }
 
-      IndexSchema schema = configSet.getIndexSchema();
-
       CoreDescriptor cd = Objects.requireNonNull(coreDescriptor, "coreDescriptor cannot be null");
 
       setName(name);
 
       this.solrConfig = configSet.getSolrConfig();
+      IndexSchema schema = configSet.getIndexSchema();
+      setLatestSchema(schema);
       this.resourceLoader = configSet.getSolrConfig().getResourceLoader();
       this.configSetProperties = configSet.getProperties();
 
-      setLatestSchema(schema);
-
       // Initialize the RestManager
       StopWatch initRestManager = new StopWatch(this + "-initRestManager");
       restManager = initRestManager(cd);
diff --git a/solr/core/src/java/org/apache/solr/handler/loader/CSVLoaderBase.java b/solr/core/src/java/org/apache/solr/handler/loader/CSVLoaderBase.java
index 3824917..97adeb5 100644
--- a/solr/core/src/java/org/apache/solr/handler/loader/CSVLoaderBase.java
+++ b/solr/core/src/java/org/apache/solr/handler/loader/CSVLoaderBase.java
@@ -378,7 +378,7 @@ abstract class CSVLoaderBase extends ContentStreamLoader {
       }
       try {
         executor.shutdown();
-        executor.awaitTermination(365, TimeUnit.DAYS);
+        executor.awaitTermination(365, TimeUnit.DAYS); // MRM TODO:
       } catch (InterruptedException e) {
         ParWork.propagateInterrupt(e);
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
diff --git a/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java b/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
index d80e065..762e2b7 100644
--- a/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
+++ b/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
@@ -89,9 +89,9 @@ public class JavabinLoader extends ContentStreamLoader {
         if (document == null) {
           return;
         }
-
-        AddUpdateCommand addCmd = getAddCommand(req, updateRequest.getParams());
-
+
+        AddUpdateCommand addCmd = getAddCommand(req, updateRequest.getParams());
+
         addCmd.solrDoc = document;
         if (commitWithin != null) {
           addCmd.commitWithin = commitWithin;
@@ -133,8 +133,6 @@ public class JavabinLoader extends ContentStreamLoader {
     SolrParams old = req.getParams();
     try (JavaBinCodec jbc = new JavaBinCodec() {
       SolrParams params;
-      AddUpdateCommand addCmd = null;
-
       @Override
       public List<Object> readIterator(DataInputInputStream fis) throws IOException {
         while (true) {
diff --git a/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java b/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
index 5d6a73b..bfa352b 100644
--- a/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
+++ b/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
@@ -32,6 +32,7 @@ import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.handler.component.ResponseBuilder;
 import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.servlet.SolrDispatchFilter;
+import org.apache.solr.update.AddUpdateCommand;
 import org.apache.solr.util.TimeZoneUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -81,6 +82,8 @@ public class SolrRequestInfo {
       }
     } finally {
       threadLocal.remove();
+      AddUpdateCommand.THREAD_LOCAL_AddUpdateCommand.get().clearAll();
+      AddUpdateCommand.THREAD_LOCAL_AddUpdateCommand_TLOG.get().clearAll();
     }
   }
 
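
Clearing the cached AddUpdateCommand thread-locals in the same finally that
removes the SolrRequestInfo matters because the commands hold a
SolrQueryRequest reference, and on pooled threads that reference would
otherwise survive into the next request. A hedged sketch of the lifecycle,
with ReusableCommand standing in for AddUpdateCommand:

    final class ReusableCommand {
      Object solrDoc;  // per-update state, wiped between documents
      Object req;      // per-request state, wiped at request teardown

      void clear()    { solrDoc = null; }
      void clearAll() { clear(); req = null; }

      static final ThreadLocal<ReusableCommand> CACHED =
          ThreadLocal.withInitial(ReusableCommand::new);
    }

    // request teardown, mirroring the finally block above:
    //   ReusableCommand.CACHED.get().clearAll();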
diff --git a/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java b/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
index 9bd2a31..70623f2 100644
--- a/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
+++ b/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
@@ -102,11 +102,15 @@ public class AddUpdateCommand extends UpdateCommand {
      isLastDocInBatch = false;
      version = 0;
      prevVersion = -1;
-     req = null;
      overwrite = true;
      commitWithin = -1;
    }
 
+  public void clearAll() {
+    clear();
+    req = null;
+  }
+
    public SolrInputDocument getSolrInputDocument() {
      return solrDoc;
    }
@@ -128,7 +132,7 @@ public class AddUpdateCommand extends UpdateCommand {
 
   /** Returns the indexed ID for this document.  The returned BytesRef is retained across multiple calls, and should not be modified. */
    public BytesRef getIndexedId() {
-     if (indexedId == null) {
+     if (indexedId == null && req != null) {
        IndexSchema schema = req.getSchema();
        SchemaField sf = schema.getUniqueKeyField();
        if (sf != null) {
diff --git a/solr/core/src/java/org/apache/solr/update/DocumentBuilder.java b/solr/core/src/java/org/apache/solr/update/DocumentBuilder.java
index 03c83f0..ee2debf 100644
--- a/solr/core/src/java/org/apache/solr/update/DocumentBuilder.java
+++ b/solr/core/src/java/org/apache/solr/update/DocumentBuilder.java
@@ -123,6 +123,7 @@ public class DocumentBuilder {
    * @return Built Lucene document
    */
   public static Document toDocument(SolrInputDocument doc, IndexSchema schema, boolean forInPlaceUpdate, boolean ignoreNestedDocs) {
+    if (doc == null) throw new IllegalArgumentException("SolrInputDocument cannot be null");
     if (!ignoreNestedDocs && doc.hasChildDocuments()) {
       throw unexpectedNestedDocException(schema, forInPlaceUpdate);
     }
diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java
index a83ae95..535d955 100644
--- a/solr/core/src/java/org/apache/solr/update/PeerSync.java
+++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java
@@ -634,10 +634,14 @@ public class PeerSync implements SolrMetricProducer {
             {
               // byte[] idBytes = (byte[]) entry.get(2);
               SolrInputDocument sdoc = (SolrInputDocument)entry.get(entry.size()-1);
-              AddUpdateCommand cmd = new AddUpdateCommand(req);
+              AddUpdateCommand cmd = AddUpdateCommand.THREAD_LOCAL_AddUpdateCommand_TLOG.get();
+              cmd.clear();
               // cmd.setIndexedId(new BytesRef(idBytes));
               cmd.solrDoc = sdoc;
+              cmd.setReq(req);
               cmd.setVersion(version);
+              BytesRef indexedId = UpdateLog.getIndexedId(sdoc, req);
+              cmd.setIndexedId(indexedId);
               cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
               if (debug) {
                 log.debug("{} add {} id {}", logPrefix, cmd, sdoc.getField(ID));
@@ -676,12 +680,19 @@ public class PeerSync implements SolrMetricProducer {
             }
             case UpdateLog.UPDATE_INPLACE:
             {
-              AddUpdateCommand cmd = UpdateLog.convertTlogEntryToAddUpdateCommand(req, entry, oper, version, null);
-              cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
+
+              SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size() - 1);
+              Long prevVersion = (Long) entry.get(UpdateLog.PREV_VERSION_IDX);
+              AddUpdateCommand ucmd = AddUpdateCommand.THREAD_LOCAL_AddUpdateCommand_TLOG.get();
+              ucmd.clear();
+              ucmd.setReq(req);
+              ucmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
+              UpdateLog.convertTlogEntryToAddUpdateCommand(req, sdoc, oper, prevVersion, version, ucmd);
+
               if (debug) {
-                log.debug("{} inplace update {} prevVersion={} doc={}", logPrefix, cmd, cmd.prevVersion, cmd.solrDoc);
+                log.debug("{} inplace update {} prevVersion={} doc={}", logPrefix, ucmd, ucmd.prevVersion, ucmd.solrDoc);
               }
-              proc.processAdd(cmd);
+              proc.processAdd(ucmd);
               break;
             }
 
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index c4e64aa..207b4cc 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -290,7 +290,6 @@ public class SolrCmdDistributor implements Closeable {
           log.error("Exception sending dist update {} {}", code, t);
           cancels.remove(cancelIndex);
 
-
           // nocommit - we want to prevent any more from this request
           // to go just to this node rather than stop the whole request
           if (code == 404) {
diff --git a/solr/core/src/java/org/apache/solr/update/TimedVersionBucket.java b/solr/core/src/java/org/apache/solr/update/TimedVersionBucket.java
index abeca02..f050330 100644
--- a/solr/core/src/java/org/apache/solr/update/TimedVersionBucket.java
+++ b/solr/core/src/java/org/apache/solr/update/TimedVersionBucket.java
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.lucene.util.BytesRef;
 import org.apache.solr.common.ParWork;
 import org.apache.solr.common.SolrException;
 
@@ -40,7 +41,7 @@ public class TimedVersionBucket extends VersionBucket {
    * <code>lockTimeoutMs</code>.
    */
   @Override
-  public <T,R> R runWithLock(int lockTimeoutMs, CheckedFunction<T,R> function) throws IOException {
+  public <T,R> R runWithLock(int lockTimeoutMs, CheckedFunction<T,R> function, BytesRef idBytes) throws IOException {
     boolean success = false;
 
     try {
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 0eff8ea..4405429 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -52,12 +52,14 @@ import com.codahale.metrics.Meter;
 import org.apache.commons.lang3.concurrent.ConcurrentUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefBuilder;
 import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.ParWork;
 import org.apache.solr.common.SolrDocumentBase;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.ObjectReleaseTracker;
@@ -71,6 +73,8 @@ import org.apache.solr.request.LocalSolrQueryRequest;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.request.SolrRequestInfo;
 import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.schema.IndexSchema;
+import org.apache.solr.schema.SchemaField;
 import org.apache.solr.search.SolrIndexSearcher;
 import org.apache.solr.update.processor.DistributedUpdateProcessor;
 import org.apache.solr.update.processor.UpdateRequestProcessor;
@@ -125,6 +129,30 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     }
   }
 
+  public static BytesRef getIndexedId(SolrInputDocument solrDoc, SolrQueryRequest req) {
+    BytesRef indexedId = null;
+    if (req != null) {
+      IndexSchema schema = req.getSchema();
+      SchemaField sf = schema.getUniqueKeyField();
+      if (sf != null) {
+        if (solrDoc != null) {
+          SolrInputField field = solrDoc.getField(sf.getName());
+          int count = field == null ? 0 : field.getValueCount();
+          if (count == 0) {
+            // no value for the uniqueKey field; leave indexedId null
+          } else if (count > 1) {
+            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Document contains multiple values for uniqueKey field: " + field);
+          } else {
+            BytesRefBuilder b = new BytesRefBuilder();
+            sf.getType().readableToIndexed(field.getFirstValue().toString(), b);
+            indexedId = b.get();
+          }
+        }
+      }
+    }
+    return indexedId;
+  }
+
   // NOTE: when adding new states make sure to keep existing numbers, because external metrics
   // monitoring may depend on these values being stable.
   public enum State { REPLAYING(0), BUFFERING(1), APPLYING_BUFFERED(2), ACTIVE(3);
@@ -1378,7 +1406,16 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
             switch (oper) {
               case UpdateLog.UPDATE_INPLACE:
               case UpdateLog.ADD: {
-                AddUpdateCommand cmd = convertTlogEntryToAddUpdateCommand(req, entry, oper, version, null);
+
+                SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size()-1);
+                Long prevVersion = null;
+                if (oper == UPDATE_INPLACE) {
+                  prevVersion = (Long) entry.get(UpdateLog.PREV_VERSION_IDX);
+                }
+                AddUpdateCommand cmd = AddUpdateCommand.THREAD_LOCAL_AddUpdateCommand_TLOG.get();
+                cmd.clear();
+                cmd.setReq(req);
+                convertTlogEntryToAddUpdateCommand(req, sdoc, oper, prevVersion, version, cmd);
                 cmd.setFlags(UpdateCommand.IGNORE_AUTOCOMMIT);
                 add(cmd);
                 break;
@@ -1475,7 +1512,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
       if (writeCommit) {
         // record a commit
         if (log.isDebugEnabled()) log.debug("Recording current closed for {} log={}", uhandler.core, theLog);
-        CommitUpdateCommand cmd = new CommitUpdateCommand(new LocalSolrQueryRequest(uhandler.core, new ModifiableSolrParams()), false);
+        CommitUpdateCommand cmd = new CommitUpdateCommand(new LocalSolrQueryRequest(uhandler.core, new ModifiableSolrParams((SolrParams)null)), false);
         theLog.writeCommit(cmd);
       }
 
@@ -1942,7 +1979,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
         SolrException.log(log, e);
       } finally {
         // change the state while updates are still blocked to prevent races
-        AddUpdateCommand.THREAD_LOCAL_AddUpdateCommand_TLOG.get().clear();
         state = State.ACTIVE;
         if (finishing) {
 
@@ -1969,7 +2005,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
       SolrRequestInfo.clearRequestInfo();
     }
 
-
     public void doReplay(TransactionLog translog) {
       UpdateRequestProcessor proc = null;
       try {
@@ -2006,9 +2041,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
                 long cpos = tlogReader.currentPos();
                 long csize = tlogReader.currentSize();
                 if (log.isInfoEnabled()) {
-                  loglog.info(
-                      "log replay status {} active={} starting pos={} current pos={} current size={} % read={}",
-                      translog, activeLog, recoveryInfo.positionOfStart, cpos, csize,
+                  loglog.info("log replay status {} active={} starting pos={} current pos={} current size={} % read={}", translog, activeLog, recoveryInfo.positionOfStart, cpos, csize,
                       Math.floor(cpos / (double) csize * 100.));
                 }
 
@@ -2054,8 +2087,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
           try {
 
             // should currently be a List<Oper,Ver,Doc/Id>
-            @SuppressWarnings({"rawtypes"})
-            List entry = (List) o;
+            @SuppressWarnings({"rawtypes"}) List entry = (List) o;
             operationAndFlags = (Integer) entry.get(UpdateLog.FLAGS_IDX);
             int oper = operationAndFlags & OPERATION_MASK;
             long version = (Long) entry.get(UpdateLog.VERSION_IDX);
@@ -2064,15 +2096,17 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
               case UpdateLog.UPDATE_INPLACE: // fall through to ADD
               case UpdateLog.ADD: {
                 recoveryInfo.adds++;
-                AddUpdateCommand cmd  = AddUpdateCommand.THREAD_LOCAL_AddUpdateCommand_TLOG.get();
-                cmd.clear();
-                cmd = convertTlogEntryToAddUpdateCommand(req, entry, oper, version, cmd);
-                if (cmd.solrDoc == null) {
-                  throw new SolrException(ErrorCode.SERVER_ERROR, "No SolrInputDocument could be read");
+
+                Long prevVersion = null;
+                if (oper == UPDATE_INPLACE) {
+                  prevVersion = (Long) entry.get(UpdateLog.PREV_VERSION_IDX);
                 }
-                cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
-                if (debug) log.debug("{} {}", oper == ADD ? "add" : "update", cmd);
-                execute(cmd, executor, pendingTasks, proc, exceptionOnExecuteUpdate);
+
+                if (debug) log.debug("{}", oper == ADD ? "add" : "update");
+
+                AddUpdateCommand cmd = AddUpdateCommand.THREAD_LOCAL_AddUpdateCommand_TLOG.get();
+                cmd.clear();
+                execute(cmd, executor, pendingTasks, proc, exceptionOnExecuteUpdate, req, (SolrInputDocument) entry.get(entry.size() - 1), oper, version, prevVersion);
                 break;
               }
               case UpdateLog.DELETE: {
@@ -2083,7 +2117,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
                 cmd.setVersion(version);
                 cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
                 if (debug) log.debug("delete {}", cmd);
-                execute(cmd, executor, pendingTasks, proc, exceptionOnExecuteUpdate);
+                execute(cmd, executor, pendingTasks, proc, exceptionOnExecuteUpdate, req, null, oper, version, null);
                 break;
               }
 
@@ -2097,7 +2131,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
                 if (debug) log.debug("deleteByQuery {}", cmd);
                 waitForAllUpdatesGetExecuted(executor, pendingTasks);
                 // DBQ will be executed in the same thread
-                execute(cmd, null, pendingTasks, proc, exceptionOnExecuteUpdate);
+                execute(cmd, null, pendingTasks, proc, exceptionOnExecuteUpdate, req, null, oper, version, null);
                 break;
               }
               case UpdateLog.COMMIT: {
@@ -2177,6 +2211,13 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
       }
     }
 
+    private Integer getBucketHash(BytesRef idBytes) {
+      if (idBytes == null) return null;
+      return DistributedUpdateProcessor.bucketHash(idBytes);
+    }
+
     private Integer getBucketHash(UpdateCommand cmd) {
       if (cmd instanceof AddUpdateCommand) {
         BytesRef idBytes = ((AddUpdateCommand)cmd).getIndexedId();
@@ -2193,21 +2234,39 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
       return null;
     }
 
-    private Future execute(UpdateCommand cmd, OrderedExecutor executor,
-                         LongAdder pendingTasks, UpdateRequestProcessor proc,
-                         AtomicReference<SolrException> exceptionHolder) {
-      assert cmd instanceof AddUpdateCommand || cmd instanceof DeleteUpdateCommand;
+    private Future execute(UpdateCommand ucmd, OrderedExecutor executor, LongAdder pendingTasks, UpdateRequestProcessor proc, AtomicReference<SolrException> exceptionHolder, SolrQueryRequest req,
+        SolrInputDocument doc, int operation, long version, Long prevVersion) {
+      assert ucmd instanceof AddUpdateCommand || ucmd instanceof DeleteUpdateCommand || ucmd == null;
+
+      BytesRef indexedId = null;
+      Integer hash;
+      if (ucmd instanceof AddUpdateCommand) {
+        indexedId = getIndexedId(doc, req);
+        hash = getBucketHash(indexedId);
+      } else {
+        hash = getBucketHash(ucmd);
+      }
+
+      BytesRef finalIndexedId = indexedId;
 
       if (executor != null) {
         // by using the same hash as DUP, independent updates can avoid waiting for same bucket
-        return executor.submit(getBucketHash(cmd), () -> {
+
+        return executor.submit(hash, () -> {
           try {
             // fail fast
             if (exceptionHolder.get() != null) return;
-            if (cmd instanceof AddUpdateCommand) {
-              proc.processAdd((AddUpdateCommand) cmd);
+            if (ucmd instanceof AddUpdateCommand) {
+
+              convertTlogEntryToAddUpdateCommand(req, doc, operation, prevVersion, version, (AddUpdateCommand) ucmd);
+              ((AddUpdateCommand) ucmd).setIndexedId(finalIndexedId);
+              ucmd.setReq(req);
+              ucmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
+
+              proc.processAdd((AddUpdateCommand) ucmd);
             } else {
-              proc.processDelete((DeleteUpdateCommand) cmd);
+              proc.processDelete((DeleteUpdateCommand) ucmd);
             }
           } catch (IOException e) {
             recoveryInfo.errors++;
@@ -2227,10 +2286,15 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
         });
       } else {
         try {
-          if (cmd instanceof AddUpdateCommand) {
-            proc.processAdd((AddUpdateCommand) cmd);
+          if (ucmd instanceof AddUpdateCommand) {
+            convertTlogEntryToAddUpdateCommand(req, doc, operation, prevVersion, version, (AddUpdateCommand) ucmd);
+            ((AddUpdateCommand) ucmd).setIndexedId(finalIndexedId);
+            ucmd.setReq(req);
+            ucmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
+
+            proc.processAdd((AddUpdateCommand) ucmd);
           } else {
-            proc.processDelete((DeleteUpdateCommand) cmd);
+            proc.processDelete((DeleteUpdateCommand) ucmd);
           }
         } catch (IOException e) {
           recoveryInfo.errors++;
@@ -2246,8 +2310,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
       }
       return ConcurrentUtils.constantFuture(null);
     }
-
-
   }
 
   /**
@@ -2255,28 +2317,25 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
    * can be applied to ADD the document or do an UPDATE_INPLACE.
    *
    * @param req The request to use as the owner of the new AddUpdateCommand
-   * @param entry Entry from the transaction log that contains the document to be added
+   * @param sdoc The document to be added
    * @param operation The value of the operation flag; this must be either ADD or UPDATE_INPLACE -- 
    *        if it is UPDATE_INPLACE then the previous version will also be read from the entry
    * @param version Version already obtained from the entry.
    */
-  public static AddUpdateCommand convertTlogEntryToAddUpdateCommand(SolrQueryRequest req,
-                                                                    @SuppressWarnings({"rawtypes"})List entry,
-                                                                    int operation, long version, AddUpdateCommand cmd) {
-    assert operation == UpdateLog.ADD || operation == UpdateLog.UPDATE_INPLACE;
-    SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size()-1);
+  public static AddUpdateCommand convertTlogEntryToAddUpdateCommand(SolrQueryRequest req, SolrInputDocument sdoc, int operation,
+      Long prevVersion, long version, AddUpdateCommand cmd) {
+    if (!(operation == UpdateLog.ADD || operation == UpdateLog.UPDATE_INPLACE)) {
+      throw new IllegalArgumentException("Unexpected tlog operation: " + operation);
+    }
 
     if (cmd == null) {
       cmd = new AddUpdateCommand(req);
-    } else {
-      cmd.setReq(req);
     }
 
     cmd.solrDoc = sdoc;
     cmd.setVersion(version);
-    
-    if (operation == UPDATE_INPLACE) {
-      long prevVersion = (Long) entry.get(UpdateLog.PREV_VERSION_IDX);
+
+    if (prevVersion != null) {
       cmd.prevVersion = prevVersion;
     }
     return cmd;
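
With the new signature, convertTlogEntryToAddUpdateCommand no longer digs into
the raw tlog entry itself; callers unpack the entry, reuse a per-thread
command, and derive the indexed id eagerly. A condensed caller stitched
together from the replay hunks above (a sketch, not the exact code):

    @SuppressWarnings({"rawtypes"})
    List entry = (List) o;  // tlog entry layout: [flags, version, ..., doc]
    int oper = (Integer) entry.get(UpdateLog.FLAGS_IDX) & OPERATION_MASK;
    long version = (Long) entry.get(UpdateLog.VERSION_IDX);
    SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size() - 1);
    Long prevVersion = oper == UPDATE_INPLACE
        ? (Long) entry.get(UpdateLog.PREV_VERSION_IDX) : null;

    AddUpdateCommand cmd = AddUpdateCommand.THREAD_LOCAL_AddUpdateCommand_TLOG.get();
    cmd.clear();                                // reuse the per-thread instance
    cmd.setReq(req);
    convertTlogEntryToAddUpdateCommand(req, sdoc, oper, prevVersion, version, cmd);
    cmd.setIndexedId(getIndexedId(sdoc, req)); // id computed up front, not lazily
    cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);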
diff --git a/solr/core/src/java/org/apache/solr/update/VersionBucket.java b/solr/core/src/java/org/apache/solr/update/VersionBucket.java
index 2adb34f..efa442e 100644
--- a/solr/core/src/java/org/apache/solr/update/VersionBucket.java
+++ b/solr/core/src/java/org/apache/solr/update/VersionBucket.java
@@ -17,9 +17,15 @@
 package org.apache.solr.update;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.lucene.util.BytesRef;
 import org.apache.solr.common.ParWork;
 
 // TODO: make inner?
@@ -37,6 +43,8 @@ public class VersionBucket {
   private final ReentrantLock lock = new ReentrantLock(true);
   private final Condition lockCondition = lock.newCondition();
 
+  private Map<BytesRef,LongAdder> blockedIds = new ConcurrentHashMap<>();
+
   public void updateHighest(long val) {
     if (highest != 0) {
       highest = Math.max(highest, Math.abs(val));
@@ -48,17 +56,44 @@ public class VersionBucket {
      R apply() throws IOException;
   }
 
-  public <T, R> R runWithLock(int lockTimeoutMs, CheckedFunction<T, R> function) throws IOException {
+  public <T, R> R runWithLock(int lockTimeoutMs, CheckedFunction<T,R> function, BytesRef idBytes) throws IOException {
     lock.lock();
     try {
+      if (!blockedIds.containsKey(idBytes)) {
+        LongAdder adder = new LongAdder();
+        adder.increment();
+        blockedIds.put(idBytes, adder);
+        lock.unlock();
+      } else {
+        LongAdder adder = blockedIds.get(idBytes);
+        adder.increment();
+      }
       return function.apply();
     } finally {
-      lock.unlock();
+      try {
+        if (!lock.isHeldByCurrentThread()) {
+          lock.lock();
+          LongAdder adder = blockedIds.get(idBytes);
+          adder.decrement();
+          if (adder.longValue() == 0L) {
+            blockedIds.remove(idBytes);
+          }
+        }
+      } finally {
+        if (lock.isHeldByCurrentThread()) lock.unlock();
+      }
     }
   }
 
   public void signalAll() {
-    lockCondition.signalAll();
+    if (!lock.isHeldByCurrentThread()) {
+      lock.lock();
+      try {
+        lockCondition.signalAll();
+      } finally {
+        lock.unlock();
+      }
+    }
   }
 
   public void awaitNanos(long nanosTimeout) {
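
The reworked runWithLock layers a per-id reference count over the bucket lock:
the first update for an id registers itself and releases the bucket lock so
updates to other ids can proceed, later updates to the same id bump the count,
and the finally block decrements and drops the entry at zero. Isolating just
the counting (the commit interleaves it with the ReentrantLock shown above):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.LongAdder;

    final class InFlightIds {
      private final Map<String, LongAdder> blockedIds = new ConcurrentHashMap<>();

      void enter(String id) {
        // first caller for an id creates the counter, later callers bump it
        blockedIds.computeIfAbsent(id, k -> new LongAdder()).increment();
      }

      void exit(String id) {
        LongAdder adder = blockedIds.get(id);
        adder.decrement();
        if (adder.longValue() == 0L) {
          blockedIds.remove(id);  // last in-flight update for this id
        }
      }
    }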
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index 673ecbe..b97381b 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -228,104 +228,25 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     // the request right here but for now I think it is better to just return the status
     // to the client that the minRf wasn't reached and let them handle it    
 
-    boolean dropCmd = false;
     if (!forwardToLeader) {
-      dropCmd = versionAdd(cmd);
-    }
-
-    if (vinfo == null) {
-      return;
-    }
-
-    if (dropCmd) {
-      // TODO: do we need to add anything to the response?
-      if (log.isDebugEnabled()) log.debug("Dropping update {}", cmd.getPrintableId());
-      return;
-    }
-
-    Future<?> distFuture = null;
-    AddUpdateCommand cloneCmd = null;
-
-    boolean nodist = noDistrib();
-    if (!nodist) {
-      if (!forwardToLeader) {
-        // SolrInputDocument clonedDoc = cmd.solrDoc.deepCopy();
-        cloneCmd = (AddUpdateCommand) cmd.clone();
-      }
-      AddUpdateCommand finalCloneCmd;
-      if (cloneCmd != null) {
-        finalCloneCmd = cloneCmd;
-      } else {
-        finalCloneCmd = cmd;
-      }
-      Callable distCall = () -> {
-        if (log.isTraceEnabled()) log.trace("Run distrib add collection");
-
+      Future distFuture = versionAdd(cmd);
+      if (distFuture != null) {
         try {
-          doDistribAdd(finalCloneCmd);
-          if (log.isTraceEnabled()) log.trace("after distrib add collection");
-        } catch (Throwable e) {
-          return e;
-        }
-        return null;
-      };
-
-      if (!forwardToLeader) {
-        distFuture = ParWork.getRootSharedExecutor().submit(distCall);
-      } else {
-        try {
-          distCall.call();
-        } catch (Exception e) {
-          if (e instanceof RuntimeException) {
-            throw (RuntimeException) e;
-          }
+          distFuture.get();
+        } catch (InterruptedException e) {
+          ParWork.propagateInterrupt(e, true);
           throw new SolrException(ErrorCode.SERVER_ERROR, e);
+        } catch (ExecutionException e) {
+          Throwable de = e.getCause();
+          if (de instanceof RuntimeException) {
+            throw (RuntimeException) de;
+          } else {
+            throw new SolrException(ErrorCode.SERVER_ERROR, e);
+          }
         }
       }
-    }
-
-    // TODO: possibly set checkDeleteByQueries as a flag on the command?
-    // if the update updates a doc that is part of a nested structure,
-    // force open a realTimeSearcher to trigger a ulog cache refresh.
-    // This refresh makes RTG handler aware of this update.q
-
-    if (!forwardToLeader) {
-      // TODO: possibly set checkDeleteByQueries as a flag on the command?
-      if (log.isTraceEnabled()) log.trace("Local add cmd {}", cmd.solrDoc);
-      try {
-        doLocalAdd(cmd);
-      } catch (Exception e) {
-        Throwable t;
-        if (e instanceof ExecutionException) {
-          t = e.getCause();
-        } else {
-          t = e;
-        }
-        if (distFuture != null && isLeader && !forwardToLeader) {
-          distFuture.cancel(false);
-          cancelCmds.add(cloneCmd);
-        }
-        if (t instanceof RuntimeException) {
-          throw (RuntimeException) t;
-        }
-        throw new SolrException(ErrorCode.SERVER_ERROR, t);
-      }
-      // if the update updates a doc that is part of a nested structure,
-      // force open a realTimeSearcher to trigger a ulog cache refresh.
-      // This refresh makes RTG handler aware of this update.q
-      if (ulog != null) {
-        if (req.getSchema().isUsableForChildDocs() && shouldRefreshUlogCaches(cmd)) {
-          ulog.openRealtimeSearcher();
-        }
-      }
-    }
-
-    if (distFuture != null) {
-      try {
-        Throwable e = (Throwable) distFuture.get();
-      } catch (Exception e) {
-        log.error("dist of add failed", e);
-      }
+    } else {
+      doDistribAdd(cmd);
     }
 
     // TODO: what to do when no idField?
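
The upshot for processAdd: versionAdd now owns both the local add and kicking
off distribution, and returns the distribution Future (or null when there is
nothing to wait on), so processAdd just blocks on it and unwraps the real
cause. The unwrap in isolation:

    Future<?> distFuture = versionAdd(cmd);  // null: dropped, or nothing to wait on
    if (distFuture != null) {
      try {
        distFuture.get();
      } catch (InterruptedException e) {
        ParWork.propagateInterrupt(e, true);
        throw new SolrException(ErrorCode.SERVER_ERROR, e);
      } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        if (cause instanceof RuntimeException) {
          throw (RuntimeException) cause;  // surface the original failure
        }
        throw new SolrException(ErrorCode.SERVER_ERROR, e);
      }
    }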
@@ -371,12 +292,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
    * @return whether or not to drop this cmd
    * @throws IOException If there is a low-level I/O error.
    */
-  protected boolean versionAdd(AddUpdateCommand cmd) throws IOException {
+  protected Future versionAdd(AddUpdateCommand cmd) throws IOException {
     BytesRef idBytes = cmd.getIndexedId();
 
     if (idBytes == null) {
       super.processAdd(cmd);
-      return true;
+      return null;
     }
 
     if (vinfo == null) {
@@ -385,7 +306,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
             "Atomic document updates are not supported unless <updateLog/> is configured");
       } else {
         super.processAdd(cmd);
-        return true;
+        return null;
       }
     }
 
@@ -425,16 +346,21 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       dependentVersionFound = waitForDependentUpdates(cmd, versionOnUpdate, isReplayOrPeersync, bucket);
       if (dependentVersionFound == -1) {
         // it means the document has been deleted by now at the leader. drop this update
-        return true;
+        return null;
       }
     }
 
-    long finalVersionOnUpdate = versionOnUpdate;
-    return bucket.runWithLock(vinfo.getVersionBucketLockTimeoutMs(), () -> doVersionAdd(cmd, finalVersionOnUpdate, isReplayOrPeersync, leaderLogic, forwardedFromCollection, bucket));
-
+    vinfo.lockForUpdate();
+    try {
+      long finalVersionOnUpdate = versionOnUpdate;
+      return bucket.runWithLock(vinfo.getVersionBucketLockTimeoutMs(), () -> doVersionAdd(cmd, finalVersionOnUpdate, isReplayOrPeersync,
+          leaderLogic, forwardedFromCollection, bucket), idBytes);
+    } finally {
+      vinfo.unlockForUpdate();
+    }
   }
 
-  private boolean doVersionAdd(AddUpdateCommand cmd, long versionOnUpdate, boolean isReplayOrPeersync,
+  private Future doVersionAdd(AddUpdateCommand cmd, long versionOnUpdate, boolean isReplayOrPeersync,
       boolean leaderLogic, boolean forwardedFromCollection, VersionBucket bucket) throws IOException {
 
     BytesRef idBytes = cmd.getIndexedId();
@@ -448,7 +374,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     // realtime-get to work reliably.
     // TODO: if versions aren't stored, do we need to set on the cmd anyway for some reason?
     // there may be other reasons in the future for a version on the commands
-
+    boolean nodist = noDistrib();
+    AddUpdateCommand cloneCmd = null;
     if (versionsStored) {
 
       long bucketVersion = bucket.highest;
@@ -467,10 +394,6 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
         getUpdatedDocument(cmd, versionOnUpdate);
 
-        if (isSubShardLeader) {
-          log.info("subshardleader");
-        }
-
         // leaders can also be in buffering state during "migrate" API call, see SOLR-5308
         if (ulog.getState() != UpdateLog.State.ACTIVE && isReplayOrPeersync == false) {
           // we're not in an active state, and this update isn't from a replay, so buffer it.
@@ -479,7 +402,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
           }
           cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
           ulog.add(cmd);
-          return true;
+          return null;
         }
 
         if (versionOnUpdate != 0) {
@@ -490,7 +413,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
             // specified it must exist (versionOnUpdate==1) and it does.
           } else {
             if (cmd.getReq().getParams().getBool(CommonParams.FAIL_ON_VERSION_CONFLICTS, true) == false) {
-              return true;
+              return null;
             }
             throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getPrintableId() + " expected=" + versionOnUpdate +
                 " actual=" + foundVersion + " params=" + req.getParams());
@@ -500,6 +423,18 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
         long version = vinfo.getNewClock();
         cmd.setVersion(version);
         cmd.getSolrInputDocument().setField(CommonParams.VERSION_FIELD, version);
+
+        if (!nodist && shouldCloneCmdDoc()) {
+          // clone the command and deep-copy the doc for distribution
+          cloneCmd = (AddUpdateCommand) cmd.clone();
+          cloneCmd.solrDoc = cmd.solrDoc.deepCopy();
+        }
+
         bucket.updateHighest(version);
       } else {
         // The leader forwarded us this update.
@@ -509,7 +444,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
           // we're not in an active state, and this update isn't from a replay, so buffer it.
           cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
           ulog.add(cmd);
-          return true;
+          return null;
         }
 
         if (cmd.isInPlaceUpdate()) {
@@ -528,7 +463,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
                 log.info("In-place update of {} failed to find valid lastVersion to apply to, and the document was deleted at the leader subsequently.", idBytes.utf8ToString());
               }
               versionDelete((DeleteUpdateCommand) fetchedFromLeader);
-              return true;
+              return null;
             } else {
               assert fetchedFromLeader instanceof AddUpdateCommand;
               // Newer document was fetched from the leader. Apply that document instead of this current in-place
@@ -548,7 +483,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
               // this means we got a newer full doc update and in that case it makes no sense to apply the older
               // inplace update. Drop this update
               log.info("Update was applied on version: {}, but last version I have is: {}. Dropping current update", prev, lastVersion);
-              return true;
+              return null;
             } else {
               // We're good, we should apply this update. First, update the bucket's highest.
               if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
@@ -569,17 +504,80 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
             if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
               // This update is a repeat, or was reordered. We need to drop this update.
               if (log.isDebugEnabled()) log.debug("Dropping add update due to version {}", idBytes.utf8ToString());
-              return true;
+              return null;
             }
           }
         }
+
         if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
           cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
         }
       }
     }
 
-    return false;
+    Future<?> distFuture = null;
+
+    AddUpdateCommand finalCloneCmd;
+    if (!nodist) {
+
+      if (cloneCmd != null) {
+        finalCloneCmd = cloneCmd;
+      } else {
+        finalCloneCmd = cmd;
+      }
+
+      Callable distCall = () -> {
+        if (log.isTraceEnabled()) log.trace("Run distrib add collection");
+        try {
+          doDistribAdd(finalCloneCmd);
+          if (log.isTraceEnabled()) log.trace("after distrib add collection");
+        } catch (Throwable e) {
+          return e;
+        }
+        return null;
+      };
+
+      distFuture = ParWork.getRootSharedExecutor().submit(distCall);
+    }
+
+    // TODO: possibly set checkDeleteByQueries as a flag on the command?
+    // if the update updates a doc that is part of a nested structure,
+    // force open a realTimeSearcher to trigger a ulog cache refresh.
+    // This refresh makes the RTG handler aware of this update.
+
+    try {
+      doLocalAdd(cmd);
+    } catch (Exception e) {
+      Throwable t;
+      if (e instanceof ExecutionException) {
+        t = e.getCause();
+      } else {
+        t = e;
+      }
+      if (distFuture != null && isLeader && !forwardToLeader) {
+        distFuture.cancel(false);
+        if (cloneCmd != null) {
+          cancelCmds.add(cloneCmd);
+        } else {
+          cancelCmds.add(cmd);
+        }
+      }
+      if (t instanceof RuntimeException) {
+        throw (RuntimeException) t;
+      }
+      throw new SolrException(ErrorCode.SERVER_ERROR, t);
+    }
+    // if the update updates a doc that is part of a nested structure,
+    // force open a realTimeSearcher to trigger a ulog cache refresh.
+    // This refresh makes the RTG handler aware of this update.
+    if (ulog != null) {
+      if (req.getSchema().isUsableForChildDocs() && shouldRefreshUlogCaches(cmd)) {
+        ulog.openRealtimeSearcher();
+      }
+    }
+
+    return distFuture;
   }
 
   /**
@@ -618,7 +616,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
     vinfo.lockForUpdate();
     try {
-      lastFoundVersion = bucket.runWithLock(vinfo.getVersionBucketLockTimeoutMs(), () -> doWaitForDependentUpdates(cmd, versionOnUpdate, isReplayOrPeersync, bucket, waitTimeout));
+      lastFoundVersion = bucket.runWithLock(vinfo.getVersionBucketLockTimeoutMs(), () ->
+          doWaitForDependentUpdates(cmd, versionOnUpdate, isReplayOrPeersync, bucket, waitTimeout), cmd.getIndexedId());
     } finally {
       vinfo.unlockForUpdate();
     }
@@ -820,27 +819,41 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
   // we have to spoof the replicationTracker and set the achieved rf to the number of active replicas.
   //
   protected void doDeleteById(DeleteUpdateCommand cmd) throws IOException {
-    // TODO: parallel
     setupRequest(cmd);
-    log.info("deletebyid {}", cmd.id);
-    boolean dropCmd = false;
-    if (!forwardToLeader) {
-      dropCmd  = versionDelete(cmd);
-    }
+    if (log.isDebugEnabled()) log.debug("deletebyid {}", cmd.id);
 
-    if (dropCmd) {
-      // TODO: do we need to add anything to the response?
-      return;
-    }
 
-    doDistribDeleteById(cmd);
+    if (!forwardToLeader) {
+      Future future = versionDelete(cmd);
+      if (future != null) {
+        try {
+          future.get();
+        } catch (InterruptedException e) {
+          ParWork.propagateInterrupt(e, true);
+          throw new SolrException(ErrorCode.SERVER_ERROR, e);
+        } catch (ExecutionException e) {
+          Throwable t = e.getCause();
+          if (t instanceof RuntimeException) {
+            throw (RuntimeException) t;
+          }
+          throw new SolrException(ErrorCode.SERVER_ERROR, t);
+        }
+      }
+    } else {
+      doDistribDeleteById(cmd);
+    }
 
     // cmd.getIndexId == null when delete by query
     // TODO: what to do when no idField?
     if (returnVersions && rsp != null && cmd.getIndexedId() != null && idField != null) {
       if (deleteResponse == null) {
         deleteResponse = new NamedList<>(1);
-        rsp.add("deletes",deleteResponse);
+        rsp.add("deletes", deleteResponse);
       }
       if (scratch == null) scratch = new CharsRefBuilder();
       idField.getType().indexedToReadable(cmd.getIndexedId(), scratch);
@@ -906,69 +919,28 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     // at this point, there is an update we need to try and apply.
     // we may or may not be the leader.
 
-    boolean drop = false;
-    if (!forwardToLeader) {
-       drop = versionDeleteByQuery(cmd);
-    }
+    Future future = versionDeleteByQuery(cmd, replicas, coll);
 
-    if (drop) {
-      return;
-    }
-
-    Future<?> distFuture = null;
-    Callable distCall = () -> {
+    if (future != null) {
       try {
-        doDistribDeleteByQuery(cmd, replicas, coll);
-      } catch (IOException e) {
-        // MRM TODO: fail this harder...
+        future.get();
+      } catch (InterruptedException e) {
+        ParWork.propagateInterrupt(e, true);
         throw new SolrException(ErrorCode.SERVER_ERROR, e);
-      }
-      return null;
-    };
-
-    if (!forwardToLeader) {
-      distFuture = ParWork.getRootSharedExecutor().submit(distCall);
-    } else {
-      try {
-        distCall.call();
-      } catch (Exception e) {
-        if (e instanceof RuntimeException) {
-          throw (RuntimeException) e;
-        }
-        throw new SolrException(ErrorCode.SERVER_ERROR, e);
-      }
-    }
-
-    if (!forwardToLeader) {
-      try {
-        doLocalDelete(cmd);
-      } catch (Exception e) {
-        log.error("Exception on local deleteByQuery", e);
+      } catch (ExecutionException e) {
         Throwable t;
         if (e instanceof ExecutionException) {
           t = e.getCause();
         } else {
           t = e;
         }
-        if (distFuture != null && isLeader && !forwardToLeader) {
-          distFuture.cancel(false);
-          cancelCmds.add(cmd);
-        }
-        if (t instanceof SolrException) {
-          throw (SolrException) t;
+        if (t instanceof RuntimeException) {
+          throw (RuntimeException) t;
         }
         throw new SolrException(ErrorCode.SERVER_ERROR, t);
       }
     }
 
-    if (distFuture != null) {
-      try {
-        distFuture.get();
-      } catch (Exception e) {
-        log.error("dist of add failed", e);
-      }
-    }
-
     if (returnVersions && rsp != null) {
       if (deleteByQueryResponse == null) {
         deleteByQueryResponse = new NamedList<>(1);
@@ -989,7 +961,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     // no-op for derived classes to implement
   }
 
-  protected boolean versionDeleteByQuery(DeleteUpdateCommand cmd) throws IOException {
+  protected Future versionDeleteByQuery(DeleteUpdateCommand cmd, List<Node> replicas, DocCollection coll) throws IOException {
     // Find the version
     long versionOnUpdate = findVersionOnUpdate(cmd);
 
@@ -1003,7 +975,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     vinfo.blockUpdates();
     try {
 
-      return doLocalDeleteByQuery(cmd, versionOnUpdate, isReplayOrPeersync);
+      return doLocalDeleteByQuery(cmd, versionOnUpdate, isReplayOrPeersync, replicas, coll);
 
       // since we don't know which documents were deleted, the easiest thing to do is to invalidate
       // all real-time caches (i.e. UpdateLog) which involves also getting a new version of the IndexReader
@@ -1024,13 +996,26 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     return versionOnUpdate;
   }
 
-  private boolean doLocalDeleteByQuery(DeleteUpdateCommand cmd, long versionOnUpdate, boolean isReplayOrPeersync) throws IOException {
+  private Future doLocalDeleteByQuery(DeleteUpdateCommand cmd, long versionOnUpdate, boolean isReplayOrPeersync, List<Node> replicas, DocCollection coll) throws IOException {
+    Future<?> future = null;
     if (versionsStored) {
       final boolean leaderLogic = isLeader & !isReplayOrPeersync;
       if (leaderLogic) {
         long version = vinfo.getNewClock();
         cmd.setVersion(-version);
         // TODO update versions in all buckets
+
+        future = ParWork.getRootSharedExecutor().submit(() -> {
+          try {
+            doDistribDeleteByQuery(cmd, replicas, coll);
+          } catch (IOException e) {
+            log.error("Exception distributing deleteByQuery", e); // MRM TODO:
+          }
+        });
+
+        doLocalDelete(cmd);
+
       } else {
         cmd.setVersion(-versionOnUpdate);
 
@@ -1038,16 +1023,27 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
           // we're not in an active state, and this update isn't from a replay, so buffer it.
           cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
           ulog.deleteByQuery(cmd);
-          return true;
+          return null; // nocommit
         }
 
+        future = ParWork.getRootSharedExecutor().submit(() -> {
+          try {
+            DeleteUpdateCommand clonedCmd = (DeleteUpdateCommand) cmd.clone();
+            doDistribDeleteByQuery(clonedCmd, replicas, coll);
+          } catch (IOException e) {
+            log.error("", e); // MRM TODO:
+          }
+        });
+
         if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
           // TLOG replica not leader, don't write the DBQ to IW
           cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
         }
+        doLocalDelete(cmd);
       }
     }
-    return false;
+
+    return future;
   }
 
   // internal helper method to setup request by processors who use this class.
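
Note the asymmetry in the hunk above: on the buffering path the method returns null before anything is submitted, while on the replica path the command is cloned before the executor sees it. The clone matters because the submitting thread keeps mutating cmd (for example setting IGNORE_INDEXWRITER) while the pool thread may already be serializing it. A self-contained illustration of that race, assuming nothing beyond the JDK:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    // Illustrative only: a plain mutable command shows the race the clone avoids.
    public class CloneBeforeDistrib {
      static class Cmd implements Cloneable {
        volatile int flags;
        @Override public Cmd clone() {
          try {
            return (Cmd) super.clone();
          } catch (CloneNotSupportedException e) {
            throw new AssertionError(e);
          }
        }
      }

      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Cmd cmd = new Cmd();
        Cmd snapshot = cmd.clone(); // the pool thread sees a frozen copy
        Future<?> dist = pool.submit(
            () -> System.out.println("distributing flags=" + snapshot.flags));
        cmd.flags |= 0x4; // local-only flag; without the clone this could leak to replicas
        dist.get();
        pool.shutdown();
      }
    }
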
@@ -1074,13 +1070,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     return req.getParams().get(DISTRIB_FROM);
   }
 
-  protected boolean versionDelete(DeleteUpdateCommand cmd) throws IOException {
+  protected Future versionDelete(DeleteUpdateCommand cmd) throws IOException {
 
     BytesRef idBytes = cmd.getIndexedId();
 
     if (vinfo == null || idBytes == null) {
       super.processDelete(cmd);
-      return false;
+      return null;
     }
 
     // This is only the hash for the bucket, and must be based only on the uniqueKey (i.e. do not use a pluggable hash
@@ -1113,17 +1109,18 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     try {
       long finalVersionOnUpdate = versionOnUpdate;
       return bucket.runWithLock(vinfo.getVersionBucketLockTimeoutMs(), () -> doVersionDelete(cmd, finalVersionOnUpdate, signedVersionOnUpdate, isReplayOrPeersync, leaderLogic,
-          forwardedFromCollection, bucket));
+          forwardedFromCollection, bucket), idBytes);
     } finally {
       vinfo.unlockForUpdate();
     }
   }
 
-  private boolean doVersionDelete(DeleteUpdateCommand cmd, long versionOnUpdate, long signedVersionOnUpdate,
+  private Future doVersionDelete(DeleteUpdateCommand cmd, long versionOnUpdate, long signedVersionOnUpdate,
       boolean isReplayOrPeersync, boolean leaderLogic, boolean forwardedFromCollection, VersionBucket bucket)
       throws IOException {
 
     BytesRef idBytes = cmd.getIndexedId();
+    Future<?> distFuture = null;
     if (versionsStored) {
       long bucketVersion = bucket.highest;
 
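
runWithLock now also takes the document's idBytes (the diff stats at the top list matching changes in VersionBucket and TimedVersionBucket, which are not shown in this section), suggesting the lock is being keyed per document id rather than per whole bucket. A speculative sketch of an id-keyed lock, under that assumption only:

    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;

    // Speculative: an id-keyed lock consistent with the new runWithLock(.., idBytes) signature.
    public class IdKeyedLock {
      private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();

      public <R> R runWithLock(long timeoutMs, Callable<R> action, String id) throws Exception {
        ReentrantLock lock = locks.computeIfAbsent(id, k -> new ReentrantLock());
        if (!lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
          throw new IllegalStateException("could not lock id=" + id + " in " + timeoutMs + "ms");
        }
        try {
          return action.call();
        } finally {
          lock.unlock();
        }
      }
    }
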
@@ -1146,7 +1143,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
           }
           cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
           ulog.delete(cmd);
-          return true;
+          return null;
         }
 
         if (signedVersionOnUpdate != 0) {
@@ -1156,13 +1153,21 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
             // we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd
             // specified it must exist (versionOnUpdate==1) and it does.
           } else {
-            throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getId() + " expected=" + signedVersionOnUpdate +
-                " actual=" + foundVersion + " params=" + req.getParams());
+            throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getId() + " expected=" + signedVersionOnUpdate + " actual=" + foundVersion + " params=" + req.getParams());
           }
         }
 
         long version = vinfo.getNewClock();
         cmd.setVersion(-version);
+
+        distFuture = ParWork.getRootSharedExecutor().submit(() -> {
+          try {
+            doDistribDeleteById(cmd);
+          } catch (IOException e) {
+            log.error("", e); // MRM TODO
+          }
+        });
+
         bucket.updateHighest(version);
       } else {
         cmd.setVersion(-versionOnUpdate);
@@ -1171,7 +1176,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
           // we're not in an active state, and this update isn't from a replay, so buffer it.
           cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
           ulog.delete(cmd);
-          return true;
+          return null;
         }
 
         // if we aren't the leader, then we need to check that updates were not re-ordered
@@ -1188,7 +1193,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
             if (log.isDebugEnabled()) {
               log.debug("Dropping delete update due to version {}", idBytes.utf8ToString());
             }
-            return true;
+            return null;
           }
         }
 
@@ -1198,9 +1203,29 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       }
     }
 
-    doLocalDelete(cmd);
-    return false;
-
+    try {
+      doLocalDelete(cmd);
+    } catch (Exception e) {
+      Throwable t;
+      if (e instanceof ExecutionException) {
+        t = e.getCause();
+      } else {
+        t = e;
+      }
+      if (distFuture != null && isLeader && !forwardToLeader) {
+        distFuture.cancel(false);
+        cancelCmds.add(cmd);
+      }
+      if (t instanceof RuntimeException) {
+        throw (RuntimeException) t;
+      }
+      throw new SolrException(ErrorCode.SERVER_ERROR, t);
+    }
+    return distFuture;
   }
 
   @Override
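
The tail of doVersionDelete is a small compensation pattern: the distribution future is submitted before the local delete, and if the local delete then fails on the leader, the in-flight send is cancelled and the command queued on cancelCmds for follow-up. Stripped of Solr types, the shape is:

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    // Illustrative shape of submit-then-compensate; the names here are not Solr APIs.
    public class CompensateOnFailure {
      static final ExecutorService POOL = Executors.newCachedThreadPool();
      static final Queue<Object> CANCELS = new ConcurrentLinkedQueue<>(); // plays the role of cancelCmds

      static void apply(Runnable distrib, Runnable local, Object cmd) {
        Future<?> distFuture = POOL.submit(distrib); // replicas start receiving early
        try {
          local.run();
        } catch (RuntimeException e) {
          distFuture.cancel(false); // best effort; don't interrupt a send already on the wire
          CANCELS.add(cmd);         // remember the command so the send can be undone later
          throw e;
        }
      }
    }
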
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index d4f316e..68ce680 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -99,6 +99,11 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
   //   3) in general, not controlling carefully enough exactly when our view of clusterState is updated
   protected volatile ClusterState clusterState;
 
+  // should we clone the document before sending it to replicas?
+  // this is set to true in the constructor if the next processors in the chain
+  // are custom and may modify the SolrInputDocument racing with its serialization for replication
+  private final boolean cloneRequiredOnLeader;
+
   //used for keeping track of replicas that have processed an add/update from the leader
   private volatile RollupRequestReplicationTracker rollupReplicationTracker = null;
   private volatile LeaderRequestReplicationTracker leaderReplicationTracker = null;
@@ -114,6 +119,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     zkController = cc.getZkController();
     cmdDistrib = new SolrCmdDistributor(zkController.getZkStateReader(), cc.getUpdateShardHandler(), new IsCCClosed(req));
     try {
+      cloneRequiredOnLeader = isCloneRequiredOnLeader(next);
       collection = cloudDesc.getCollectionName();
       clusterState = zkController.getClusterState();
       DocCollection coll = clusterState.getCollectionOrNull(collection, true);
@@ -484,7 +490,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
         if (isLeader) {
           // don't forward to ourself
-          leaderForAnyShard = true;
         } else {
           leaders.add(new SolrCmdDistributor.ForwardNode(zkController.getZkStateReader(), leader, collection, sliceName));
         }
@@ -499,10 +504,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
       }
       cmdDistrib.distribDelete(cmd, leaders, outParams, false, rollupReplicationTracker, null);
 
-      if (!leaderForAnyShard) {
-        return;
-      }
-
       // change the phase to TOLEADER so we look up and forward to our own replicas (if any)
       phase = DistribPhase.TOLEADER;
     }
@@ -511,7 +512,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     if (DistribPhase.TOLEADER == phase) {
       // This core should be a leader
       isLeader = true;
-      replicas = setupRequestForDBQ();
+      replicas = setupRequestForDBQ(desc.getName());
     } else if (DistribPhase.FROMLEADER == phase) {
       isLeader = false;
     }
@@ -581,19 +582,17 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
   }
 
   // used for deleteByQuery to get the list of nodes this leader should forward to
-  private List<SolrCmdDistributor.Node> setupRequestForDBQ() {
+  private List<SolrCmdDistributor.Node> setupRequestForDBQ(String name) {
     List<SolrCmdDistributor.Node> nodes = null;
     String shardId = cloudDesc.getShardId();
 
     try {
-      Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId);
-      isLeader = leaderReplica.getName().equals(desc.getName());
 
       // TODO: what if we are no longer the leader?
 
       forwardToLeader = false;
       List<Replica> replicaProps = zkController.getZkStateReader()
-          .getReplicaProps(collection, shardId, leaderReplica.getName(), Replica.State.BUFFERING, Replica.State.ACTIVE, EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
+          .getReplicaProps(collection, shardId, name, Replica.State.BUFFERING, Replica.State.ACTIVE, EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
       if (replicaProps != null) {
         nodes = new ArrayList<>(replicaProps.size());
         for (Replica props : replicaProps) {
@@ -802,7 +801,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
   @Override
   protected boolean shouldCloneCmdDoc() {
-    return true;
+    boolean willDistrib = isLeader && nodes != null && nodes.size() > 0;
+    return willDistrib && cloneRequiredOnLeader;
   }
 
   // helper method, processAdd was getting a bit large.
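
shouldCloneCmdDoc previously returned true unconditionally; now the deep copy is taken only when this core is the leader, actually has replicas to forward to, and the chain requires it. For reference, the way the base class consumes the flag in mainline Solr (a sketch, not verbatim from this branch):

    // Sketch: only pay for a deep copy when a mutating processor could race replication.
    SolrInputDocument clonedDoc = null;
    if (shouldCloneCmdDoc()) {
      clonedDoc = cmd.solrDoc.deepCopy(); // frozen snapshot for the replication threads
    }
    // ... the local chain keeps mutating cmd.solrDoc; clonedDoc (if any) goes to replicas
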
@@ -969,7 +969,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
           // slice leader can be null because node/shard is created zk before leader election
           if (sliceLeader != null && zkController.getZkStateReader().isNodeLive(sliceLeader.getNodeName()))  {
             if (nodes == null) nodes = new ArrayList<>();
-            nodes.add(new SolrCmdDistributor.StdNode(zkController.getZkStateReader(), sliceLeader, coll.getName(), aslice.getName()));
+            nodes.add(new SolrCmdDistributor.ForwardNode(zkController.getZkStateReader(), sliceLeader, coll.getName(), aslice.getName()));
           }
         }
       }
@@ -1252,8 +1252,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
           } else {
             // not the leader anymore maybe or the error'd node is not my replica?
             if (!foundErrorNodeInReplicaList) {
-              log.warn("Core {} belonging to {} {}, does not have error'd node {} as a replica. No request recovery command will be sent! replicas={}", desc.getName(), collection, cloudDesc.getShardId(),
-                  stdNode.getNodeProps().getCoreUrl(), myReplicas);
+              log.warn("Core {} belonging to {} {}, does not have error'd node {} as a replica. No request recovery command will be sent! replicas={} node={}", desc.getName(), collection, cloudDesc.getShardId(),
+                  stdNode.getNodeProps().getCoreUrl(), myReplicas, error.req.node.getClass().getSimpleName());
               if (!shardId.equals(cloudDesc.getShardId())) {
                 // some replicas on other shard did not receive the updates (ex: during splitshard),
                 // exception must be notified to clients
diff --git a/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java
index ecf17f2..70963f8 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java
@@ -126,7 +126,7 @@ public class RunUpdateProcessorFactory extends UpdateRequestProcessorFactory {
       try {
         super.doClose();
       } finally {
-        AddUpdateCommand.THREAD_LOCAL_AddUpdateCommand.get().clear();
+        // AddUpdateCommand.THREAD_LOCAL_AddUpdateCommand.get().clear();
       }
     }
   }
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
index ce759d2..4c3e278 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
@@ -221,6 +221,8 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
     response = CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient());
 
     assertEquals(0, response.getStatus());
+
+    assertFalse(zkClient().exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName));
     // nocommit what happened to success?
 //    assertTrue(response.toString(), response.isSuccess());
 //    Map<String,NamedList<Integer>> nodesStatus = response.getCollectionNodesStatus();
diff --git a/solr/core/src/test/org/apache/solr/update/TestExceedMaxTermLength.java b/solr/core/src/test/org/apache/solr/update/TestExceedMaxTermLength.java
index 9bf502b..8e7ac49 100644
--- a/solr/core/src/test/org/apache/solr/update/TestExceedMaxTermLength.java
+++ b/solr/core/src/test/org/apache/solr/update/TestExceedMaxTermLength.java
@@ -85,7 +85,7 @@ public class TestExceedMaxTermLength extends SolrTestCaseJ4 {
         }
       } catch (Exception e) {
         //expected
-        String msg= e.getCause().getMessage();
+        String msg = e.getCause() == null ? e.getMessage() : e.getCause().getMessage();
         assertTrue(msg.contains("one immense term in field=\"cat\""));
       }
 
diff --git a/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java
index 7fa942d..56a9abb 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
+import org.apache.lucene.util.BytesRef;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.ParWork;
 import org.apache.solr.common.SolrException;
@@ -93,7 +94,7 @@ public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
   public void testVersionAdd() throws IOException {
     SolrQueryRequest req = new LocalSolrQueryRequest(h.getCore(), new ModifiableSolrParams(), true);
     int threads = 5;
-    Function<DistributedUpdateProcessor,Boolean> versionAddFunc = (DistributedUpdateProcessor process) -> {
+    Function<DistributedUpdateProcessor,Future> versionAddFunc = (DistributedUpdateProcessor process) -> {
       try {
         AddUpdateCommand cmd = new AddUpdateCommand(req);
         cmd.solrDoc = new SolrInputDocument();
@@ -118,7 +119,7 @@ public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
     SolrQueryRequest req = new LocalSolrQueryRequest(h.getCore(), new ModifiableSolrParams(), true);
 
     int threads = TEST_NIGHTLY ? 5 : 2;
-    Function<DistributedUpdateProcessor,Boolean> versionDeleteFunc = (DistributedUpdateProcessor process) -> {
+    Function<DistributedUpdateProcessor,Future> versionDeleteFunc = (DistributedUpdateProcessor process) -> {
       try {
         DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
         cmd.id = "1";
@@ -142,7 +143,7 @@ public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
    * @return how many requests succeeded
    */
   private int runCommands(int threads, int versionBucketLockTimeoutMs, SolrQueryRequest req,
-      Function<DistributedUpdateProcessor,Boolean> function)
+      Function<DistributedUpdateProcessor,Future> function)
       throws IOException {
     try (DistributedUpdateProcessor processor = new DistributedUpdateProcessor(
         req, null, null, null)) {
@@ -156,7 +157,7 @@ public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
            * simulate the case: it takes 5 seconds to add the doc
            */
           @Override
-          public <T, R> R runWithLock(int lockTimeoutMs, CheckedFunction<T,R> function) throws IOException {
+          public <T, R> R runWithLock(int lockTimeoutMs, CheckedFunction<T,R> function, BytesRef idBytes) throws IOException {
             boolean locked = false;
             try {
               locked = lock.tryLock(versionBucketLockTimeoutMs, TimeUnit.MILLISECONDS);
@@ -180,7 +181,7 @@ public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
         }).when(vinfo).bucket(anyInt());
       }
       CountDownLatch latch = new CountDownLatch(1);
-      Collection<Future<Boolean>> futures = new ArrayList<>();
+      Collection<Future<Future>> futures = new ArrayList<>();
       for (int t = 0; t < threads; ++t) {
         futures.add(executor.submit(() -> {
           latch.await();
@@ -190,7 +191,7 @@ public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
       latch.countDown();
 
       int succeeded = 0;
-      for (Future<Boolean> f : futures) {
+      for (Future<Future> f : futures) {
         try {
           f.get();
           succeeded++;
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 0586d70..acc1754 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -84,7 +84,7 @@ public class ParWork implements Closeable {
               new SynchronousQueue());
           ((ParWorkExecutor)EXEC).enableCloseLock();
           for (int i = 0; i < Math.min(coreSize, 12); i++) {
-            EXEC.submit(() -> {});
+            EXEC.prestartCoreThread();
           }
         }
       }
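
Replacing EXEC.submit(() -> {}) with prestartCoreThread() is more than cosmetic: with a SynchronousQueue a no-op submit can be handed to a worker that is already running, so N submits do not guarantee N live core threads, while each prestartCoreThread() call starts exactly one until the core size is reached. A runnable demonstration with a plain ThreadPoolExecutor:

    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class Prestart {
      public static void main(String[] args) {
        ThreadPoolExecutor exec =
            new ThreadPoolExecutor(4, 4, 60, TimeUnit.SECONDS, new SynchronousQueue<>());
        // Each call starts one idle core thread; returns false once all are running.
        while (exec.prestartCoreThread()) { }
        System.out.println("live core threads: " + exec.getPoolSize()); // prints 4
        exec.shutdown();
      }
    }
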
diff --git a/solr/solrj/src/java/org/apache/solr/common/ToleratedUpdateError.java b/solr/solrj/src/java/org/apache/solr/common/ToleratedUpdateError.java
index 329573e..b49e0a9 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ToleratedUpdateError.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ToleratedUpdateError.java
@@ -146,7 +146,7 @@ public final class ToleratedUpdateError {
    * @see #parseMetadataIfToleratedUpdateError
    */
   public String getMetadataValue() {
-    return message.toString();
+    return message;
   }
   
   /** 
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 3b2cad9..7b63f14 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -757,7 +757,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
             ParWork.propagateInterrupt(e);
             throw new SolrException(ErrorCode.SERVER_ERROR, e);
           }
-          if (exists != null && exists.getVersion() == cachedDocCollection.getZNodeVersion()) {
+          if (exists != null && exists.getVersion() == cached.getZNodeVersion()) {
             shouldFetch = false;
           }
         }
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
index 3ce7fbf..b66e593 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
@@ -16,7 +16,6 @@
  */
 package org.apache.solr.common.util;
 
-import org.apache.commons.math3.analysis.function.Add;
 import org.apache.solr.common.ParWork;
 import org.eclipse.jetty.util.AtomicBiInteger;
 import org.eclipse.jetty.util.BlockingArrayQueue;
diff --git a/solr/solrj/src/test-files/log4j2.xml b/solr/solrj/src/test-files/log4j2.xml
index 73e4654..0ac5dfd 100644
--- a/solr/solrj/src/test-files/log4j2.xml
+++ b/solr/solrj/src/test-files/log4j2.xml
@@ -28,6 +28,7 @@
     </Console>
   </Appenders>
   <Loggers>
+    <AsyncLogger name="org.apache.solr.update.LoggingInfoStream" level="off"/>
     <AsyncLogger name="org.apache.zookeeper" level="WARN"/>
     <AsyncLogger name="org.apache.hadoop" level="WARN"/>
     <AsyncLogger name="org.apache.directory" level="WARN"/>
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
index 284298b..967eafc 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
@@ -878,7 +878,8 @@ public class SolrTestCase extends LuceneTestCase {
       if (testExecutor != null) {
         return testExecutor;
       }
-      testExecutor = (ParWorkExecutor) ParWork.getParExecutorService("testExecutor", 10, 30, 1000, new BlockingArrayQueue(30, 16));
+      testExecutor = (ParWorkExecutor) ParWork.getParExecutorService(
+          "testExecutor", 10, 30, 500, new BlockingArrayQueue(32, 16));
       testExecutor.prestartAllCoreThreads();
       ((ParWorkExecutor) testExecutor).enableCloseLock();
       return testExecutor;