You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/21 19:12:26 UTC

[lucene-solr] 01/02: @599 Fix OrderedExecutor to no use caller runs.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 4640ee440eba89edd7f5c8d81b304ad41505276b
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Fri Aug 21 14:11:08 2020 -0500

    @599 Fix OrderedExecutor to no use caller runs.
---
 .../org/apache/solr/util/OrderedExecutorTest.java  | 33 ++++++++++++++--------
 .../src/java/org/apache/solr/common/ParWork.java   |  8 ++++++
 .../org/apache/solr/common/ParWorkExecService.java | 13 +++++++--
 .../apache/solr/common/util/OrderedExecutor.java   | 14 +++++----
 4 files changed, 50 insertions(+), 18 deletions(-)

diff --git a/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java b/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
index 577e12e..1f1dde6 100644
--- a/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
+++ b/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
@@ -20,18 +20,17 @@ package org.apache.solr.util;
 import java.lang.invoke.MethodHandles;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
@@ -39,8 +38,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.solr.SolrTestCase;
 import org.apache.solr.common.ParWork;
-import org.apache.solr.common.ParWorkExecutor;
-import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.OrderedExecutor;
 import org.junit.Test;
 
@@ -89,12 +86,7 @@ public class OrderedExecutorTest extends SolrTestCase {
         });
       // BBB doesn't care about the latch, but because it uses the same lockId, it's blocked on AAA
       // so we execute it in a background thread...
-      Future<?> future = testExecutor.submit(() -> {
-        orderedExecutor.submit(lockId, () -> {
-          events.add("BBB");
-        });
-      });
-      
+      Future<?> future = testExecutor.submit(new MyNoLimitsCallable(orderedExecutor, lockId, events));
       // now if we release the latchAAA, AAA should be garunteed to fire first, then BBB
       latchAAA.countDown();
       try {
@@ -221,7 +213,7 @@ public class OrderedExecutorTest extends SolrTestCase {
       run.put(i, i);
     }
     OrderedExecutor orderedExecutor = new OrderedExecutor(TEST_NIGHTLY ? 10 : 3,
-        ParWork.getExecutorService(TEST_NIGHTLY ? 10 : 3));
+        ParWork.getExecutorService(TEST_NIGHTLY ? 10 : 3, true));
     try {
       for (int i = 0; i < (TEST_NIGHTLY ? 1000 : 55); i++) {
         int key = random().nextInt(N);
@@ -238,4 +230,23 @@ public class OrderedExecutorTest extends SolrTestCase {
   private static class IntBox {
     final AtomicInteger value = new AtomicInteger();
   }
+
+  private static class MyNoLimitsCallable extends ParWork.NoLimitsCallable {
+    private final OrderedExecutor orderedExecutor;
+    private final Integer lockId;
+    private final BlockingQueue<String> events;
+
+    public MyNoLimitsCallable(OrderedExecutor orderedExecutor, Integer lockId, BlockingQueue<String> events) {
+      this.orderedExecutor = orderedExecutor;
+      this.lockId = lockId;
+      this.events = events;
+    }
+
+    public Object call() {
+      orderedExecutor.submit(lockId, () -> {
+        events.add("BBB");
+      });
+      return null;
+    }
+  }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 69af7a0..2ceb680 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -539,6 +539,10 @@ public class ParWork implements Closeable {
     return new ParWorkExecService(getEXEC(), maximumPoolSize);
   }
 
+  public static ExecutorService getExecutorService(int maximumPoolSize, boolean noCallerRuns) {
+    return new ParWorkExecService(getEXEC(), maximumPoolSize, noCallerRuns);
+  }
+
   private void handleObject(AtomicReference<Throwable> exception, final TimeTracker workUnitTracker, ParObject ob) {
     if (log.isDebugEnabled()) {
       log.debug(
@@ -731,6 +735,10 @@ public class ParWork implements Closeable {
     public SolrFutureTask(Callable callable) {
       super(callable);
     }
+
+    public SolrFutureTask(Runnable runnable, Object value) {
+      super(runnable, value);
+    }
   }
 
   private static class ParObject {
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
index 637f2e2..af27d14 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
@@ -32,6 +32,7 @@ public class ParWorkExecService extends AbstractExecutorService {
 
   private final ExecutorService service;
   private final int maxSize;
+  private final boolean noCallerRuns;
   private volatile boolean terminated;
   private volatile boolean shutdown;
 
@@ -104,9 +105,13 @@ public class ParWorkExecService extends AbstractExecutorService {
     this(service, -1);
   }
 
-
   public ParWorkExecService(ExecutorService service, int maxSize) {
+    this(service, maxSize, false);
+  }
+  
+  public ParWorkExecService(ExecutorService service, int maxSize, boolean noCallerRuns) {
     assert service != null;
+    this.noCallerRuns = noCallerRuns; 
     //assert ObjectReleaseTracker.track(this);
     if (maxSize == -1) {
       this.maxSize = MAX_AVAILABLE;
@@ -118,12 +123,16 @@ public class ParWorkExecService extends AbstractExecutorService {
 
   @Override
   protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+    if (noCallerRuns) {
+      return (RunnableFuture) new ParWork.SolrFutureTask(runnable, value);
+    }
     return new FutureTask(runnable, value);
+
   }
 
   @Override
   protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
-    if (callable instanceof ParWork.NoLimitsCallable) {
+    if (noCallerRuns || callable instanceof ParWork.NoLimitsCallable) {
       return (RunnableFuture) new ParWork.SolrFutureTask(callable);
     }
     return new FutureTask(callable);
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/OrderedExecutor.java b/solr/solrj/src/java/org/apache/solr/common/util/OrderedExecutor.java
index baf723f..7399bb4 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/OrderedExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/OrderedExecutor.java
@@ -20,6 +20,7 @@ package org.apache.solr.common.util;
 import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.ParWork;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorCompletionService;
@@ -61,11 +62,14 @@ public class OrderedExecutor extends ExecutorCompletionService {
     }
 
     try {
-      return delegate.submit(() -> {
-        try {
-          command.run();
-        } finally {
-          sparseStripedLock.remove(lockId);
+      return delegate.submit(new ParWork.NoLimitsCallable(){
+        public Object call() {
+          try {
+            command.run();
+          } finally {
+            sparseStripedLock.remove(lockId);
+          }
+          return null;
         }
       });
     } catch (Exception e) {