You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/07/28 05:02:23 UTC
[lucene-solr] 05/06: @416 Fix replay executor.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit c908b3eb94dfb1448b5e6387f7d32e71c0b757e5
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Jul 27 23:00:34 2020 -0500
@416 Fix replay executor.
---
solr/core/src/java/org/apache/solr/core/CoreContainer.java | 2 +-
.../src/test/org/apache/solr/util/OrderedExecutorTest.java | 10 ++++++----
solr/solrj/src/java/org/apache/solr/common/ParWork.java | 5 +++--
.../src/java/org/apache/solr/common/ParWorkExecutor.java | 13 +++++++++++--
4 files changed, 21 insertions(+), 9 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 5539d0c..b40d7fe 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -346,7 +346,7 @@ public class CoreContainer implements Closeable {
this.asyncSolrCoreLoad = asyncSolrCoreLoad;
this.replayUpdatesExecutor = new OrderedExecutor( cfg.getReplayUpdatesThreads(),
- ParWork.getExecutorService(0, cfg.getReplayUpdatesThreads(), 1000));
+ ParWork.getExecutorService( cfg.getReplayUpdatesThreads(), cfg.getReplayUpdatesThreads(), 1000));
metricManager = new SolrMetricManager(loader, cfg.getMetricsConfig());
String registryName = SolrMetricManager.getRegistryName(SolrInfoBean.Group.node);
diff --git a/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java b/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
index f686cce..ca85b51 100644
--- a/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
+++ b/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
@@ -49,7 +49,8 @@ public class OrderedExecutorTest extends SolrTestCase {
@Test
public void testExecutionInOrder() {
IntBox intBox = new IntBox();
- OrderedExecutor orderedExecutor = new OrderedExecutor(TEST_NIGHTLY ? 10 : 3, new ParWorkExecutor("executeInOrderTest", TEST_NIGHTLY ? 10 : 3));
+ OrderedExecutor orderedExecutor = new OrderedExecutor(TEST_NIGHTLY ? 10 : 3,
+ new ParWorkExecutor("executeInOrderTest", TEST_NIGHTLY ? 10 : 3, TEST_NIGHTLY ? 10 : 3));
try {
for (int i = 0; i < 100; i++) {
orderedExecutor.execute(1, () -> intBox.value.incrementAndGet());
@@ -66,7 +67,7 @@ public class OrderedExecutorTest extends SolrTestCase {
@Test
public void testLockWhenQueueIsFull() {
final OrderedExecutor orderedExecutor = new OrderedExecutor
- (TEST_NIGHTLY ? 10 : 3, ExecutorUtil.newMDCAwareCachedThreadPool("testLockWhenQueueIsFull_test"));
+ (TEST_NIGHTLY ? 10 : 3, new ParWorkExecutor("testLockWhenQueueIsFull_test", TEST_NIGHTLY ? 10 : 3, TEST_NIGHTLY ? 10 : 3));
try {
// AAA and BBB events will both depend on the use of the same lockId
@@ -115,7 +116,7 @@ public class OrderedExecutorTest extends SolrTestCase {
final int parallelism = atLeast(3);
final OrderedExecutor orderedExecutor = new OrderedExecutor
- (parallelism, ExecutorUtil.newMDCAwareCachedThreadPool("testRunInParallel_test"));
+ (parallelism, new ParWorkExecutor("testRunInParallel_test", parallelism, parallelism));
try {
// distinct lockIds should be able to be used in parallel, up to the size of the executor,
@@ -215,7 +216,8 @@ public class OrderedExecutorTest extends SolrTestCase {
base.put(i, i);
run.put(i, i);
}
- OrderedExecutor orderedExecutor = new OrderedExecutor(TEST_NIGHTLY ? 10 : 3, ExecutorUtil.newMDCAwareCachedThreadPool("testStress"));
+ OrderedExecutor orderedExecutor = new OrderedExecutor(TEST_NIGHTLY ? 10 : 3,
+ new ParWorkExecutor("testStress", TEST_NIGHTLY ? 10 : 3, TEST_NIGHTLY ? 10 : 3));
try {
for (int i = 0; i < (TEST_NIGHTLY ? 1000 : 55); i++) {
int key = random().nextInt(N);
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 6e71ce8..7cd9b92 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -617,7 +617,7 @@ public class ParWork implements Closeable {
}
// figure out thread usage - maybe try to adjust based on current thread count
- exec = getExecutorService(0, Math.max(4, Runtime.getRuntime().availableProcessors() / 3), 5000);
+ exec = getExecutorService(0, Math.max(4, Runtime.getRuntime().availableProcessors() / 3), 1);
THREAD_LOCAL_EXECUTOR.set(exec);
}
@@ -626,7 +626,8 @@ public class ParWork implements Closeable {
public static ExecutorService getExecutorService(int corePoolSize, int maximumPoolSize, int keepAliveTime) {
ThreadPoolExecutor exec;
- exec = new ParWorkExecutor("ParWork-" + Thread.currentThread().getName(), getMaxPoolSize());
+ exec = new ParWorkExecutor("ParWork-" + Thread.currentThread().getName(),
+ corePoolSize, maximumPoolSize == -1 ? getMaxPoolSize() : maximumPoolSize, keepAliveTime);
return exec;
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
index 23d4215..8b85f30 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
@@ -20,14 +20,23 @@ import java.util.concurrent.atomic.AtomicInteger;
public class ParWorkExecutor extends ThreadPoolExecutor {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- public static final long KEEP_ALIVE_TIME = 1;
+ public static final int KEEP_ALIVE_TIME = 1;
private final Object lock = new Object();
private static AtomicInteger threadNumber = new AtomicInteger(0);
public ParWorkExecutor(String name, int maxPoolsSize) {
- super(0, maxPoolsSize, KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(Integer.getInteger("solr.threadExecQueueSize", 30)), new ThreadFactory() {
+ this(name, 0, maxPoolsSize, KEEP_ALIVE_TIME);
+ }
+
+ public ParWorkExecutor(String name, int corePoolsSize, int maxPoolsSize) {
+ this(name, corePoolsSize, maxPoolsSize, KEEP_ALIVE_TIME);
+ }
+
+
+ public ParWorkExecutor(String name, int corePoolsSize, int maxPoolsSize, int keepalive) {
+ super(corePoolsSize, maxPoolsSize, keepalive, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(Integer.getInteger("solr.threadExecQueueSize", 30)), new ThreadFactory() {
ThreadGroup group;