You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/21 19:12:26 UTC
[lucene-solr] 01/02: @599 Fix OrderedExecutor to no use caller runs.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 4640ee440eba89edd7f5c8d81b304ad41505276b
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Fri Aug 21 14:11:08 2020 -0500
@599 Fix OrderedExecutor to no use caller runs.
---
.../org/apache/solr/util/OrderedExecutorTest.java | 33 ++++++++++++++--------
.../src/java/org/apache/solr/common/ParWork.java | 8 ++++++
.../org/apache/solr/common/ParWorkExecService.java | 13 +++++++--
.../apache/solr/common/util/OrderedExecutor.java | 14 +++++----
4 files changed, 50 insertions(+), 18 deletions(-)
diff --git a/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java b/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
index 577e12e..1f1dde6 100644
--- a/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
+++ b/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
@@ -20,18 +20,17 @@ package org.apache.solr.util;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
@@ -39,8 +38,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.SolrTestCase;
import org.apache.solr.common.ParWork;
-import org.apache.solr.common.ParWorkExecutor;
-import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.OrderedExecutor;
import org.junit.Test;
@@ -89,12 +86,7 @@ public class OrderedExecutorTest extends SolrTestCase {
});
// BBB doesn't care about the latch, but because it uses the same lockId, it's blocked on AAA
// so we execute it in a background thread...
- Future<?> future = testExecutor.submit(() -> {
- orderedExecutor.submit(lockId, () -> {
- events.add("BBB");
- });
- });
-
+ Future<?> future = testExecutor.submit(new MyNoLimitsCallable(orderedExecutor, lockId, events));
// now if we release the latchAAA, AAA should be garunteed to fire first, then BBB
latchAAA.countDown();
try {
@@ -221,7 +213,7 @@ public class OrderedExecutorTest extends SolrTestCase {
run.put(i, i);
}
OrderedExecutor orderedExecutor = new OrderedExecutor(TEST_NIGHTLY ? 10 : 3,
- ParWork.getExecutorService(TEST_NIGHTLY ? 10 : 3));
+ ParWork.getExecutorService(TEST_NIGHTLY ? 10 : 3, true));
try {
for (int i = 0; i < (TEST_NIGHTLY ? 1000 : 55); i++) {
int key = random().nextInt(N);
@@ -238,4 +230,23 @@ public class OrderedExecutorTest extends SolrTestCase {
private static class IntBox {
final AtomicInteger value = new AtomicInteger();
}
+
+ private static class MyNoLimitsCallable extends ParWork.NoLimitsCallable {
+ private final OrderedExecutor orderedExecutor;
+ private final Integer lockId;
+ private final BlockingQueue<String> events;
+
+ public MyNoLimitsCallable(OrderedExecutor orderedExecutor, Integer lockId, BlockingQueue<String> events) {
+ this.orderedExecutor = orderedExecutor;
+ this.lockId = lockId;
+ this.events = events;
+ }
+
+ public Object call() {
+ orderedExecutor.submit(lockId, () -> {
+ events.add("BBB");
+ });
+ return null;
+ }
+ }
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 69af7a0..2ceb680 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -539,6 +539,10 @@ public class ParWork implements Closeable {
return new ParWorkExecService(getEXEC(), maximumPoolSize);
}
+ public static ExecutorService getExecutorService(int maximumPoolSize, boolean noCallerRuns) {
+ return new ParWorkExecService(getEXEC(), maximumPoolSize, noCallerRuns);
+ }
+
private void handleObject(AtomicReference<Throwable> exception, final TimeTracker workUnitTracker, ParObject ob) {
if (log.isDebugEnabled()) {
log.debug(
@@ -731,6 +735,10 @@ public class ParWork implements Closeable {
public SolrFutureTask(Callable callable) {
super(callable);
}
+
+ public SolrFutureTask(Runnable runnable, Object value) {
+ super(runnable, value);
+ }
}
private static class ParObject {
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
index 637f2e2..af27d14 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
@@ -32,6 +32,7 @@ public class ParWorkExecService extends AbstractExecutorService {
private final ExecutorService service;
private final int maxSize;
+ private final boolean noCallerRuns;
private volatile boolean terminated;
private volatile boolean shutdown;
@@ -104,9 +105,13 @@ public class ParWorkExecService extends AbstractExecutorService {
this(service, -1);
}
-
public ParWorkExecService(ExecutorService service, int maxSize) {
+ this(service, maxSize, false);
+ }
+
+ public ParWorkExecService(ExecutorService service, int maxSize, boolean noCallerRuns) {
assert service != null;
+ this.noCallerRuns = noCallerRuns;
//assert ObjectReleaseTracker.track(this);
if (maxSize == -1) {
this.maxSize = MAX_AVAILABLE;
@@ -118,12 +123,16 @@ public class ParWorkExecService extends AbstractExecutorService {
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+ if (noCallerRuns) {
+ return (RunnableFuture) new ParWork.SolrFutureTask(runnable, value);
+ }
return new FutureTask(runnable, value);
+
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
- if (callable instanceof ParWork.NoLimitsCallable) {
+ if (noCallerRuns || callable instanceof ParWork.NoLimitsCallable) {
return (RunnableFuture) new ParWork.SolrFutureTask(callable);
}
return new FutureTask(callable);
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/OrderedExecutor.java b/solr/solrj/src/java/org/apache/solr/common/util/OrderedExecutor.java
index baf723f..7399bb4 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/OrderedExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/OrderedExecutor.java
@@ -20,6 +20,7 @@ package org.apache.solr.common.util;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
@@ -61,11 +62,14 @@ public class OrderedExecutor extends ExecutorCompletionService {
}
try {
- return delegate.submit(() -> {
- try {
- command.run();
- } finally {
- sparseStripedLock.remove(lockId);
+ return delegate.submit(new ParWork.NoLimitsCallable(){
+ public Object call() {
+ try {
+ command.run();
+ } finally {
+ sparseStripedLock.remove(lockId);
+ }
+ return null;
}
});
} catch (Exception e) {