You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/07/28 05:02:23 UTC

[lucene-solr] 05/06: @416 Fix replay executor.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit c908b3eb94dfb1448b5e6387f7d32e71c0b757e5
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Jul 27 23:00:34 2020 -0500

    @416 Fix replay executor.
---
 solr/core/src/java/org/apache/solr/core/CoreContainer.java  |  2 +-
 .../src/test/org/apache/solr/util/OrderedExecutorTest.java  | 10 ++++++----
 solr/solrj/src/java/org/apache/solr/common/ParWork.java     |  5 +++--
 .../src/java/org/apache/solr/common/ParWorkExecutor.java    | 13 +++++++++++--
 4 files changed, 21 insertions(+), 9 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 5539d0c..b40d7fe 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -346,7 +346,7 @@ public class CoreContainer implements Closeable {
     this.asyncSolrCoreLoad = asyncSolrCoreLoad;
 
     this.replayUpdatesExecutor = new OrderedExecutor( cfg.getReplayUpdatesThreads(),
-            ParWork.getExecutorService(0,  cfg.getReplayUpdatesThreads(), 1000));
+            ParWork.getExecutorService( cfg.getReplayUpdatesThreads(), cfg.getReplayUpdatesThreads(), 1000));
 
     metricManager = new SolrMetricManager(loader, cfg.getMetricsConfig());
     String registryName = SolrMetricManager.getRegistryName(SolrInfoBean.Group.node);
diff --git a/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java b/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
index f686cce..ca85b51 100644
--- a/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
+++ b/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
@@ -49,7 +49,8 @@ public class OrderedExecutorTest extends SolrTestCase {
   @Test
   public void testExecutionInOrder() {
     IntBox intBox = new IntBox();
-    OrderedExecutor orderedExecutor = new OrderedExecutor(TEST_NIGHTLY ? 10 : 3, new ParWorkExecutor("executeInOrderTest", TEST_NIGHTLY ? 10 : 3));
+    OrderedExecutor orderedExecutor = new OrderedExecutor(TEST_NIGHTLY ? 10 : 3,
+            new ParWorkExecutor("executeInOrderTest", TEST_NIGHTLY ? 10 : 3, TEST_NIGHTLY ? 10 : 3));
     try {
       for (int i = 0; i < 100; i++) {
         orderedExecutor.execute(1, () -> intBox.value.incrementAndGet());
@@ -66,7 +67,7 @@ public class OrderedExecutorTest extends SolrTestCase {
   @Test
   public void testLockWhenQueueIsFull() {
     final OrderedExecutor orderedExecutor = new OrderedExecutor
-      (TEST_NIGHTLY ? 10 : 3, ExecutorUtil.newMDCAwareCachedThreadPool("testLockWhenQueueIsFull_test"));
+      (TEST_NIGHTLY ? 10 : 3, new ParWorkExecutor("testLockWhenQueueIsFull_test", TEST_NIGHTLY ? 10 : 3, TEST_NIGHTLY ? 10 : 3));
     
     try {
       // AAA and BBB events will both depend on the use of the same lockId
@@ -115,7 +116,7 @@ public class OrderedExecutorTest extends SolrTestCase {
     final int parallelism = atLeast(3);
 
     final OrderedExecutor orderedExecutor = new OrderedExecutor
-      (parallelism, ExecutorUtil.newMDCAwareCachedThreadPool("testRunInParallel_test"));
+      (parallelism, new ParWorkExecutor("testRunInParallel_test", parallelism, parallelism));
 
     try {
       // distinct lockIds should be able to be used in parallel, up to the size of the executor,
@@ -215,7 +216,8 @@ public class OrderedExecutorTest extends SolrTestCase {
       base.put(i, i);
       run.put(i, i);
     }
-    OrderedExecutor orderedExecutor = new OrderedExecutor(TEST_NIGHTLY ? 10 : 3, ExecutorUtil.newMDCAwareCachedThreadPool("testStress"));
+    OrderedExecutor orderedExecutor = new OrderedExecutor(TEST_NIGHTLY ? 10 : 3,
+            new ParWorkExecutor("testStress", TEST_NIGHTLY ? 10 : 3, TEST_NIGHTLY ? 10 : 3));
     try {
       for (int i = 0; i < (TEST_NIGHTLY ? 1000 : 55); i++) {
         int key = random().nextInt(N);
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 6e71ce8..7cd9b92 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -617,7 +617,7 @@ public class ParWork implements Closeable {
       }
 
       // figure out thread usage - maybe try to adjust based on current thread count
-      exec = getExecutorService(0, Math.max(4, Runtime.getRuntime().availableProcessors() / 3), 5000);
+      exec = getExecutorService(0, Math.max(4, Runtime.getRuntime().availableProcessors() / 3), 1);
       THREAD_LOCAL_EXECUTOR.set(exec);
     }
 
@@ -626,7 +626,8 @@ public class ParWork implements Closeable {
 
   public static ExecutorService getExecutorService(int corePoolSize, int maximumPoolSize, int keepAliveTime) {
     ThreadPoolExecutor exec;
-    exec = new ParWorkExecutor("ParWork-" + Thread.currentThread().getName(), getMaxPoolSize());
+    exec = new ParWorkExecutor("ParWork-" + Thread.currentThread().getName(),
+            corePoolSize, maximumPoolSize == -1 ? getMaxPoolSize() : maximumPoolSize, keepAliveTime);
     return exec;
   }
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
index 23d4215..8b85f30 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
@@ -20,14 +20,23 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class ParWorkExecutor extends ThreadPoolExecutor {
     private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-    public static final long KEEP_ALIVE_TIME = 1;
+    public static final int KEEP_ALIVE_TIME = 1;
 
     private final Object lock = new Object();
 
     private static AtomicInteger threadNumber = new AtomicInteger(0);
 
     public ParWorkExecutor(String name, int maxPoolsSize) {
-        super(0,  maxPoolsSize,  KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(Integer.getInteger("solr.threadExecQueueSize", 30)), new ThreadFactory() {
+        this(name, 0, maxPoolsSize, KEEP_ALIVE_TIME);
+    }
+
+    public ParWorkExecutor(String name, int corePoolsSize, int maxPoolsSize) {
+        this(name, corePoolsSize, maxPoolsSize, KEEP_ALIVE_TIME);
+    }
+
+
+    public ParWorkExecutor(String name, int corePoolsSize, int maxPoolsSize, int keepalive) {
+        super(corePoolsSize,  maxPoolsSize,  keepalive, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(Integer.getInteger("solr.threadExecQueueSize", 30)), new ThreadFactory() {
 
             ThreadGroup group;