You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/31 20:57:40 UTC

[lucene-solr] branch reference_impl_dev updated: @657 Move back to running in caller thread if no threads are available instead of waiting.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl_dev by this push:
     new a04f4fc  @657 Move back to running in caller thread if no threads are available instead of waiting.
a04f4fc is described below

commit a04f4fc4445c72fd7027cdbc54bac612790f85c3
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Aug 31 15:57:16 2020 -0500

    @657 Move back to running in caller thread if no threads are available instead of waiting.
---
 .../src/java/org/apache/solr/core/CoreContainer.java  |  4 ++--
 .../component/SolrExecutorCompletionService.java      |  9 ++++-----
 .../apache/solr/client/solrj/impl/HttpSolrClient.java |  4 ++--
 .../src/java/org/apache/solr/common/ParWork.java      | 12 ++++++------
 ...WorkExecService.java => PerThreadExecService.java} | 19 +++++++++++++------
 .../src/java/org/apache/solr/SolrTestCase.java        |  4 ++--
 6 files changed, 29 insertions(+), 23 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 8bb25dc..c083a17 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -67,7 +67,7 @@ import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cloud.autoscaling.AutoScalingHandler;
 import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.ParWork;
-import org.apache.solr.common.ParWorkExecService;
+import org.apache.solr.common.PerThreadExecService;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.DocCollection;
@@ -389,7 +389,7 @@ public class CoreContainer implements Closeable {
     containerProperties.putAll(cfg.getSolrProperties());
 
 
-    solrCoreLoadExecutor = new ParWorkExecService(ParWork.getRootSharedExecutor(), Math.max(3, Runtime.getRuntime().availableProcessors() / 2));
+    solrCoreLoadExecutor = new PerThreadExecService(ParWork.getRootSharedExecutor(), Math.max(3, Runtime.getRuntime().availableProcessors() / 2));
 //    if (solrCoreLoadExecutor == null) {
 //      synchronized (CoreContainer.class) {
 //        if (solrCoreLoadExecutor == null) {
diff --git a/solr/core/src/java/org/apache/solr/handler/component/SolrExecutorCompletionService.java b/solr/core/src/java/org/apache/solr/handler/component/SolrExecutorCompletionService.java
index 7554c90..6a5ee9b 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/SolrExecutorCompletionService.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/SolrExecutorCompletionService.java
@@ -5,8 +5,7 @@
 
 package org.apache.solr.handler.component;
 
-import org.apache.solr.common.ParWork;
-import org.apache.solr.common.ParWorkExecService;
+import org.apache.solr.common.PerThreadExecService;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
@@ -18,7 +17,7 @@ import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 
 public class SolrExecutorCompletionService<V> implements CompletionService<V> {
-  private final ParWorkExecService executor;
+  private final PerThreadExecService executor;
   private final BlockingQueue<Future<V>> completionQueue;
 
   private RunnableFuture<V> newTaskFor(Callable<V> task) {
@@ -29,7 +28,7 @@ public class SolrExecutorCompletionService<V> implements CompletionService<V> {
     return (RunnableFuture) new FutureTask(task, result);
   }
 
-  public SolrExecutorCompletionService(ParWorkExecService executor) {
+  public SolrExecutorCompletionService(PerThreadExecService executor) {
     if (executor == null) {
       throw new NullPointerException();
     } else {
@@ -38,7 +37,7 @@ public class SolrExecutorCompletionService<V> implements CompletionService<V> {
     }
   }
 
-  public SolrExecutorCompletionService(ParWorkExecService executor, BlockingQueue<Future<V>> completionQueue) {
+  public SolrExecutorCompletionService(PerThreadExecService executor, BlockingQueue<Future<V>> completionQueue) {
     if (executor != null && completionQueue != null) {
       this.executor = executor;
       this.completionQueue = completionQueue;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
index 95484db..f59dc49 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
@@ -50,7 +50,7 @@ import org.apache.solr.client.solrj.V2RequestSupport;
 import org.apache.solr.client.solrj.request.RequestWriter;
 import org.apache.solr.client.solrj.request.V2Request;
 import org.apache.solr.common.ParWork;
-import org.apache.solr.common.ParWorkExecService;
+import org.apache.solr.common.PerThreadExecService;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
@@ -314,7 +314,7 @@ public class HttpSolrClient extends BaseHttpSolrClient {
     final HttpRequestBase method = createMethod(request, null);
     try {
       MDC.put("HttpSolrClient.url", baseUrl);
-      mrr.future = (Future<NamedList<Object>>) ((ParWorkExecService) ParWork.getMyPerThreadExecutor()).submit(() -> {
+      mrr.future = (Future<NamedList<Object>>) ((PerThreadExecService) ParWork.getMyPerThreadExecutor()).submit(() -> {
         try {
           executeMethod(method, request.getUserPrincipal(), processor, isV2ApiRequest(request));
         } catch (SolrServerException e) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 2e2ef6c..7b99b45 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -110,7 +110,7 @@ public class ParWork implements Closeable {
   }
 
   public static void closeExecutor(boolean unlockClose) {
-    ParWorkExecService exec = (ParWorkExecService) THREAD_LOCAL_EXECUTOR.get();
+    PerThreadExecService exec = (PerThreadExecService) THREAD_LOCAL_EXECUTOR.get();
     if (exec != null) {
       if (unlockClose) {
         exec.closeLock(false);
@@ -373,9 +373,9 @@ public class ParWork implements Closeable {
       }
     }
 
-    ParWorkExecService executor = null;
+    PerThreadExecService executor = null;
     if (needExec) {
-      executor = (ParWorkExecService) getMyPerThreadExecutor();
+      executor = (PerThreadExecService) getMyPerThreadExecutor();
     }
     //initExecutor();
     AtomicReference<Throwable> exception = new AtomicReference<>();
@@ -512,7 +512,7 @@ public class ParWork implements Closeable {
       minThreads = 3;
       maxThreads = PROC_COUNT;
       exec = getExecutorService(Math.max(minThreads, maxThreads)); // keep alive directly affects how long a worker might
-      ((ParWorkExecService)exec).closeLock(true);
+      ((PerThreadExecService)exec).closeLock(true);
       // be stuck in poll without an enqueue on shutdown
       THREAD_LOCAL_EXECUTOR.set(exec);
     }
@@ -528,11 +528,11 @@ public class ParWork implements Closeable {
   }
 
   public static ExecutorService getExecutorService(int maximumPoolSize) {
-    return new ParWorkExecService(getRootSharedExecutor(), maximumPoolSize);
+    return new PerThreadExecService(getRootSharedExecutor(), maximumPoolSize);
   }
 
   public static ExecutorService getExecutorService(int maximumPoolSize, boolean noCallerRuns) {
-    return new ParWorkExecService(getRootSharedExecutor(), maximumPoolSize, noCallerRuns);
+    return new PerThreadExecService(getRootSharedExecutor(), maximumPoolSize, noCallerRuns);
   }
 
   private void handleObject(AtomicReference<Throwable> exception, final TimeTracker workUnitTracker, ParObject ob) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java b/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java
similarity index 91%
rename from solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
rename to solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java
index af27d14..b07088c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java
@@ -23,7 +23,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public class ParWorkExecService extends AbstractExecutorService {
+public class PerThreadExecService extends AbstractExecutorService {
   private static final Logger log = LoggerFactory
       .getLogger(MethodHandles.lookup().lookupClass());
 
@@ -78,8 +78,15 @@ public class ParWorkExecService extends AbstractExecutorService {
         } else {
 
           try {
-            available.acquire();
-          } catch (InterruptedException e) {
+            boolean success = available.tryAcquire();
+            // I suspect that blocking here for a permit, instead of running the task in the
+            // caller thread, is why the per thread executor could not be used in the stream classes.
+            // Falling back to the caller thread means execution order is not guaranteed, but order should generally not matter.
+            if (!success) {
+              runIt(runnable, true, true, false);
+              return;
+            }
+          } catch (Exception e) {
             ParWork.propegateInterrupt(e);
             running.decrementAndGet();
             synchronized (awaitTerminate) {
@@ -101,15 +108,15 @@ public class ParWorkExecService extends AbstractExecutorService {
     }
   }
 
-  public ParWorkExecService(ExecutorService service) {
+  public PerThreadExecService(ExecutorService service) {
     this(service, -1);
   }
 
-  public ParWorkExecService(ExecutorService service, int maxSize) {
+  public PerThreadExecService(ExecutorService service, int maxSize) {
     this(service, maxSize, false);
   }
   
-  public ParWorkExecService(ExecutorService service, int maxSize, boolean noCallerRuns) {
+  public PerThreadExecService(ExecutorService service, int maxSize, boolean noCallerRuns) {
     assert service != null;
     this.noCallerRuns = noCallerRuns; 
     //assert ObjectReleaseTracker.track(this);
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
index 48a4775..925f138 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
@@ -47,7 +47,7 @@ import org.apache.solr.client.solrj.impl.Http2SolrClient;
 import org.apache.solr.client.solrj.impl.HttpClientUtil;
 import org.apache.solr.cloud.autoscaling.ScheduledTriggers;
 import org.apache.solr.common.ParWork;
-import org.apache.solr.common.ParWorkExecService;
+import org.apache.solr.common.PerThreadExecService;
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.TimeTracker;
@@ -200,7 +200,7 @@ public class SolrTestCase extends LuceneTestCase {
 
 
     testExecutor = ParWork.getMyPerThreadExecutor();
-    ((ParWorkExecService) testExecutor).closeLock(true);
+    ((PerThreadExecService) testExecutor).closeLock(true);
     // stop zkserver threads that can linger
     //interruptThreadsOnTearDown("nioEventLoopGroup", false);