You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this rendering.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/05/30 04:12:12 UTC
lucene-solr:branch_7x: SOLR-12338: Replay buffering tlog in parallel
Repository: lucene-solr
Updated Branches:
refs/heads/branch_7x 327f58e87 -> 04e1b1974
SOLR-12338: Replay buffering tlog in parallel
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/04e1b197
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/04e1b197
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/04e1b197
Branch: refs/heads/branch_7x
Commit: 04e1b19743e330ce66d199c4dc40bbf394be9ed7
Parents: 327f58e
Author: Cao Manh Dat <da...@apache.org>
Authored: Wed May 30 11:05:48 2018 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Wed May 30 11:11:37 2018 +0700
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../org/apache/solr/core/CoreContainer.java | 14 ++
.../java/org/apache/solr/core/NodeConfig.java | 17 ++-
.../org/apache/solr/core/SolrXmlConfig.java | 3 +
.../java/org/apache/solr/update/UpdateLog.java | 131 ++++++++++++++++---
.../processor/DistributedUpdateProcessor.java | 9 +-
.../org/apache/solr/util/OrderedExecutor.java | 116 ++++++++++++++++
solr/core/src/test-files/solr/solr-50-all.xml | 1 +
.../test/org/apache/solr/core/TestSolrXml.java | 1 +
.../org/apache/solr/search/TestRecovery.java | 81 ++++++++++++
.../apache/solr/util/OrderedExecutorTest.java | 105 +++++++++++++++
solr/solr-ref-guide/src/format-of-solr-xml.adoc | 4 +
.../apache/solr/common/util/ExecutorUtil.java | 7 +
13 files changed, 468 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 8567f8f..0146519 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -250,6 +250,8 @@ Optimizations
The /export (ExportQParserPlugin) would declare incorrectly that scores are needed.
Expanded docs (expand component) could be told incorrectly that scores are needed. (David Smiley)
+* SOLR-12338: Replay buffering tlog in parallel. (Cao Manh Dat, David Smiley)
+
Other Changes
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/core/src/java/org/apache/solr/core/CoreContainer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 78247a0..c18c26c 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -105,6 +105,7 @@ import org.apache.solr.security.SecurityPluginHolder;
import org.apache.solr.update.SolrCoreState;
import org.apache.solr.update.UpdateShardHandler;
import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.util.OrderedExecutor;
import org.apache.solr.util.stats.MetricUtils;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -155,6 +156,8 @@ public class CoreContainer {
private ExecutorService coreContainerWorkExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(
new DefaultSolrThreadFactory("coreContainerWorkExecutor") );
+ private final OrderedExecutor replayUpdatesExecutor;
+
protected LogWatcher logging = null;
private CloserThread backgroundCloser = null;
@@ -294,6 +297,11 @@ public class CoreContainer {
this.coresLocator = locator;
this.containerProperties = new Properties(properties);
this.asyncSolrCoreLoad = asyncSolrCoreLoad;
+ this.replayUpdatesExecutor = new OrderedExecutor(
+ cfg.getReplayUpdatesThreads(),
+ ExecutorUtil.newMDCAwareCachedThreadPool(
+ cfg.getReplayUpdatesThreads(),
+ new DefaultSolrThreadFactory("replayUpdatesExecutor")));
}
private synchronized void initializeAuthorizationPlugin(Map<String, Object> authorizationConf) {
@@ -435,6 +443,7 @@ public class CoreContainer {
coresLocator = null;
cfg = null;
containerProperties = null;
+ replayUpdatesExecutor = null;
}
public static CoreContainer createAndLoad(Path solrHome) {
@@ -471,6 +480,10 @@ public class CoreContainer {
return metricManager;
}
+  /**
+   * Returns the executor this container uses to replay buffered update-log entries in parallel.
+   * It is sized from {@link NodeConfig#getReplayUpdatesThreads()} and shut down together with the
+   * container. May be null for a container built by the constructor that leaves it unconfigured.
+   */
+  public OrderedExecutor getReplayUpdatesExecutor() {
+    return this.replayUpdatesExecutor;
+  }
+
//-------------------------------------------------------------------
// Initialization / Cleanup
//-------------------------------------------------------------------
@@ -739,6 +752,7 @@ public class CoreContainer {
isShutDown = true;
ExecutorUtil.shutdownAndAwaitTermination(coreContainerWorkExecutor);
+ replayUpdatesExecutor.shutdownAndAwaitTermination();
if (metricManager != null) {
metricManager.closeReporters(SolrMetricManager.getRegistryName(SolrInfoBean.Group.node));
metricManager.closeReporters(SolrMetricManager.getRegistryName(SolrInfoBean.Group.jvm));
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/core/src/java/org/apache/solr/core/NodeConfig.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/NodeConfig.java b/solr/core/src/java/org/apache/solr/core/NodeConfig.java
index 5b4debe..ea44ad8 100644
--- a/solr/core/src/java/org/apache/solr/core/NodeConfig.java
+++ b/solr/core/src/java/org/apache/solr/core/NodeConfig.java
@@ -57,6 +57,8 @@ public class NodeConfig {
private final Integer coreLoadThreads;
+ private final int replayUpdatesThreads;
+
@Deprecated
// This should be part of the transientCacheConfig, remove in 7.0
private final int transientCacheSize;
@@ -75,7 +77,7 @@ public class NodeConfig {
PluginInfo shardHandlerFactoryConfig, UpdateShardHandlerConfig updateShardHandlerConfig,
String coreAdminHandlerClass, String collectionsAdminHandlerClass,
String infoHandlerClass, String configSetsHandlerClass,
- LogWatcherConfig logWatcherConfig, CloudConfig cloudConfig, Integer coreLoadThreads,
+ LogWatcherConfig logWatcherConfig, CloudConfig cloudConfig, Integer coreLoadThreads, int replayUpdatesThreads,
int transientCacheSize, boolean useSchemaCache, String managementPath, SolrResourceLoader loader,
Properties solrProperties, PluginInfo[] backupRepositoryPlugins,
MetricsConfig metricsConfig, PluginInfo transientCacheConfig) {
@@ -93,6 +95,7 @@ public class NodeConfig {
this.logWatcherConfig = logWatcherConfig;
this.cloudConfig = cloudConfig;
this.coreLoadThreads = coreLoadThreads;
+ this.replayUpdatesThreads = replayUpdatesThreads;
this.transientCacheSize = transientCacheSize;
this.useSchemaCache = useSchemaCache;
this.managementPath = managementPath;
@@ -134,6 +137,10 @@ public class NodeConfig {
: coreLoadThreads;
}
+  /**
+   * @return the number of threads used to replay buffered transaction-log updates in parallel
+   *         (configured via the {@code replayUpdatesThreads} solr.xml option)
+   */
+  public int getReplayUpdatesThreads() {
+    return this.replayUpdatesThreads;
+  }
+
public String getSharedLibDirectory() {
return sharedLibDirectory;
}
@@ -214,6 +221,7 @@ public class NodeConfig {
private LogWatcherConfig logWatcherConfig = new LogWatcherConfig(true, null, null, 50);
private CloudConfig cloudConfig;
private int coreLoadThreads = DEFAULT_CORE_LOAD_THREADS;
+ private int replayUpdatesThreads = Runtime.getRuntime().availableProcessors();
@Deprecated
//Remove in 7.0 and put it all in the transientCache element in solrconfig.xml
private int transientCacheSize = DEFAULT_TRANSIENT_CACHE_SIZE;
@@ -327,6 +335,11 @@ public class NodeConfig {
return this;
}
+    /**
+     * Sets how many threads replay buffered transaction-log updates in parallel.
+     * When never called, the builder uses {@code Runtime.getRuntime().availableProcessors()}.
+     *
+     * @param replayUpdatesThreads the thread count read from the {@code replayUpdatesThreads} option
+     * @return this builder, for chaining
+     */
+    public NodeConfigBuilder setReplayUpdatesThreads(int replayUpdatesThreads) {
+      this.replayUpdatesThreads = replayUpdatesThreads;
+      return this;
+    }
+
// Remove in Solr 7.0
@Deprecated
public NodeConfigBuilder setTransientCacheSize(int transientCacheSize) {
@@ -367,7 +380,7 @@ public class NodeConfig {
public NodeConfig build() {
return new NodeConfig(nodeName, coreRootDirectory, solrDataHome, configSetBaseDirectory, sharedLibDirectory, shardHandlerFactoryConfig,
updateShardHandlerConfig, coreAdminHandlerClass, collectionsAdminHandlerClass, infoHandlerClass, configSetsHandlerClass,
- logWatcherConfig, cloudConfig, coreLoadThreads, transientCacheSize, useSchemaCache, managementPath, loader, solrProperties,
+ logWatcherConfig, cloudConfig, coreLoadThreads, replayUpdatesThreads, transientCacheSize, useSchemaCache, managementPath, loader, solrProperties,
backupRepositoryPlugins, metricsConfig, transientCacheConfig);
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java b/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
index 771df1e..3f8bab2 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
@@ -268,6 +268,9 @@ public class SolrXmlConfig {
case "coreLoadThreads":
builder.setCoreLoadThreads(parseInt(name, value));
break;
+ case "replayUpdatesThreads":
+ builder.setReplayUpdatesThreads(parseInt(name, value));
+ break;
case "transientCacheSize":
builder.setTransientCacheSize(parseInt(name, value));
break;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/core/src/java/org/apache/solr/update/UpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index ef0e73e..7f821ea 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -42,6 +42,9 @@ import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
@@ -55,6 +58,7 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrInfoBean;
@@ -69,9 +73,11 @@ import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.util.OrderedExecutor;
import org.apache.solr.util.RTimer;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.TestInjection;
+import org.apache.solr.util.TimeOut;
import org.apache.solr.util.plugin.PluginInfoInitialized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1667,12 +1673,9 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
this.inSortedOrder = inSortedOrder;
}
-
-
private SolrQueryRequest req;
private SolrQueryResponse rsp;
-
@Override
public void run() {
ModifiableSolrParams params = new ModifiableSolrParams();
@@ -1683,7 +1686,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp)); // setting request info will help logging
try {
- for(;;) {
+ for (; ; ) {
TransactionLog translog = translogs.pollFirst();
if (translog == null) break;
doReplay(translog);
@@ -1743,6 +1746,9 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessingChain(null);
UpdateRequestProcessor proc = processorChain.createProcessor(req, rsp);
+ OrderedExecutor executor = inSortedOrder ? null : req.getCore().getCoreContainer().getReplayUpdatesExecutor();
+ AtomicInteger pendingTasks = new AtomicInteger(0);
+ AtomicReference<SolrException> exceptionOnExecuteUpdate = new AtomicReference<>();
long commitVersion = 0;
int operationAndFlags = 0;
@@ -1771,6 +1777,11 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
o = tlogReader.next();
if (o == null && activeLog) {
if (!finishing) {
+ // about to block all the updates including the tasks in the executor
+ // therefore we must wait for them to be finished
+ waitForAllUpdatesGetExecuted(pendingTasks);
+ // from this point, remain updates will be executed in a single thread
+ executor = null;
// block to prevent new adds, but don't immediately unlock since
// we could be starved from ever completing recovery. Only unlock
// after we've finished this recovery.
@@ -1795,6 +1806,8 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
}
if (o == null) break;
+ // fail fast
+ if (exceptionOnExecuteUpdate.get() != null) throw exceptionOnExecuteUpdate.get();
try {
@@ -1811,7 +1824,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
AddUpdateCommand cmd = convertTlogEntryToAddUpdateCommand(req, entry, oper, version);
cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
if (debug) log.debug("{} {}", oper == ADD ? "add" : "update", cmd);
- proc.processAdd(cmd);
+ execute(cmd, executor, pendingTasks, proc, exceptionOnExecuteUpdate);
break;
}
case UpdateLog.DELETE: {
@@ -1822,7 +1835,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
cmd.setVersion(version);
cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
if (debug) log.debug("delete " + cmd);
- proc.processDelete(cmd);
+ execute(cmd, executor, pendingTasks, proc, exceptionOnExecuteUpdate);
break;
}
@@ -1834,7 +1847,9 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
cmd.setVersion(version);
cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
if (debug) log.debug("deleteByQuery " + cmd);
- proc.processDelete(cmd);
+ waitForAllUpdatesGetExecuted(pendingTasks);
+ // DBQ will be executed in the same thread
+ execute(cmd, null, pendingTasks, proc, exceptionOnExecuteUpdate);
break;
}
case UpdateLog.COMMIT: {
@@ -1857,30 +1872,21 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
} else {
// XXX should not happen?
}
- } catch (IOException ex) {
- recoveryInfo.errors++;
- loglog.warn("REPLAY_ERR: IOException reading log", ex);
- // could be caused by an incomplete flush if recovering from log
} catch (ClassCastException cl) {
recoveryInfo.errors++;
loglog.warn("REPLAY_ERR: Unexpected log entry or corrupt log. Entry=" + o, cl);
// would be caused by a corrupt transaction log
- } catch (SolrException ex) {
- if (ex.code() == ErrorCode.SERVICE_UNAVAILABLE.code) {
- throw ex;
- }
- recoveryInfo.errors++;
- loglog.warn("REPLAY_ERR: IOException reading log", ex);
- // could be caused by an incomplete flush if recovering from log
} catch (Exception ex) {
recoveryInfo.errors++;
loglog.warn("REPLAY_ERR: Exception replaying log", ex);
// something wrong with the request?
}
assert TestInjection.injectUpdateLogReplayRandomPause();
-
}
+ waitForAllUpdatesGetExecuted(pendingTasks);
+ if (exceptionOnExecuteUpdate.get() != null) throw exceptionOnExecuteUpdate.get();
+
CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
cmd.setVersion(commitVersion);
cmd.softCommit = false;
@@ -1917,6 +1923,93 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
translog.decref();
}
}
+
+ private void waitForAllUpdatesGetExecuted(AtomicInteger pendingTasks) {
+ TimeOut timeOut = new TimeOut(Integer.MAX_VALUE, TimeUnit.MILLISECONDS, TimeSource.CURRENT_TIME);
+ try {
+ timeOut.waitFor("Timeout waiting for replay updates finish", () -> {
+ //TODO handle the case when there are no progress after a long time
+ return pendingTasks.get() == 0;
+ });
+ } catch (TimeoutException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ }
+
+ }
+
+ private Integer getBucketHash(UpdateCommand cmd) {
+ if (cmd instanceof AddUpdateCommand) {
+ BytesRef idBytes = ((AddUpdateCommand)cmd).getIndexedId();
+ if (idBytes == null) return null;
+ return DistributedUpdateProcessor.bucketHash(idBytes);
+ }
+
+ if (cmd instanceof DeleteUpdateCommand) {
+ BytesRef idBytes = ((DeleteUpdateCommand)cmd).getIndexedId();
+ if (idBytes == null) return null;
+ return DistributedUpdateProcessor.bucketHash(idBytes);
+ }
+
+ return null;
+ }
+
+    /**
+     * Applies one replayed add or delete-by-id command through {@code proc}.
+     * <p>
+     * When {@code executor} is null the command runs on the calling thread. A SERVICE_UNAVAILABLE
+     * {@link SolrException} is rethrown. Other {@code SolrException}s and {@code IOException}s are
+     * counted in {@code recoveryInfo.errors} and logged. Any other exception propagates to the caller.
+     * <p>
+     * Otherwise the command is submitted to {@code executor}, keyed by its id's bucket hash so that
+     * updates falling in the same bucket keep their relative order. Inside the task:
+     * <ul>
+     *   <li>a SERVICE_UNAVAILABLE exception is stored in {@code exceptionHolder} (first one wins)
+     *       instead of being thrown;</li>
+     *   <li>the task becomes a no-op once such an exception has been recorded;</li>
+     *   <li>every other {@code Exception} is counted and logged.</li>
+     * </ul>
+     * {@code pendingTasks} tracks submitted-but-unfinished tasks.
+     */
+    private void execute(UpdateCommand cmd, OrderedExecutor executor,
+                         AtomicInteger pendingTasks, UpdateRequestProcessor proc,
+                         AtomicReference<SolrException> exceptionHolder) {
+      assert cmd instanceof AddUpdateCommand || cmd instanceof DeleteUpdateCommand;
+
+      if (executor != null) {
+        // by using the same hash as DUP, independent updates can avoid waiting for same bucket
+        executor.execute(getBucketHash(cmd), () -> {
+          try {
+            // fail fast
+            if (exceptionHolder.get() != null) return;
+            if (cmd instanceof AddUpdateCommand) {
+              proc.processAdd((AddUpdateCommand) cmd);
+            } else {
+              proc.processDelete((DeleteUpdateCommand) cmd);
+            }
+          } catch (IOException e) {
+            incrementReplayErrors();
+            loglog.warn("REPLAY_ERR: IOException reading log", e);
+            // could be caused by an incomplete flush if recovering from log
+          } catch (SolrException e) {
+            if (e.code() == ErrorCode.SERVICE_UNAVAILABLE.code) {
+              exceptionHolder.compareAndSet(null, e);
+              return;
+            }
+            incrementReplayErrors();
+            loglog.warn("REPLAY_ERR: IOException reading log", e);
+          } catch (Exception e) {
+            // On the synchronous path the caller's catch-all counts and logs these. A worker thread
+            // has no such caller, so do the same here instead of letting the exception escape
+            // uncounted.
+            incrementReplayErrors();
+            loglog.warn("REPLAY_ERR: Exception replaying log", e);
+          } finally {
+            pendingTasks.decrementAndGet();
+          }
+        });
+        // Counted only after a successful hand-off. If executor.execute throws, or returns without
+        // scheduling the task, the counter is untouched and the waits on it cannot hang.
+        // A fast worker may briefly drive the counter negative. That is harmless because the waits
+        // happen on this same replay thread, after this increment.
+        pendingTasks.incrementAndGet();
+      } else {
+        try {
+          if (cmd instanceof AddUpdateCommand) {
+            proc.processAdd((AddUpdateCommand) cmd);
+          } else {
+            proc.processDelete((DeleteUpdateCommand) cmd);
+          }
+        } catch (IOException e) {
+          incrementReplayErrors();
+          loglog.warn("REPLAY_ERR: IOException replaying log", e);
+          // could be caused by an incomplete flush if recovering from log
+        } catch (SolrException e) {
+          if (e.code() == ErrorCode.SERVICE_UNAVAILABLE.code) {
+            throw e;
+          }
+          incrementReplayErrors();
+          loglog.warn("REPLAY_ERR: IOException replaying log", e);
+        }
+      }
+    }
+
+    /**
+     * Increments {@code recoveryInfo.errors} under a lock. Several replay worker threads can fail
+     * at the same time, and a bare {@code ++} is a non-atomic read-modify-write that would lose
+     * counts.
+     * NOTE(review): doReplay still increments the counter directly on the replay thread while
+     * workers may be running; those sites should go through this helper too.
+     */
+    private void incrementReplayErrors() {
+      synchronized (recoveryInfo) {
+        recoveryInfo.errors++;
+      }
+    }
+
+
}
/**
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index d9cceee..8f87510 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -957,6 +957,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
isIndexChanged = true;
}
+  /**
+   * Hash that selects the version bucket for an update. It must be based only on the uniqueKey
+   * bytes, never on a pluggable hash.
+   *
+   * @param idBytes the indexed uniqueKey of the document; must not be null
+   * @return the 32-bit murmur3 hash of {@code idBytes}
+   */
+  public static int bucketHash(BytesRef idBytes) {
+    assert idBytes != null;
+    final int seed = 0;
+    return Hash.murmurhash3_x86_32(idBytes.bytes, idBytes.offset, idBytes.length, seed);
+  }
+
/**
* @return whether or not to drop this cmd
* @throws IOException If there is a low-level I/O error.
@@ -981,7 +986,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
// This is only the hash for the bucket, and must be based only on the uniqueKey (i.e. do not use a pluggable hash here)
- int bucketHash = Hash.murmurhash3_x86_32(idBytes.bytes, idBytes.offset, idBytes.length, 0);
+ int bucketHash = bucketHash(idBytes);
// at this point, there is an update we need to try and apply.
// we may or may not be the leader.
@@ -1745,7 +1750,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
// This is only the hash for the bucket, and must be based only on the uniqueKey (i.e. do not use a pluggable hash here)
- int bucketHash = Hash.murmurhash3_x86_32(idBytes.bytes, idBytes.offset, idBytes.length, 0);
+ int bucketHash = bucketHash(idBytes);
// at this point, there is an update we need to try and apply.
// we may or may not be the leader.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/core/src/java/org/apache/solr/util/OrderedExecutor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/util/OrderedExecutor.java b/solr/core/src/java/org/apache/solr/util/OrderedExecutor.java
new file mode 100644
index 0000000..69399c4
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/util/OrderedExecutor.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.util;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
+
+import org.apache.solr.common.util.ExecutorUtil;
+
+/**
+ * An {@link Executor} wrapper that runs commands sharing the same lock id one at a time, in
+ * submission order. It also caps the number of commands in flight on the delegate.
+ */
+public class OrderedExecutor implements Executor {
+  private final ExecutorService delegate;
+  private final SparseStripedLock<Integer> sparseStripedLock;
+
+  /**
+   * @param numThreads maximum number of submitted-but-unfinished commands; further calls to
+   *                   {@link #execute(Integer, Runnable)} block until one finishes
+   * @param delegate   the executor that actually runs the commands
+   */
+  public OrderedExecutor(int numThreads, ExecutorService delegate) {
+    this.delegate = delegate;
+    this.sparseStripedLock = new SparseStripedLock<>(numThreads);
+  }
+
+  /** Same as {@code execute(null, runnable)}: the command is not ordered relative to any other. */
+  @Override
+  public void execute(Runnable runnable) {
+    execute(null, runnable);
+  }
+
+  /**
+   * Executes the given command at some point in the future on the delegate executor.
+   * <p>
+   * If another command with the same {@code lockId} is still waiting or running, this method
+   * blocks until that command has finished. Therefore commands sharing a {@code lockId} run one at
+   * a time, in the order in which their calls to this method returned. The method also blocks
+   * while {@code numThreads} commands are in flight.
+   * <p>
+   * If several callers are waiting on the same {@code lockId}, there is no guarantee which one
+   * proceeds first.
+   *
+   * @param lockId  ordering key of the {@code command}; if null, the command is not ordered
+   *                relative to any other command
+   * @param command the runnable task
+   *
+   * @throws RejectedExecutionException if this task cannot be accepted for execution. This includes
+   *         the case where the calling thread is interrupted while waiting; its interrupt status is
+   *         restored.
+   */
+  public void execute(Integer lockId, Runnable command) {
+    try {
+      sparseStripedLock.add(lockId);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      // Report the refusal rather than silently dropping the command.
+      throw new RejectedExecutionException("Interrupted while waiting to submit command", e);
+    }
+
+    try {
+      if (delegate.isShutdown()) throw new RejectedExecutionException();
+
+      delegate.execute(() -> {
+        try {
+          command.run();
+        } finally {
+          sparseStripedLock.remove(lockId);
+        }
+      });
+    } catch (RejectedExecutionException e) {
+      sparseStripedLock.remove(lockId);
+      throw e;
+    }
+  }
+
+  /** Shuts down the delegate executor and waits for its running commands to terminate. */
+  public void shutdownAndAwaitTermination() {
+    ExecutorUtil.shutdownAndAwaitTermination(delegate);
+  }
+
+  /** A set of locks by a key {@code T}, kind of like Google Striped but the keys are sparse/lazy. */
+  private static class SparseStripedLock<T> {
+    private final ConcurrentHashMap<T, CountDownLatch> map = new ConcurrentHashMap<>();
+    private final Semaphore sizeSemaphore;
+
+    SparseStripedLock(int maxSize) {
+      this.sizeSemaphore = new Semaphore(maxSize);
+    }
+
+    /**
+     * Acquires the lock for {@code t} (if non-null) and then one size permit.
+     * If interrupted, nothing remains held.
+     */
+    public void add(T t) throws InterruptedException {
+      if (t != null) {
+        CountDownLatch myLock = new CountDownLatch(1);
+        CountDownLatch existingLock = map.putIfAbsent(t, myLock);
+        while (existingLock != null) {
+          // wait for existing lock/permit to become available (see remove() below)
+          existingLock.await();
+          existingLock = map.putIfAbsent(t, myLock);
+        }
+        // myLock was successfully inserted
+      }
+      // won the per-key lock (if any); now wait for a free slot
+      try {
+        sizeSemaphore.acquire();
+      } catch (InterruptedException e) {
+        // Give the per-key lock back; otherwise every later add(t) would wait on it forever.
+        if (t != null) {
+          map.remove(t).countDown();
+        }
+        throw e;
+      }
+    }
+
+    /** Releases the lock for {@code t} (if non-null), wakes its waiters, and returns the size permit. */
+    public void remove(T t) {
+      if (t != null) {
+        // remove and signal to any "await"-ers
+        map.remove(t).countDown();
+      }
+      sizeSemaphore.release();
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/core/src/test-files/solr/solr-50-all.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/solr-50-all.xml b/solr/core/src/test-files/solr/solr-50-all.xml
index e2ce924..4718d4a 100644
--- a/solr/core/src/test-files/solr/solr-50-all.xml
+++ b/solr/core/src/test-files/solr/solr-50-all.xml
@@ -26,6 +26,7 @@
<str name="sharedLib">testSharedLib</str>
<str name="shareSchema">${shareSchema:true}</str>
<int name="transientCacheSize">66</int>
+ <int name="replayUpdatesThreads">100</int>
<solrcloud>
<int name="distribUpdateConnTimeout">22</int>
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/core/src/test/org/apache/solr/core/TestSolrXml.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/core/TestSolrXml.java b/solr/core/src/test/org/apache/solr/core/TestSolrXml.java
index 9224c4d..031b4c8 100644
--- a/solr/core/src/test/org/apache/solr/core/TestSolrXml.java
+++ b/solr/core/src/test/org/apache/solr/core/TestSolrXml.java
@@ -74,6 +74,7 @@ public class TestSolrXml extends SolrTestCaseJ4 {
assertEquals("info handler class", "testInfoHandler", cfg.getInfoHandlerClass());
assertEquals("config set handler class", "testConfigSetsHandler", cfg.getConfigSetsHandlerClass());
assertEquals("core load threads", 11, cfg.getCoreLoadThreadCount(false));
+ assertEquals("replay update threads", 100, cfg.getReplayUpdatesThreads());
assertThat("core root dir", cfg.getCoreRootDirectory().toString(), containsString("testCoreRootDirectory"));
assertEquals("distrib conn timeout", 22, cfg.getUpdateShardHandlerConfig().getDistributedConnectionTimeout());
assertEquals("distrib socket timeout", 33, cfg.getUpdateShardHandlerConfig().getDistributedSocketTimeout());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/core/src/test/org/apache/solr/search/TestRecovery.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/search/TestRecovery.java b/solr/core/src/test/org/apache/solr/search/TestRecovery.java
index 4b00ba2..1d62207 100644
--- a/solr/core/src/test/org/apache/solr/search/TestRecovery.java
+++ b/solr/core/src/test/org/apache/solr/search/TestRecovery.java
@@ -47,8 +47,10 @@ import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
@@ -101,6 +103,85 @@ public class TestRecovery extends SolrTestCaseJ4 {
}
@Test
+  // Randomized log-replay check. It records a mix of adds, atomic updates, deletes by query and
+  // deletes by id without committing, restarts the core, and verifies that replaying the update
+  // log reproduces the expected state tracked in docIdToVal.
+  public void stressLogReplay() throws Exception {
+    final int NUM_UPDATES = 150;
+    try {
+      DirectUpdateHandler2.commitOnClose = false;
+      final Semaphore logReplay = new Semaphore(0);
+      final Semaphore logReplayFinish = new Semaphore(0);
+
+      // hold replay back until the restarted core has been checked for an empty index
+      UpdateLog.testing_logReplayHook = () -> {
+        try {
+          assertTrue(logReplay.tryAcquire(timeout, TimeUnit.SECONDS));
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      };
+
+      UpdateLog.testing_logReplayFinishHook = logReplayFinish::release;
+      clearIndex();
+      assertU(commit());
+      Map<Integer, Integer> docIdToVal = new HashMap<>();
+      for (int i = 0; i < NUM_UPDATES; i++) {
+        int kindOfUpdate = random().nextInt(100);
+        // only add documents until there are enough existing ones to update or delete
+        if (docIdToVal.size() < 10) kindOfUpdate = 0;
+        if (kindOfUpdate <= 50) {
+          // add a document; the random id may overwrite an existing one
+          int val = random().nextInt(1000);
+          int docId = random().nextInt(10000);
+          addAndGetVersion(sdoc("id", String.valueOf(docId), "val_i_dvo", val), null);
+          docIdToVal.put(docId, val);
+        } else if (kindOfUpdate <= 80) {
+          // atomically increment val of an existing document
+          ArrayList<Integer> ids = new ArrayList<>(docIdToVal.keySet());
+          int docId = ids.get(random().nextInt(ids.size()));
+          int delta = random().nextInt(10);
+          addAndGetVersion(sdoc("id", String.valueOf(docId), "val_i_dvo", map("inc", delta)), null);
+          docIdToVal.put(docId, docIdToVal.get(docId) + delta);
+        } else if (kindOfUpdate <= 85) {
+          // atomically set val of an existing document
+          ArrayList<Integer> ids = new ArrayList<>(docIdToVal.keySet());
+          int docId = ids.get(random().nextInt(ids.size()));
+          int val = random().nextInt(1000);
+          addAndGetVersion(sdoc("id", String.valueOf(docId), "val_i_dvo", map("set", val)), null);
+          docIdToVal.put(docId, val);
+        } else if (kindOfUpdate <= 90) {
+          // delete by query: removes every document whose val equals a randomly chosen existing value
+          ArrayList<Integer> vals = new ArrayList<>(docIdToVal.values());
+          int val = vals.get(random().nextInt(vals.size()));
+          deleteByQueryAndGetVersion("val_i_dvo:"+val, null);
+          docIdToVal.entrySet().removeIf(entry -> entry.getValue() == val);
+        } else {
+          // delete by id
+          ArrayList<Integer> ids = new ArrayList<>(docIdToVal.keySet());
+          int docId = ids.get(random().nextInt(ids.size()));
+          deleteAndGetVersion(String.valueOf(docId), null);
+          docIdToVal.remove(docId);
+        }
+      }
+
+      h.close();
+      createCore();
+      assertJQ(req("q","*:*") ,"/response/numFound==0");
+      // unblock recovery
+      logReplay.release(Integer.MAX_VALUE);
+      assertTrue(logReplayFinish.tryAcquire(timeout, TimeUnit.SECONDS));
+      assertU(commit());
+      assertJQ(req("q","*:*") ,"/response/numFound=="+docIdToVal.size());
+
+      for (Map.Entry<Integer, Integer> entry : docIdToVal.entrySet()) {
+        assertJQ(req("q","id:"+entry.getKey(), "fl", "val_i_dvo") ,
+            "/response/numFound==1",
+            "/response/docs==[{'val_i_dvo':"+entry.getValue()+"}]");
+      }
+    } finally {
+      DirectUpdateHandler2.commitOnClose = true;
+      UpdateLog.testing_logReplayHook = null;
+      UpdateLog.testing_logReplayFinishHook = null;
+    }
+  }
+
+ @Test
public void testLogReplay() throws Exception {
try {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java b/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
new file mode 100644
index 0000000..0211a11
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.junit.Test;
+
+/**
+ * Tests for {@link OrderedExecutor}: tasks submitted with the same key must not overlap, while
+ * tasks submitted with different keys may run in parallel.
+ *
+ * <p>Every test shuts its executor down in a {@code finally} block so that a failed assertion does
+ * not leave pool threads running after the test.
+ */
+public class OrderedExecutorTest extends LuceneTestCase {
+
+  @Test
+  public void testExecutionInOrder() {
+    OrderedExecutor orderedExecutor = new OrderedExecutor(10, ExecutorUtil.newMDCAwareCachedThreadPool("executeInOrderTest"));
+    IntBox intBox = new IntBox();
+    try {
+      // All tasks share key 1, so the unsynchronized increments below must never overlap;
+      // any overlap would lose an update and make the final count fall short of 100.
+      for (int i = 0; i < 100; i++) {
+        orderedExecutor.execute(1, () -> intBox.value++);
+      }
+    } finally {
+      orderedExecutor.shutdownAndAwaitTermination();
+    }
+    assertEquals(100, intBox.value);
+  }
+
+  @Test
+  public void testLockWhenQueueIsFull() {
+    OrderedExecutor orderedExecutor = new OrderedExecutor(10, ExecutorUtil.newMDCAwareCachedThreadPool("testLockWhenQueueIsFull"));
+    IntBox intBox = new IntBox();
+    try {
+      long start = System.nanoTime();
+      orderedExecutor.execute(1, () -> {
+        try {
+          Thread.sleep(500L);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+        intBox.value++;
+      });
+      // Key 1 is idle, so submitting the first task should return almost immediately.
+      assertTrue("first execute() for an idle key should not block",
+          System.nanoTime() - start < TimeUnit.MILLISECONDS.toNanos(100));
+
+      start = System.nanoTime();
+      // Key 1 is still busy (the task above sleeps for 500ms), so this call is expected to
+      // block the submitter until that task finishes.
+      orderedExecutor.execute(1, () -> intBox.value++);
+      assertTrue("execute() for a busy key should block until the previous task finishes",
+          System.nanoTime() - start > TimeUnit.MILLISECONDS.toNanos(300));
+    } finally {
+      orderedExecutor.shutdownAndAwaitTermination();
+    }
+    assertEquals(2, intBox.value);
+  }
+
+  @Test
+  public void testRunInParallel() {
+    OrderedExecutor orderedExecutor = new OrderedExecutor(10, ExecutorUtil.newMDCAwareCachedThreadPool("testRunInParallel"));
+    AtomicInteger atomicInteger = new AtomicInteger(0);
+    try {
+      orderedExecutor.execute(1, () -> {
+        try {
+          Thread.sleep(500L);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+        // Only reaches 2 if the key-2 task already ran while this task was sleeping,
+        // i.e. tasks with different keys were not serialized behind each other.
+        if (atomicInteger.get() == 1) atomicInteger.incrementAndGet();
+      });
+
+      orderedExecutor.execute(2, atomicInteger::incrementAndGet);
+    } finally {
+      orderedExecutor.shutdownAndAwaitTermination();
+    }
+    assertEquals(2, atomicInteger.get());
+  }
+
+  @Test
+  public void testStress() {
+    int N = random().nextInt(50) + 20;
+    // "base" is the expected state computed on the test thread; "run" is updated by the executor.
+    Map<Integer, Integer> base = new HashMap<>();
+    Map<Integer, Integer> run = new HashMap<>();
+    // Every key is inserted before any task is submitted, so the tasks below only replace
+    // values of entries that already exist.
+    for (int i = 0; i < N; i++) {
+      base.put(i, i);
+      run.put(i, i);
+    }
+    OrderedExecutor orderedExecutor = new OrderedExecutor(10, ExecutorUtil.newMDCAwareCachedThreadPool("testStress"));
+    try {
+      for (int i = 0; i < 1000; i++) {
+        int key = random().nextInt(N);
+        base.put(key, base.get(key) + 1);
+        // A non-atomic read-modify-write: only correct if tasks sharing a key never overlap.
+        orderedExecutor.execute(key, () -> run.put(key, run.get(key) + 1));
+      }
+    } finally {
+      orderedExecutor.shutdownAndAwaitTermination();
+    }
+    assertEquals(base, run);
+  }
+
+  /** Mutable int holder that lambdas can update (locals captured by lambdas must be effectively final). */
+  private static class IntBox {
+    int value;
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/solr-ref-guide/src/format-of-solr-xml.adoc
----------------------------------------------------------------------
diff --git a/solr/solr-ref-guide/src/format-of-solr-xml.adoc b/solr/solr-ref-guide/src/format-of-solr-xml.adoc
index a235943..bfc3ab7 100644
--- a/solr/solr-ref-guide/src/format-of-solr-xml.adoc
+++ b/solr/solr-ref-guide/src/format-of-solr-xml.adoc
@@ -69,6 +69,10 @@ As above, for custom InfoHandler implementations.
`coreLoadThreads`::
Specifies the number of threads that will be assigned to load cores in parallel.
+`replayUpdatesThreads`::
+Specifies the number of threads that will be assigned to replay updates in parallel. Defaults to the number of processors.
+This pool is shared by all cores on the node.
+
`coreRootDirectory`::
The root of the core discovery tree, defaults to `$SOLR_HOME` (by default, `server/solr`).
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/04e1b197/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
index f56f782..6082a69 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
@@ -126,6 +126,13 @@ public class ExecutorUtil {
threadFactory);
}
+ public static ExecutorService newMDCAwareCachedThreadPool(int maxThreads, ThreadFactory threadFactory) {
+ return new MDCAwareThreadPoolExecutor(0, maxThreads,
+ 60L, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(maxThreads),
+ threadFactory);
+ }
+
@SuppressForbidden(reason = "class customizes ThreadPoolExecutor so it can be used instead")
public static class MDCAwareThreadPoolExecutor extends ThreadPoolExecutor {