You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/11 06:07:28 UTC

[lucene-solr] 02/04: @494 Clean up virtual exec a bit.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit dd62d54c2be96b1f0cdab7ea2cc395ad0d4fa1a9
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Aug 10 23:31:21 2020 -0500

    @494 Clean up virtual exec a bit.
---
 .../cloud/autoscaling/sim/SimCloudManager.java     |  6 +--
 .../java/org/apache/solr/core/CoreContainer.java   |  2 +-
 .../src/java/org/apache/solr/core/SolrCore.java    |  2 +-
 .../java/org/apache/solr/pkg/PackageListeners.java | 14 +++----
 .../java/org/apache/solr/pkg/PackageLoader.java    | 10 ++++-
 .../src/java/org/apache/solr/common/ParWork.java   | 39 +++++++++--------
 .../org/apache/solr/common/ParWorkExecService.java | 49 ++++++++--------------
 .../org/apache/solr/common/cloud/SolrZkClient.java |  5 ++-
 8 files changed, 61 insertions(+), 66 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
index 1a0319c..ad3316b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
@@ -263,7 +263,7 @@ public class SimCloudManager implements SolrCloudManager {
     this.nodeStateProvider = new SimNodeStateProvider(liveNodesSet, this.stateManager, this.clusterStateProvider, null);
     this.queueFactory = new GenericDistributedQueueFactory(stateManager);
     //this.simCloudManagerPool = ExecutorUtil.newMDCAwareFixedThreadPool(200, new SolrNamedThreadFactory("simCloudManagerPool"));
-    this.simCloudManagerPool = ParWork.getExecutorService(3, 10, 3);
+    this.simCloudManagerPool = ParWork.getExecutorService(10);
     this.autoScalingHandler = new AutoScalingHandler(this, loader);
 
 
@@ -607,14 +607,14 @@ public class SimCloudManager implements SolrCloudManager {
       simRemoveNode(killNodeId, false);
     }
     objectCache.clear();
-   // nocommit, oh god...
+
     try {
       simCloudManagerPool.shutdownNow();
     } catch (Exception e) {
       ParWork.propegateInterrupt(e);
       // ignore
     }
-    simCloudManagerPool = ParWork.getExecutorService(3, 10, 3);
+    simCloudManagerPool = ParWork.getExecutorService( 10);
 
     OverseerTriggerThread trigger = new OverseerTriggerThread(loader, this);
     triggerThread = new Overseer.OverseerThread(triggerThreadGroup, trigger, "Simulated OverseerAutoScalingTriggerThread");
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 9025490..883c148 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -346,7 +346,7 @@ public class CoreContainer implements Closeable {
     this.asyncSolrCoreLoad = asyncSolrCoreLoad;
 
     this.replayUpdatesExecutor = new OrderedExecutor( cfg.getReplayUpdatesThreads(),
-            ParWork.getExecutorService( cfg.getReplayUpdatesThreads(), cfg.getReplayUpdatesThreads(), 250));
+            ParWork.getExecutorService(cfg.getReplayUpdatesThreads()));
 
     metricManager = new SolrMetricManager(loader, cfg.getMetricsConfig());
     String registryName = SolrMetricManager.getRegistryName(SolrInfoBean.Group.node);
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 67edfa5..bf9c2a2 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -1879,7 +1879,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
   private final LinkedList<RefCounted<SolrIndexSearcher>> _searchers = new LinkedList<>();
   private final LinkedList<RefCounted<SolrIndexSearcher>> _realtimeSearchers = new LinkedList<>();
 
-  final ParWorkExecService searcherExecutor = (ParWorkExecService) ParWork.getExecutorService(0, 1, 5000);
+  final ParWorkExecService searcherExecutor = (ParWorkExecService) ParWork.getExecutorService(1);
   private AtomicInteger onDeckSearchers = new AtomicInteger();  // number of searchers preparing
   // Lock ordering: one can acquire the openSearcherLock and then the searcherLock, but not vice-versa.
   private final Object searcherLock = new Object();  // the sync object for the searcher
diff --git a/solr/core/src/java/org/apache/solr/pkg/PackageListeners.java b/solr/core/src/java/org/apache/solr/pkg/PackageListeners.java
index b5b295f..a082664 100644
--- a/solr/core/src/java/org/apache/solr/pkg/PackageListeners.java
+++ b/solr/core/src/java/org/apache/solr/pkg/PackageListeners.java
@@ -34,7 +34,7 @@ public class PackageListeners {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   public static final String PACKAGE_VERSIONS = "PKG_VERSIONS";
-  private SolrCore core;
+  private final SolrCore core;
 
   public PackageListeners(SolrCore core) {
     this.core = core;
@@ -44,12 +44,11 @@ public class PackageListeners {
   // cause a memory leak if the listener forgets to unregister itself
   private List<Reference<Listener>> listeners = new CopyOnWriteArrayList<>();
 
-  public synchronized void addListener(Listener listener) {
+  public void addListener(Listener listener) {
     listeners.add(new SoftReference<>(listener));
-
   }
 
-  public synchronized void removeListener(Listener listener) {
+  public void removeListener(Listener listener) {
     Iterator<Reference<Listener>> it = listeners.iterator();
     while (it.hasNext()) {
       Reference<Listener> ref = it.next();
@@ -59,10 +58,9 @@ public class PackageListeners {
       }
 
     }
-
   }
 
-  synchronized void packagesUpdated(List<PackageLoader.Package> pkgs) {
+  void packagesUpdated(List<PackageLoader.Package> pkgs) {
     MDCLoggingContext.setCore(core);
     try {
       for (PackageLoader.Package pkgInfo : pkgs) {
@@ -73,7 +71,7 @@ public class PackageListeners {
     }
   }
 
-  private synchronized void invokeListeners(PackageLoader.Package pkg) {
+  private void invokeListeners(PackageLoader.Package pkg) {
     for (Reference<Listener> ref : listeners) {
       Listener listener = ref.get();
       if(listener == null) continue;
@@ -84,7 +82,7 @@ public class PackageListeners {
   }
 
   public List<Listener> getListeners() {
-    List<Listener> result = new ArrayList<>();
+    List<Listener> result = new ArrayList<>(listeners.size());
     for (Reference<Listener> ref : listeners) {
       Listener l = ref.get();
       if (l != null) {
diff --git a/solr/core/src/java/org/apache/solr/pkg/PackageLoader.java b/solr/core/src/java/org/apache/solr/pkg/PackageLoader.java
index 7dcc968..d944668 100644
--- a/solr/core/src/java/org/apache/solr/pkg/PackageLoader.java
+++ b/solr/core/src/java/org/apache/solr/pkg/PackageLoader.java
@@ -26,6 +26,7 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.ParWork;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrCore;
@@ -132,8 +133,13 @@ public class PackageLoader implements Closeable {
     Package p = packageClassLoaders.get(pkg);
     if (p != null) {
       List<Package> l = Collections.singletonList(p);
-      for (SolrCore core : coreContainer.getCores()) {
-        core.getPackageListeners().packagesUpdated(l);
+      try (ParWork work = new ParWork(this)) {
+        for (SolrCore core : coreContainer.getCores()) {
+          work.collect(() -> {
+            core.getPackageListeners().packagesUpdated(l);
+          });
+        }
+        work.collect("packageListeners");
       }
     }
   }
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 3ff7100..8e592ab 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -64,16 +64,19 @@ public class ParWork implements Closeable {
   protected final static ThreadLocal<ExecutorService> THREAD_LOCAL_EXECUTOR = new ThreadLocal<>();
   private final boolean requireAnotherThread;
 
-  private Set<Object> collectSet = null;
+  private volatile Set<Object> collectSet = null;
 
   private static volatile ThreadPoolExecutor EXEC;
 
-  private synchronized static ThreadPoolExecutor getEXEC() {
+  private static ThreadPoolExecutor getEXEC() {
     if (EXEC == null) {
-      EXEC = (ThreadPoolExecutor) getParExecutorService(0,
-          Math.max(Integer.getInteger("solr.per_thread_exec.min_threads", 3), Integer.getInteger("solr.per_thread_exec.max_threads",  Runtime.getRuntime().availableProcessors() / 3)), 15000);
+      synchronized (ParWork.class) {
+        if (EXEC == null) {
+          EXEC = (ThreadPoolExecutor) getParExecutorService(5, 15000);
+        }
+      }
     }
-    return  EXEC;
+    return EXEC;
   }
 
 
@@ -243,8 +246,11 @@ public class ParWork implements Closeable {
   }
 
   public void collect(Object object) {
+    if (object == null) {
+      return;
+    }
     if (collectSet == null) {
-      collectSet = new HashSet<>(64);
+      collectSet = ConcurrentHashMap.newKeySet(32);
     }
 
     collectSet.add(object);
@@ -255,8 +261,9 @@ public class ParWork implements Closeable {
    *                 used to identify it.
    */
   public void collect(Callable<?> callable) {
+
     if (collectSet == null) {
-      collectSet = new HashSet<>();
+      collectSet = ConcurrentHashMap.newKeySet(32);
     }
     collectSet.add(callable);
   }
@@ -266,8 +273,11 @@ public class ParWork implements Closeable {
    *                 used to identify it.
    */
   public void collect(Runnable runnable) {
+    if (runnable == null) {
+      return;
+    }
     if (collectSet == null) {
-      collectSet = new HashSet<>();
+      collectSet = ConcurrentHashMap.newKeySet(32);
     }
     collectSet.add(runnable);
   }
@@ -616,7 +626,7 @@ public class ParWork implements Closeable {
       Integer maxThreads;
       minThreads = Integer.getInteger("solr.per_thread_exec.min_threads", 3);
       maxThreads = Integer.getInteger("solr.per_thread_exec.max_threads",  Runtime.getRuntime().availableProcessors() / 3);
-      exec = getExecutorService(0, Math.max(minThreads, maxThreads), 1); // keep alive directly affects how long a worker might
+      exec = getExecutorService(Math.max(minThreads, maxThreads)); // keep alive directly affects how long a worker might
       // be stuck in poll without an enqueue on shutdown
       THREAD_LOCAL_EXECUTOR.set(exec);
     }
@@ -624,20 +634,15 @@ public class ParWork implements Closeable {
     return exec;
   }
 
-  public static ExecutorService getParExecutorService(int corePoolSize, int maximumPoolSize, int keepAliveTime) {
+  public static ExecutorService getParExecutorService(int corePoolSize, int keepAliveTime) {
     ThreadPoolExecutor exec;
     exec = new ParWorkExecutor("ParWork-" + Thread.currentThread().getName(),
             corePoolSize, Integer.MAX_VALUE, keepAliveTime);
     return exec;
   }
 
-  public static ExecutorService getExecutorService(int corePoolSize, int maximumPoolSize, int keepAliveTime) {
-    return new ParWorkExecService(getEXEC());
-  }
-
-  private static Integer getMaxPoolSize() {
-    return Integer.getInteger("solr.maxThreadExecPoolSize",
-            (int) Math.max(4, Math.round(Runtime.getRuntime().availableProcessors() / 3)));
+  public static ExecutorService getExecutorService(int maximumPoolSize) {
+    return new ParWorkExecService(getEXEC(), maximumPoolSize);
   }
 
   private void handleObject(String label, AtomicReference<Throwable> exception, final TimeTracker workUnitTracker, Object object) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
index 6962ba1..d4c7a8f 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
@@ -27,19 +27,23 @@ public class ParWorkExecService implements ExecutorService {
   private static final int MAX_AVAILABLE = Math.max(ParWork.PROC_COUNT / 2, 3);
   private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
 
-  private final Phaser phaser = new Phaser(1) {
-    @Override
-    protected boolean onAdvance(int phase, int parties) {
-      return false;
-    }
-  };
-
   private final ExecutorService service;
+  private final int maxSize;
   private volatile boolean terminated;
   private volatile boolean shutdown;
 
   public ParWorkExecService(ExecutorService service) {
+    this(service, -1);
+  }
+
+
+  public ParWorkExecService(ExecutorService service, int maxSize) {
     assert service != null;
+    if (maxSize == -1) {
+      this.maxSize = MAX_AVAILABLE;
+    } else {
+      this.maxSize = maxSize;
+    }
     this.service = service;
   }
 
@@ -69,19 +73,12 @@ public class ParWorkExecService implements ExecutorService {
   public boolean awaitTermination(long l, TimeUnit timeUnit)
       throws InterruptedException {
     while (available.hasQueuedThreads()) {
-      Thread.sleep(50);
+      Thread.sleep(100);
     }
     terminated = true;
     return true;
   }
 
-  public void awaitOutstanding(long l, TimeUnit timeUnit)
-      throws InterruptedException {
-    while (available.hasQueuedThreads()) {
-      Thread.sleep(50);
-    }
-  }
-
   @Override
   public <T> Future<T> submit(Callable<T> callable) {
     return doSubmit(callable, false);
@@ -102,7 +99,7 @@ public class ParWorkExecService implements ExecutorService {
           return CompletableFuture.completedFuture(callable.call());
         }
       } else {
-      //  available.acquireUninterruptibly();
+        available.acquireUninterruptibly();
       }
       Future<T> future = service.submit(callable);
       return new Future<T>() {
@@ -153,9 +150,6 @@ public class ParWorkExecService implements ExecutorService {
 
   @Override
   public <T> Future<T> submit(Runnable runnable, T t) {
-    if (shutdown || terminated) {
-      throw new RejectedExecutionException();
-    }
     boolean success = checkLoad();
     if (success) {
       success = available.tryAcquire();
@@ -183,9 +177,6 @@ public class ParWorkExecService implements ExecutorService {
   }
 
   public Future<?> doSubmit(Runnable runnable, boolean requiresAnotherThread) {
-//    if (shutdown || terminated) {
-//      throw new RejectedExecutionException();
-//    }
     if (!requiresAnotherThread) {
       boolean success = checkLoad();
       if (success) {
@@ -196,7 +187,7 @@ public class ParWorkExecService implements ExecutorService {
         return CompletableFuture.completedFuture(null);
       }
     } else {
-     // available.acquireUninterruptibly();
+      available.acquireUninterruptibly();
     }
     Future<?> future = service.submit(runnable);
 
@@ -248,9 +239,7 @@ public class ParWorkExecService implements ExecutorService {
   public <T> List<Future<T>> invokeAll(
       Collection<? extends Callable<T>> collection)
       throws InterruptedException {
-//    if (shutdown || terminated) {
-//      throw new RejectedExecutionException();
-//    }
+
     List<Future<T>> futures = new ArrayList<>(collection.size());
     for (Callable c : collection) {
       futures.add(submit(c));
@@ -295,11 +284,7 @@ public class ParWorkExecService implements ExecutorService {
       success = available.tryAcquire();
     }
     if (!success) {
-      try {
-        runnable.run();
-      } finally {
-        available.release();
-      }
+      runnable.run();
       return;
     }
     service.execute(new Runnable() {
@@ -316,7 +301,7 @@ public class ParWorkExecService implements ExecutorService {
   }
 
   public Integer getMaximumPoolSize() {
-    return MAX_AVAILABLE;
+    return maxSize;
   }
 
   public boolean checkLoad() {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index dbacac6..10b3234 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -19,6 +19,7 @@ package org.apache.solr.common.cloud;
 import org.apache.commons.io.FileUtils;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.common.ParWork;
+import org.apache.solr.common.ParWorkExecService;
 import org.apache.solr.common.ParWorkExecutor;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.StringUtils;
@@ -96,9 +97,9 @@ public class SolrZkClient implements Closeable {
 
   private final ConnectionManager connManager;
 
-  private final ExecutorService zkCallbackExecutor = new ParWorkExecutor("ZkCallback", 1);
+  private final ExecutorService zkCallbackExecutor = ParWork.getExecutorService( 1);
 
-  private final ExecutorService zkConnManagerCallbackExecutor =  new ParWorkExecutor("zkConnectionManagerCallback", 1);
+  private final ExecutorService zkConnManagerCallbackExecutor = ParWork.getExecutorService( 1);
 
   private volatile boolean isClosed = false;