You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/31 20:57:40 UTC
[lucene-solr] branch reference_impl_dev updated: @657 Move back to
running in caller thread if no threads are available instead of waiting.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new a04f4fc @657 Move back to running in caller thread if no threads are available instead of waiting.
a04f4fc is described below
commit a04f4fc4445c72fd7027cdbc54bac612790f85c3
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Aug 31 15:57:16 2020 -0500
@657 Move back to running in caller thread if no threads are available instead of waiting.
---
.../src/java/org/apache/solr/core/CoreContainer.java | 4 ++--
.../component/SolrExecutorCompletionService.java | 9 ++++-----
.../apache/solr/client/solrj/impl/HttpSolrClient.java | 4 ++--
.../src/java/org/apache/solr/common/ParWork.java | 12 ++++++------
...WorkExecService.java => PerThreadExecService.java} | 19 +++++++++++++------
.../src/java/org/apache/solr/SolrTestCase.java | 4 ++--
6 files changed, 29 insertions(+), 23 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 8bb25dc..c083a17 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -67,7 +67,7 @@ import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.autoscaling.AutoScalingHandler;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
-import org.apache.solr.common.ParWorkExecService;
+import org.apache.solr.common.PerThreadExecService;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.DocCollection;
@@ -389,7 +389,7 @@ public class CoreContainer implements Closeable {
containerProperties.putAll(cfg.getSolrProperties());
- solrCoreLoadExecutor = new ParWorkExecService(ParWork.getRootSharedExecutor(), Math.max(3, Runtime.getRuntime().availableProcessors() / 2));
+ solrCoreLoadExecutor = new PerThreadExecService(ParWork.getRootSharedExecutor(), Math.max(3, Runtime.getRuntime().availableProcessors() / 2));
// if (solrCoreLoadExecutor == null) {
// synchronized (CoreContainer.class) {
// if (solrCoreLoadExecutor == null) {
diff --git a/solr/core/src/java/org/apache/solr/handler/component/SolrExecutorCompletionService.java b/solr/core/src/java/org/apache/solr/handler/component/SolrExecutorCompletionService.java
index 7554c90..6a5ee9b 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/SolrExecutorCompletionService.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/SolrExecutorCompletionService.java
@@ -5,8 +5,7 @@
package org.apache.solr.handler.component;
-import org.apache.solr.common.ParWork;
-import org.apache.solr.common.ParWorkExecService;
+import org.apache.solr.common.PerThreadExecService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
@@ -18,7 +17,7 @@ import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
public class SolrExecutorCompletionService<V> implements CompletionService<V> {
- private final ParWorkExecService executor;
+ private final PerThreadExecService executor;
private final BlockingQueue<Future<V>> completionQueue;
private RunnableFuture<V> newTaskFor(Callable<V> task) {
@@ -29,7 +28,7 @@ public class SolrExecutorCompletionService<V> implements CompletionService<V> {
return (RunnableFuture) new FutureTask(task, result);
}
- public SolrExecutorCompletionService(ParWorkExecService executor) {
+ public SolrExecutorCompletionService(PerThreadExecService executor) {
if (executor == null) {
throw new NullPointerException();
} else {
@@ -38,7 +37,7 @@ public class SolrExecutorCompletionService<V> implements CompletionService<V> {
}
}
- public SolrExecutorCompletionService(ParWorkExecService executor, BlockingQueue<Future<V>> completionQueue) {
+ public SolrExecutorCompletionService(PerThreadExecService executor, BlockingQueue<Future<V>> completionQueue) {
if (executor != null && completionQueue != null) {
this.executor = executor;
this.completionQueue = completionQueue;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
index 95484db..f59dc49 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
@@ -50,7 +50,7 @@ import org.apache.solr.client.solrj.V2RequestSupport;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.common.ParWork;
-import org.apache.solr.common.ParWorkExecService;
+import org.apache.solr.common.PerThreadExecService;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
@@ -314,7 +314,7 @@ public class HttpSolrClient extends BaseHttpSolrClient {
final HttpRequestBase method = createMethod(request, null);
try {
MDC.put("HttpSolrClient.url", baseUrl);
- mrr.future = (Future<NamedList<Object>>) ((ParWorkExecService) ParWork.getMyPerThreadExecutor()).submit(() -> {
+ mrr.future = (Future<NamedList<Object>>) ((PerThreadExecService) ParWork.getMyPerThreadExecutor()).submit(() -> {
try {
executeMethod(method, request.getUserPrincipal(), processor, isV2ApiRequest(request));
} catch (SolrServerException e) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 2e2ef6c..7b99b45 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -110,7 +110,7 @@ public class ParWork implements Closeable {
}
public static void closeExecutor(boolean unlockClose) {
- ParWorkExecService exec = (ParWorkExecService) THREAD_LOCAL_EXECUTOR.get();
+ PerThreadExecService exec = (PerThreadExecService) THREAD_LOCAL_EXECUTOR.get();
if (exec != null) {
if (unlockClose) {
exec.closeLock(false);
@@ -373,9 +373,9 @@ public class ParWork implements Closeable {
}
}
- ParWorkExecService executor = null;
+ PerThreadExecService executor = null;
if (needExec) {
- executor = (ParWorkExecService) getMyPerThreadExecutor();
+ executor = (PerThreadExecService) getMyPerThreadExecutor();
}
//initExecutor();
AtomicReference<Throwable> exception = new AtomicReference<>();
@@ -512,7 +512,7 @@ public class ParWork implements Closeable {
minThreads = 3;
maxThreads = PROC_COUNT;
exec = getExecutorService(Math.max(minThreads, maxThreads)); // keep alive directly affects how long a worker might
- ((ParWorkExecService)exec).closeLock(true);
+ ((PerThreadExecService)exec).closeLock(true);
// be stuck in poll without an enqueue on shutdown
THREAD_LOCAL_EXECUTOR.set(exec);
}
@@ -528,11 +528,11 @@ public class ParWork implements Closeable {
}
public static ExecutorService getExecutorService(int maximumPoolSize) {
- return new ParWorkExecService(getRootSharedExecutor(), maximumPoolSize);
+ return new PerThreadExecService(getRootSharedExecutor(), maximumPoolSize);
}
public static ExecutorService getExecutorService(int maximumPoolSize, boolean noCallerRuns) {
- return new ParWorkExecService(getRootSharedExecutor(), maximumPoolSize, noCallerRuns);
+ return new PerThreadExecService(getRootSharedExecutor(), maximumPoolSize, noCallerRuns);
}
private void handleObject(AtomicReference<Throwable> exception, final TimeTracker workUnitTracker, ParObject ob) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java b/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java
similarity index 91%
rename from solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
rename to solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java
index af27d14..b07088c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java
@@ -23,7 +23,7 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-public class ParWorkExecService extends AbstractExecutorService {
+public class PerThreadExecService extends AbstractExecutorService {
private static final Logger log = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());
@@ -78,8 +78,15 @@ public class ParWorkExecService extends AbstractExecutorService {
} else {
try {
- available.acquire();
- } catch (InterruptedException e) {
+ boolean success = available.tryAcquire();
+ // I think if we wait here for available instead of running in caller thread
+ // this is why we could not use the per thread executor in the stream classes
+ // this means order cannot matter, but it should generally not matter
+ if (!success) {
+ runIt(runnable, true, true, false);
+ return;
+ }
+ } catch (Exception e) {
ParWork.propegateInterrupt(e);
running.decrementAndGet();
synchronized (awaitTerminate) {
@@ -101,15 +108,15 @@ public class ParWorkExecService extends AbstractExecutorService {
}
}
- public ParWorkExecService(ExecutorService service) {
+ public PerThreadExecService(ExecutorService service) {
this(service, -1);
}
- public ParWorkExecService(ExecutorService service, int maxSize) {
+ public PerThreadExecService(ExecutorService service, int maxSize) {
this(service, maxSize, false);
}
- public ParWorkExecService(ExecutorService service, int maxSize, boolean noCallerRuns) {
+ public PerThreadExecService(ExecutorService service, int maxSize, boolean noCallerRuns) {
assert service != null;
this.noCallerRuns = noCallerRuns;
//assert ObjectReleaseTracker.track(this);
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
index 48a4775..925f138 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
@@ -47,7 +47,7 @@ import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.cloud.autoscaling.ScheduledTriggers;
import org.apache.solr.common.ParWork;
-import org.apache.solr.common.ParWorkExecService;
+import org.apache.solr.common.PerThreadExecService;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.TimeTracker;
@@ -200,7 +200,7 @@ public class SolrTestCase extends LuceneTestCase {
testExecutor = ParWork.getMyPerThreadExecutor();
- ((ParWorkExecService) testExecutor).closeLock(true);
+ ((PerThreadExecService) testExecutor).closeLock(true);
// stop zkserver threads that can linger
//interruptThreadsOnTearDown("nioEventLoopGroup", false);