You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/05/30 04:12:12 UTC

lucene-solr:branch_7x: SOLR-12338: Replay buffering tlog in parallel

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_7x 327f58e87 -> 04e1b1974


SOLR-12338: Replay buffering tlog in parallel


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/04e1b197
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/04e1b197
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/04e1b197

Branch: refs/heads/branch_7x
Commit: 04e1b19743e330ce66d199c4dc40bbf394be9ed7
Parents: 327f58e
Author: Cao Manh Dat <da...@apache.org>
Authored: Wed May 30 11:05:48 2018 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Wed May 30 11:11:37 2018 +0700

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   2 +
 .../org/apache/solr/core/CoreContainer.java     |  14 ++
 .../java/org/apache/solr/core/NodeConfig.java   |  17 ++-
 .../org/apache/solr/core/SolrXmlConfig.java     |   3 +
 .../java/org/apache/solr/update/UpdateLog.java  | 131 ++++++++++++++++---
 .../processor/DistributedUpdateProcessor.java   |   9 +-
 .../org/apache/solr/util/OrderedExecutor.java   | 116 ++++++++++++++++
 solr/core/src/test-files/solr/solr-50-all.xml   |   1 +
 .../test/org/apache/solr/core/TestSolrXml.java  |   1 +
 .../org/apache/solr/search/TestRecovery.java    |  81 ++++++++++++
 .../apache/solr/util/OrderedExecutorTest.java   | 105 +++++++++++++++
 solr/solr-ref-guide/src/format-of-solr-xml.adoc |   4 +
 .../apache/solr/common/util/ExecutorUtil.java   |   7 +
 13 files changed, 468 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 8567f8f..0146519 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -250,6 +250,8 @@ Optimizations
   The /export (ExportQParserPlugin) would declare incorrectly that scores are needed.
   Expanded docs (expand component) could be told incorrectly that scores are needed.  (David Smiley)
 
+* SOLR-12338: Replay buffering tlog in parallel. (Cao Manh Dat, David Smiley)
+
 Other Changes
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/core/src/java/org/apache/solr/core/CoreContainer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 78247a0..c18c26c 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -105,6 +105,7 @@ import org.apache.solr.security.SecurityPluginHolder;
 import org.apache.solr.update.SolrCoreState;
 import org.apache.solr.update.UpdateShardHandler;
 import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.util.OrderedExecutor;
 import org.apache.solr.util.stats.MetricUtils;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -155,6 +156,8 @@ public class CoreContainer {
   private ExecutorService coreContainerWorkExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(
       new DefaultSolrThreadFactory("coreContainerWorkExecutor") );
 
+  private final OrderedExecutor replayUpdatesExecutor;
+
   protected LogWatcher logging = null;
 
   private CloserThread backgroundCloser = null;
@@ -294,6 +297,11 @@ public class CoreContainer {
     this.coresLocator = locator;
     this.containerProperties = new Properties(properties);
     this.asyncSolrCoreLoad = asyncSolrCoreLoad;
+    this.replayUpdatesExecutor = new OrderedExecutor(
+        cfg.getReplayUpdatesThreads(),
+        ExecutorUtil.newMDCAwareCachedThreadPool(
+            cfg.getReplayUpdatesThreads(),
+            new DefaultSolrThreadFactory("replayUpdatesExecutor")));
   }
 
   private synchronized void initializeAuthorizationPlugin(Map<String, Object> authorizationConf) {
@@ -435,6 +443,7 @@ public class CoreContainer {
     coresLocator = null;
     cfg = null;
     containerProperties = null;
+    replayUpdatesExecutor = null;
   }
 
   public static CoreContainer createAndLoad(Path solrHome) {
@@ -471,6 +480,10 @@ public class CoreContainer {
     return metricManager;
   }
 
+  public OrderedExecutor getReplayUpdatesExecutor() {
+    return replayUpdatesExecutor;
+  }
+
   //-------------------------------------------------------------------
   // Initialization / Cleanup
   //-------------------------------------------------------------------
@@ -739,6 +752,7 @@ public class CoreContainer {
     isShutDown = true;
 
     ExecutorUtil.shutdownAndAwaitTermination(coreContainerWorkExecutor);
+    replayUpdatesExecutor.shutdownAndAwaitTermination();
     if (metricManager != null) {
       metricManager.closeReporters(SolrMetricManager.getRegistryName(SolrInfoBean.Group.node));
       metricManager.closeReporters(SolrMetricManager.getRegistryName(SolrInfoBean.Group.jvm));

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/core/src/java/org/apache/solr/core/NodeConfig.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/NodeConfig.java b/solr/core/src/java/org/apache/solr/core/NodeConfig.java
index 5b4debe..ea44ad8 100644
--- a/solr/core/src/java/org/apache/solr/core/NodeConfig.java
+++ b/solr/core/src/java/org/apache/solr/core/NodeConfig.java
@@ -57,6 +57,8 @@ public class NodeConfig {
 
   private final Integer coreLoadThreads;
 
+  private final int replayUpdatesThreads;
+
   @Deprecated
   // This should be part of the transientCacheConfig, remove in 7.0
   private final int transientCacheSize;
@@ -75,7 +77,7 @@ public class NodeConfig {
                      PluginInfo shardHandlerFactoryConfig, UpdateShardHandlerConfig updateShardHandlerConfig,
                      String coreAdminHandlerClass, String collectionsAdminHandlerClass,
                      String infoHandlerClass, String configSetsHandlerClass,
-                     LogWatcherConfig logWatcherConfig, CloudConfig cloudConfig, Integer coreLoadThreads,
+                     LogWatcherConfig logWatcherConfig, CloudConfig cloudConfig, Integer coreLoadThreads, int replayUpdatesThreads,
                      int transientCacheSize, boolean useSchemaCache, String managementPath, SolrResourceLoader loader,
                      Properties solrProperties, PluginInfo[] backupRepositoryPlugins,
                      MetricsConfig metricsConfig, PluginInfo transientCacheConfig) {
@@ -93,6 +95,7 @@ public class NodeConfig {
     this.logWatcherConfig = logWatcherConfig;
     this.cloudConfig = cloudConfig;
     this.coreLoadThreads = coreLoadThreads;
+    this.replayUpdatesThreads = replayUpdatesThreads;
     this.transientCacheSize = transientCacheSize;
     this.useSchemaCache = useSchemaCache;
     this.managementPath = managementPath;
@@ -134,6 +137,10 @@ public class NodeConfig {
         : coreLoadThreads;
   }
 
+  public int getReplayUpdatesThreads() {
+    return replayUpdatesThreads;
+  }
+
   public String getSharedLibDirectory() {
     return sharedLibDirectory;
   }
@@ -214,6 +221,7 @@ public class NodeConfig {
     private LogWatcherConfig logWatcherConfig = new LogWatcherConfig(true, null, null, 50);
     private CloudConfig cloudConfig;
     private int coreLoadThreads = DEFAULT_CORE_LOAD_THREADS;
+    private int replayUpdatesThreads = Runtime.getRuntime().availableProcessors();
     @Deprecated
     //Remove in 7.0 and put it all in the transientCache element in solrconfig.xml
     private int transientCacheSize = DEFAULT_TRANSIENT_CACHE_SIZE;
@@ -327,6 +335,11 @@ public class NodeConfig {
       return this;
     }
 
+    public NodeConfigBuilder setReplayUpdatesThreads(int replayUpdatesThreads) {
+      this.replayUpdatesThreads = replayUpdatesThreads;
+      return this;
+    }
+
     // Remove in Solr 7.0
     @Deprecated
     public NodeConfigBuilder setTransientCacheSize(int transientCacheSize) {
@@ -367,7 +380,7 @@ public class NodeConfig {
     public NodeConfig build() {
       return new NodeConfig(nodeName, coreRootDirectory, solrDataHome, configSetBaseDirectory, sharedLibDirectory, shardHandlerFactoryConfig,
                             updateShardHandlerConfig, coreAdminHandlerClass, collectionsAdminHandlerClass, infoHandlerClass, configSetsHandlerClass,
-                            logWatcherConfig, cloudConfig, coreLoadThreads, transientCacheSize, useSchemaCache, managementPath, loader, solrProperties,
+                            logWatcherConfig, cloudConfig, coreLoadThreads, replayUpdatesThreads, transientCacheSize, useSchemaCache, managementPath, loader, solrProperties,
                             backupRepositoryPlugins, metricsConfig, transientCacheConfig);
     }
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java b/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
index 771df1e..3f8bab2 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
@@ -268,6 +268,9 @@ public class SolrXmlConfig {
         case "coreLoadThreads":
           builder.setCoreLoadThreads(parseInt(name, value));
           break;
+        case "replayUpdatesThreads":
+          builder.setReplayUpdatesThreads(parseInt(name, value));
+          break;
         case "transientCacheSize":
           builder.setTransientCacheSize(parseInt(name, value));
           break;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/core/src/java/org/apache/solr/update/UpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index ef0e73e..7f821ea 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -42,6 +42,9 @@ import java.util.concurrent.Future;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Meter;
@@ -55,6 +58,7 @@ import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.core.PluginInfo;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.core.SolrInfoBean;
@@ -69,9 +73,11 @@ import org.apache.solr.update.processor.DistributedUpdateProcessor;
 import org.apache.solr.update.processor.UpdateRequestProcessor;
 import org.apache.solr.update.processor.UpdateRequestProcessorChain;
 import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.util.OrderedExecutor;
 import org.apache.solr.util.RTimer;
 import org.apache.solr.util.RefCounted;
 import org.apache.solr.util.TestInjection;
+import org.apache.solr.util.TimeOut;
 import org.apache.solr.util.plugin.PluginInfoInitialized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1667,12 +1673,9 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
       this.inSortedOrder = inSortedOrder;
     }
 
-
-
     private SolrQueryRequest req;
     private SolrQueryResponse rsp;
 
-
     @Override
     public void run() {
       ModifiableSolrParams params = new ModifiableSolrParams();
@@ -1683,7 +1686,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
       SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp));    // setting request info will help logging
 
       try {
-        for(;;) {
+        for (; ; ) {
           TransactionLog translog = translogs.pollFirst();
           if (translog == null) break;
           doReplay(translog);
@@ -1743,6 +1746,9 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
 
         UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessingChain(null);
         UpdateRequestProcessor proc = processorChain.createProcessor(req, rsp);
+        OrderedExecutor executor = inSortedOrder ? null : req.getCore().getCoreContainer().getReplayUpdatesExecutor();
+        AtomicInteger pendingTasks = new AtomicInteger(0);
+        AtomicReference<SolrException> exceptionOnExecuteUpdate = new AtomicReference<>();
 
         long commitVersion = 0;
         int operationAndFlags = 0;
@@ -1771,6 +1777,11 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
             o = tlogReader.next();
             if (o == null && activeLog) {
               if (!finishing) {
+                // about to block all the updates including the tasks in the executor
+                // therefore we must wait for them to be finished
+                waitForAllUpdatesGetExecuted(pendingTasks);
+                // from this point, remaining updates will be executed in a single thread
+                executor = null;
                 // block to prevent new adds, but don't immediately unlock since
                 // we could be starved from ever completing recovery.  Only unlock
                 // after we've finished this recovery.
@@ -1795,6 +1806,8 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
           }
 
           if (o == null) break;
+          // fail fast
+          if (exceptionOnExecuteUpdate.get() != null) throw exceptionOnExecuteUpdate.get();
 
           try {
 
@@ -1811,7 +1824,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
                 AddUpdateCommand cmd = convertTlogEntryToAddUpdateCommand(req, entry, oper, version);
                 cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
                 if (debug) log.debug("{} {}", oper == ADD ? "add" : "update", cmd);
-                proc.processAdd(cmd);
+                execute(cmd, executor, pendingTasks, proc, exceptionOnExecuteUpdate);
                 break;
               }
               case UpdateLog.DELETE: {
@@ -1822,7 +1835,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
                 cmd.setVersion(version);
                 cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
                 if (debug) log.debug("delete " + cmd);
-                proc.processDelete(cmd);
+                execute(cmd, executor, pendingTasks, proc, exceptionOnExecuteUpdate);
                 break;
               }
 
@@ -1834,7 +1847,9 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
                 cmd.setVersion(version);
                 cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
                 if (debug) log.debug("deleteByQuery " + cmd);
-                proc.processDelete(cmd);
+                waitForAllUpdatesGetExecuted(pendingTasks);
+                // DBQ will be executed in the same thread
+                execute(cmd, null, pendingTasks, proc, exceptionOnExecuteUpdate);
                 break;
               }
               case UpdateLog.COMMIT: {
@@ -1857,30 +1872,21 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
             } else {
               // XXX should not happen?
             }
-          } catch (IOException ex) {
-            recoveryInfo.errors++;
-            loglog.warn("REPLAY_ERR: IOException reading log", ex);
-            // could be caused by an incomplete flush if recovering from log
           } catch (ClassCastException cl) {
             recoveryInfo.errors++;
             loglog.warn("REPLAY_ERR: Unexpected log entry or corrupt log.  Entry=" + o, cl);
             // would be caused by a corrupt transaction log
-          } catch (SolrException ex) {
-            if (ex.code() == ErrorCode.SERVICE_UNAVAILABLE.code) {
-              throw ex;
-            }
-            recoveryInfo.errors++;
-            loglog.warn("REPLAY_ERR: IOException reading log", ex);
-            // could be caused by an incomplete flush if recovering from log
           } catch (Exception ex) {
             recoveryInfo.errors++;
             loglog.warn("REPLAY_ERR: Exception replaying log", ex);
             // something wrong with the request?
           }
           assert TestInjection.injectUpdateLogReplayRandomPause();
-          
         }
 
+        waitForAllUpdatesGetExecuted(pendingTasks);
+        if (exceptionOnExecuteUpdate.get() != null) throw exceptionOnExecuteUpdate.get();
+
         CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
         cmd.setVersion(commitVersion);
         cmd.softCommit = false;
@@ -1917,6 +1923,93 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
         translog.decref();
       }
     }
+
+    private void waitForAllUpdatesGetExecuted(AtomicInteger pendingTasks) {
+      TimeOut timeOut = new TimeOut(Integer.MAX_VALUE, TimeUnit.MILLISECONDS, TimeSource.CURRENT_TIME);
+      try {
+        timeOut.waitFor("Timeout waiting for replay updates finish", () -> {
+          //TODO handle the case when there are no progress after a long time
+          return pendingTasks.get() == 0;
+        });
+      } catch (TimeoutException e) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new SolrException(ErrorCode.SERVER_ERROR, e);
+      }
+
+    }
+
+    private Integer getBucketHash(UpdateCommand cmd) {
+      if (cmd instanceof AddUpdateCommand) {
+        BytesRef idBytes = ((AddUpdateCommand)cmd).getIndexedId();
+        if (idBytes == null) return null;
+        return DistributedUpdateProcessor.bucketHash(idBytes);
+      }
+
+      if (cmd instanceof DeleteUpdateCommand) {
+        BytesRef idBytes = ((DeleteUpdateCommand)cmd).getIndexedId();
+        if (idBytes == null) return null;
+        return DistributedUpdateProcessor.bucketHash(idBytes);
+      }
+
+      return null;
+    }
+
+    private void execute(UpdateCommand cmd, OrderedExecutor executor,
+                         AtomicInteger pendingTasks, UpdateRequestProcessor proc,
+                         AtomicReference<SolrException> exceptionHolder) {
+      assert cmd instanceof AddUpdateCommand || cmd instanceof DeleteUpdateCommand;
+
+      if (executor != null) {
+          // by using the same hash as DUP, independent updates can avoid waiting for the same bucket
+        executor.execute(getBucketHash(cmd), () -> {
+          try {
+            // fail fast
+            if (exceptionHolder.get() != null) return;
+            if (cmd instanceof AddUpdateCommand) {
+              proc.processAdd((AddUpdateCommand) cmd);
+            } else {
+              proc.processDelete((DeleteUpdateCommand) cmd);
+            }
+          } catch (IOException e) {
+            recoveryInfo.errors++;
+            loglog.warn("REPLAY_ERR: IOException reading log", e);
+            // could be caused by an incomplete flush if recovering from log
+          } catch (SolrException e) {
+            if (e.code() == ErrorCode.SERVICE_UNAVAILABLE.code) {
+              exceptionHolder.compareAndSet(null, e);
+              return;
+            }
+            recoveryInfo.errors++;
+            loglog.warn("REPLAY_ERR: IOException reading log", e);
+          } finally {
+            pendingTasks.decrementAndGet();
+          }
+        });
+        pendingTasks.incrementAndGet();
+      } else {
+        try {
+          if (cmd instanceof AddUpdateCommand) {
+            proc.processAdd((AddUpdateCommand) cmd);
+          } else {
+            proc.processDelete((DeleteUpdateCommand) cmd);
+          }
+        } catch (IOException e) {
+          recoveryInfo.errors++;
+          loglog.warn("REPLAY_ERR: IOException replaying log", e);
+          // could be caused by an incomplete flush if recovering from log
+        } catch (SolrException e) {
+          if (e.code() == ErrorCode.SERVICE_UNAVAILABLE.code) {
+            throw e;
+          }
+          recoveryInfo.errors++;
+          loglog.warn("REPLAY_ERR: IOException replaying log", e);
+        }
+      }
+    }
+
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index d9cceee..8f87510 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -957,6 +957,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     isIndexChanged = true;
   }
 
+  public static int bucketHash(BytesRef idBytes) {
+    assert idBytes != null;
+    return Hash.murmurhash3_x86_32(idBytes.bytes, idBytes.offset, idBytes.length, 0);
+  }
+
   /**
    * @return whether or not to drop this cmd
    * @throws IOException If there is a low-level I/O error.
@@ -981,7 +986,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     }
 
     // This is only the hash for the bucket, and must be based only on the uniqueKey (i.e. do not use a pluggable hash here)
-    int bucketHash = Hash.murmurhash3_x86_32(idBytes.bytes, idBytes.offset, idBytes.length, 0);
+    int bucketHash = bucketHash(idBytes);
 
     // at this point, there is an update we need to try and apply.
     // we may or may not be the leader.
@@ -1745,7 +1750,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     }
 
     // This is only the hash for the bucket, and must be based only on the uniqueKey (i.e. do not use a pluggable hash here)
-    int bucketHash = Hash.murmurhash3_x86_32(idBytes.bytes, idBytes.offset, idBytes.length, 0);
+    int bucketHash = bucketHash(idBytes);
 
     // at this point, there is an update we need to try and apply.
     // we may or may not be the leader.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/core/src/java/org/apache/solr/util/OrderedExecutor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/util/OrderedExecutor.java b/solr/core/src/java/org/apache/solr/util/OrderedExecutor.java
new file mode 100644
index 0000000..69399c4
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/util/OrderedExecutor.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.util;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
+
+import org.apache.solr.common.util.ExecutorUtil;
+
+public class OrderedExecutor implements Executor {
+  private final ExecutorService delegate;
+  private final SparseStripedLock<Integer> sparseStripedLock;
+
+  public OrderedExecutor(int numThreads, ExecutorService delegate) {
+    this.delegate = delegate;
+    this.sparseStripedLock = new SparseStripedLock<>(numThreads);
+  }
+
+  @Override
+  public void execute(Runnable runnable) {
+    execute(null, runnable);
+  }
+
+  /**
+   * Execute the given command in the future.
+   * If another command with the same {@code lockId} is waiting in the queue or running,
+   * this method will block until that command finishes.
+   * Therefore different commands with the same {@code lockId} will be executed in the order in which this method was called.
+   *
+   * If multiple callers are waiting for a command to finish, there is no guarantee that the earliest call will win.
+   *
+   * @param lockId of the {@code command}; if null, no ordering constraint is enforced
+   * @param command the runnable task
+   *
+   * @throws RejectedExecutionException if this task cannot be accepted for execution
+   */
+  public void execute(Integer lockId, Runnable command) {
+    try {
+      sparseStripedLock.add(lockId);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return;
+    }
+
+    try {
+      if (delegate.isShutdown()) throw new RejectedExecutionException();
+
+      delegate.execute(() -> {
+        try {
+          command.run();
+        } finally {
+          sparseStripedLock.remove(lockId);
+        }
+      });
+    } catch (RejectedExecutionException e) {
+      sparseStripedLock.remove(lockId);
+      throw e;
+    }
+  }
+
+  public void shutdownAndAwaitTermination() {
+    ExecutorUtil.shutdownAndAwaitTermination(delegate);
+  }
+
+  /** A set of locks by a key {@code T}, kind of like Google Striped but the keys are sparse/lazy. */
+  private static class SparseStripedLock<T> {
+    private ConcurrentHashMap<T, CountDownLatch> map = new ConcurrentHashMap<>();
+    private final Semaphore sizeSemaphore;
+
+    SparseStripedLock(int maxSize) {
+      this.sizeSemaphore = new Semaphore(maxSize);
+    }
+
+    public void add(T t) throws InterruptedException {
+      if (t != null) {
+        CountDownLatch myLock = new CountDownLatch(1);
+        CountDownLatch existingLock = map.putIfAbsent(t, myLock);
+        while (existingLock != null) {
+          // wait for existing lock/permit to become available (see remove() below)
+          existingLock.await();
+          existingLock = map.putIfAbsent(t, myLock);
+        }
+        // myLock was successfully inserted
+      }
+      // won the lock
+      sizeSemaphore.acquire();
+    }
+
+    public void remove(T t) {
+      if (t != null) {
+        // remove and signal to any "await"-ers
+        map.remove(t).countDown();
+      }
+      sizeSemaphore.release();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/core/src/test-files/solr/solr-50-all.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/solr-50-all.xml b/solr/core/src/test-files/solr/solr-50-all.xml
index e2ce924..4718d4a 100644
--- a/solr/core/src/test-files/solr/solr-50-all.xml
+++ b/solr/core/src/test-files/solr/solr-50-all.xml
@@ -26,6 +26,7 @@
   <str name="sharedLib">testSharedLib</str>
   <str name="shareSchema">${shareSchema:true}</str>
   <int name="transientCacheSize">66</int>
+  <int name="replayUpdatesThreads">100</int>
 
   <solrcloud>
     <int name="distribUpdateConnTimeout">22</int>

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/core/src/test/org/apache/solr/core/TestSolrXml.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/core/TestSolrXml.java b/solr/core/src/test/org/apache/solr/core/TestSolrXml.java
index 9224c4d..031b4c8 100644
--- a/solr/core/src/test/org/apache/solr/core/TestSolrXml.java
+++ b/solr/core/src/test/org/apache/solr/core/TestSolrXml.java
@@ -74,6 +74,7 @@ public class TestSolrXml extends SolrTestCaseJ4 {
     assertEquals("info handler class", "testInfoHandler", cfg.getInfoHandlerClass());
     assertEquals("config set handler class", "testConfigSetsHandler", cfg.getConfigSetsHandlerClass());
     assertEquals("core load threads", 11, cfg.getCoreLoadThreadCount(false));
+    assertEquals("replay update threads", 100, cfg.getReplayUpdatesThreads());
     assertThat("core root dir", cfg.getCoreRootDirectory().toString(), containsString("testCoreRootDirectory"));
     assertEquals("distrib conn timeout", 22, cfg.getUpdateShardHandlerConfig().getDistributedConnectionTimeout());
     assertEquals("distrib socket timeout", 33, cfg.getUpdateShardHandlerConfig().getDistributedSocketTimeout());

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/core/src/test/org/apache/solr/search/TestRecovery.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/search/TestRecovery.java b/solr/core/src/test/org/apache/solr/search/TestRecovery.java
index 4b00ba2..1d62207 100644
--- a/solr/core/src/test/org/apache/solr/search/TestRecovery.java
+++ b/solr/core/src/test/org/apache/solr/search/TestRecovery.java
@@ -47,8 +47,10 @@ import java.lang.invoke.MethodHandles;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Deque;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
@@ -101,6 +103,85 @@ public class TestRecovery extends SolrTestCaseJ4 {
   }
 
   @Test
+  public void stressLogReplay() throws Exception {
+    final int NUM_UPDATES = 150;
+    try {
+      DirectUpdateHandler2.commitOnClose = false;
+      final Semaphore logReplay = new Semaphore(0);
+      final Semaphore logReplayFinish = new Semaphore(0);
+
+      UpdateLog.testing_logReplayHook = () -> {
+        try {
+          assertTrue(logReplay.tryAcquire(timeout, TimeUnit.SECONDS));
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      };
+
+      UpdateLog.testing_logReplayFinishHook = logReplayFinish::release;
+      clearIndex();
+      assertU(commit());
+      Map<Integer, Integer> docIdToVal = new HashMap<>();
+      for (int i = 0; i < NUM_UPDATES; i++) {
+        int kindOfUpdate = random().nextInt(100);
+        if (docIdToVal.size() < 10) kindOfUpdate = 0;
+        if (kindOfUpdate <= 50) {
+          // add a new document; may be a duplicate of an existing one
+          int val = random().nextInt(1000);
+          int docId = random().nextInt(10000);
+          addAndGetVersion(sdoc("id", String.valueOf(docId), "val_i_dvo", val), null);
+          docIdToVal.put(docId, val);
+        } else if (kindOfUpdate <= 80) {
+          // inc val of a document
+          ArrayList<Integer> ids = new ArrayList<>(docIdToVal.keySet());
+          int docId = ids.get(random().nextInt(ids.size()));
+          int delta = random().nextInt(10);
+          addAndGetVersion(sdoc("id", String.valueOf(docId), "val_i_dvo", map("inc", delta)), null);
+          docIdToVal.put(docId, docIdToVal.get(docId) + delta);
+        } else if (kindOfUpdate <= 85) {
+          // set val of a document
+          ArrayList<Integer> ids = new ArrayList<>(docIdToVal.keySet());
+          int docId = ids.get(random().nextInt(ids.size()));
+          int val = random().nextInt(1000);
+          addAndGetVersion(sdoc("id", String.valueOf(docId), "val_i_dvo", map("set", val)), null);
+          docIdToVal.put(docId, val);
+        } else if (kindOfUpdate <= 90) {
+          // delete by query on val_i_dvo (removes every doc carrying this value)
+          ArrayList<Integer> vals = new ArrayList<>(docIdToVal.values());
+          int val = vals.get(random().nextInt(vals.size()));
+          deleteByQueryAndGetVersion("val_i_dvo:"+val, null);
+          docIdToVal.entrySet().removeIf(integerIntegerEntry -> integerIntegerEntry.getValue() == val);
+        } else {
+          // delete by id
+          ArrayList<Integer> ids = new ArrayList<>(docIdToVal.keySet());
+          int docId = ids.get(random().nextInt(ids.size()));
+          deleteAndGetVersion(String.valueOf(docId), null);
+          docIdToVal.remove(docId);
+        }
+      }
+
+      h.close();
+      createCore();
+      assertJQ(req("q","*:*") ,"/response/numFound==0");
+      // unblock recovery
+      logReplay.release(Integer.MAX_VALUE);
+      assertTrue(logReplayFinish.tryAcquire(timeout, TimeUnit.SECONDS));
+      assertU(commit());
+      assertJQ(req("q","*:*") ,"/response/numFound=="+docIdToVal.size());
+
+      for (Map.Entry<Integer, Integer> entry : docIdToVal.entrySet()) {
+        assertJQ(req("q","id:"+entry.getKey(), "fl", "val_i_dvo") ,
+            "/response/numFound==1",
+            "/response/docs==[{'val_i_dvo':"+entry.getValue()+"}]");
+      }
+    } finally {
+      DirectUpdateHandler2.commitOnClose = true;
+      UpdateLog.testing_logReplayHook = null;
+      UpdateLog.testing_logReplayFinishHook = null;
+    }
+  }
+
+  @Test
   public void testLogReplay() throws Exception {
     
     try {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java b/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
new file mode 100644
index 0000000..0211a11
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.junit.Test;
+
+public class OrderedExecutorTest extends LuceneTestCase {
+
+  @Test
+  public void testExecutionInOrder() {
+    OrderedExecutor orderedExecutor = new OrderedExecutor(10, ExecutorUtil.newMDCAwareCachedThreadPool("executeInOrderTest"));
+    IntBox intBox = new IntBox();
+    for (int i = 0; i < 100; i++) {
+      orderedExecutor.execute(1, () -> intBox.value++);
+    }
+    orderedExecutor.shutdownAndAwaitTermination();
+    assertEquals(intBox.value, 100);
+  }
+
+  @Test
+  public void testLockWhenQueueIsFull() {
+    OrderedExecutor orderedExecutor = new OrderedExecutor(10, ExecutorUtil.newMDCAwareCachedThreadPool("testLockWhenQueueIsFull"));
+    IntBox intBox = new IntBox();
+    long t = System.nanoTime();
+    orderedExecutor.execute(1, () -> {
+      try {
+        Thread.sleep(500L);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      intBox.value++;
+    });
+    assertTrue(System.nanoTime() - t < 100 * 1000000);
+
+    t = System.nanoTime();
+    orderedExecutor.execute(1, () -> {
+      intBox.value++;
+    });
+    assertTrue(System.nanoTime() - t > 300 * 1000000);
+    orderedExecutor.shutdownAndAwaitTermination();
+    assertEquals(intBox.value, 2);
+  }
+
+  @Test
+  public void testRunInParallel() {
+    OrderedExecutor orderedExecutor = new OrderedExecutor(10, ExecutorUtil.newMDCAwareCachedThreadPool("testLockWhenQueueIsFull"));
+    AtomicInteger atomicInteger = new AtomicInteger(0);
+    orderedExecutor.execute(1, () -> {
+      try {
+        Thread.sleep(500L);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      if (atomicInteger.get() == 1) atomicInteger.incrementAndGet();
+    });
+
+    orderedExecutor.execute(2, atomicInteger::incrementAndGet);
+    orderedExecutor.shutdownAndAwaitTermination();
+    assertEquals(atomicInteger.get(), 2);
+  }
+
+  @Test
+  public void testStress() {
+    int N = random().nextInt(50) + 20;
+    Map<Integer, Integer> base = new HashMap<>();
+    Map<Integer, Integer> run = new HashMap<>();
+    for (int i = 0; i < N; i++) {
+      base.put(i, i);
+      run.put(i, i);
+    }
+    OrderedExecutor orderedExecutor = new OrderedExecutor(10, ExecutorUtil.newMDCAwareCachedThreadPool("testStress"));
+    for (int i = 0; i < 1000; i++) {
+      int key = random().nextInt(N);
+      base.put(key, base.get(key) + 1);
+      orderedExecutor.execute(key, () -> run.put(key, run.get(key) + 1));
+    }
+    orderedExecutor.shutdownAndAwaitTermination();
+    assertTrue(base.equals(run));
+  }
+
+  private static class IntBox {
+    int value;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/solr-ref-guide/src/format-of-solr-xml.adoc
----------------------------------------------------------------------
diff --git a/solr/solr-ref-guide/src/format-of-solr-xml.adoc b/solr/solr-ref-guide/src/format-of-solr-xml.adoc
index a235943..bfc3ab7 100644
--- a/solr/solr-ref-guide/src/format-of-solr-xml.adoc
+++ b/solr/solr-ref-guide/src/format-of-solr-xml.adoc
@@ -69,6 +69,10 @@ As above, for custom InfoHandler implementations.
 `coreLoadThreads`::
 Specifies the number of threads that will be assigned to load cores in parallel.
 
+`replayUpdatesThreads`::
+Specifies the number of threads (default value is the number of processors) that will be assigned to replay updates in parallel.
+This pool is shared for all cores of the node.
+
 `coreRootDirectory`::
 The root of the core discovery tree, defaults to `$SOLR_HOME` (by default, `server/solr`).
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
index f56f782..6082a69 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
@@ -126,6 +126,13 @@ public class ExecutorUtil {
         threadFactory);
   }
 
+  public static ExecutorService newMDCAwareCachedThreadPool(int maxThreads, ThreadFactory threadFactory) {
+    return new MDCAwareThreadPoolExecutor(0, maxThreads,
+        60L, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(maxThreads),
+        threadFactory);
+  }
+
   @SuppressForbidden(reason = "class customizes ThreadPoolExecutor so it can be used instead")
   public static class MDCAwareThreadPoolExecutor extends ThreadPoolExecutor {