You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/11 06:07:28 UTC
[lucene-solr] 02/04: @494 Clean up virtual exec a bit.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit dd62d54c2be96b1f0cdab7ea2cc395ad0d4fa1a9
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Aug 10 23:31:21 2020 -0500
@494 Clean up virtual exec a bit.
---
.../cloud/autoscaling/sim/SimCloudManager.java | 6 +--
.../java/org/apache/solr/core/CoreContainer.java | 2 +-
.../src/java/org/apache/solr/core/SolrCore.java | 2 +-
.../java/org/apache/solr/pkg/PackageListeners.java | 14 +++----
.../java/org/apache/solr/pkg/PackageLoader.java | 10 ++++-
.../src/java/org/apache/solr/common/ParWork.java | 39 +++++++++--------
.../org/apache/solr/common/ParWorkExecService.java | 49 ++++++++--------------
.../org/apache/solr/common/cloud/SolrZkClient.java | 5 ++-
8 files changed, 61 insertions(+), 66 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
index 1a0319c..ad3316b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
@@ -263,7 +263,7 @@ public class SimCloudManager implements SolrCloudManager {
this.nodeStateProvider = new SimNodeStateProvider(liveNodesSet, this.stateManager, this.clusterStateProvider, null);
this.queueFactory = new GenericDistributedQueueFactory(stateManager);
//this.simCloudManagerPool = ExecutorUtil.newMDCAwareFixedThreadPool(200, new SolrNamedThreadFactory("simCloudManagerPool"));
- this.simCloudManagerPool = ParWork.getExecutorService(3, 10, 3);
+ this.simCloudManagerPool = ParWork.getExecutorService(10);
this.autoScalingHandler = new AutoScalingHandler(this, loader);
@@ -607,14 +607,14 @@ public class SimCloudManager implements SolrCloudManager {
simRemoveNode(killNodeId, false);
}
objectCache.clear();
- // nocommit, oh god...
+
try {
simCloudManagerPool.shutdownNow();
} catch (Exception e) {
ParWork.propegateInterrupt(e);
// ignore
}
- simCloudManagerPool = ParWork.getExecutorService(3, 10, 3);
+ simCloudManagerPool = ParWork.getExecutorService( 10);
OverseerTriggerThread trigger = new OverseerTriggerThread(loader, this);
triggerThread = new Overseer.OverseerThread(triggerThreadGroup, trigger, "Simulated OverseerAutoScalingTriggerThread");
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 9025490..883c148 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -346,7 +346,7 @@ public class CoreContainer implements Closeable {
this.asyncSolrCoreLoad = asyncSolrCoreLoad;
this.replayUpdatesExecutor = new OrderedExecutor( cfg.getReplayUpdatesThreads(),
- ParWork.getExecutorService( cfg.getReplayUpdatesThreads(), cfg.getReplayUpdatesThreads(), 250));
+ ParWork.getExecutorService(cfg.getReplayUpdatesThreads()));
metricManager = new SolrMetricManager(loader, cfg.getMetricsConfig());
String registryName = SolrMetricManager.getRegistryName(SolrInfoBean.Group.node);
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 67edfa5..bf9c2a2 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -1879,7 +1879,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
private final LinkedList<RefCounted<SolrIndexSearcher>> _searchers = new LinkedList<>();
private final LinkedList<RefCounted<SolrIndexSearcher>> _realtimeSearchers = new LinkedList<>();
- final ParWorkExecService searcherExecutor = (ParWorkExecService) ParWork.getExecutorService(0, 1, 5000);
+ final ParWorkExecService searcherExecutor = (ParWorkExecService) ParWork.getExecutorService(1);
private AtomicInteger onDeckSearchers = new AtomicInteger(); // number of searchers preparing
// Lock ordering: one can acquire the openSearcherLock and then the searcherLock, but not vice-versa.
private final Object searcherLock = new Object(); // the sync object for the searcher
diff --git a/solr/core/src/java/org/apache/solr/pkg/PackageListeners.java b/solr/core/src/java/org/apache/solr/pkg/PackageListeners.java
index b5b295f..a082664 100644
--- a/solr/core/src/java/org/apache/solr/pkg/PackageListeners.java
+++ b/solr/core/src/java/org/apache/solr/pkg/PackageListeners.java
@@ -34,7 +34,7 @@ public class PackageListeners {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String PACKAGE_VERSIONS = "PKG_VERSIONS";
- private SolrCore core;
+ private final SolrCore core;
public PackageListeners(SolrCore core) {
this.core = core;
@@ -44,12 +44,11 @@ public class PackageListeners {
// cause a memory leak if the listener forgets to unregister itself
private List<Reference<Listener>> listeners = new CopyOnWriteArrayList<>();
- public synchronized void addListener(Listener listener) {
+ public void addListener(Listener listener) {
listeners.add(new SoftReference<>(listener));
-
}
- public synchronized void removeListener(Listener listener) {
+ public void removeListener(Listener listener) {
Iterator<Reference<Listener>> it = listeners.iterator();
while (it.hasNext()) {
Reference<Listener> ref = it.next();
@@ -59,10 +58,9 @@ public class PackageListeners {
}
}
-
}
- synchronized void packagesUpdated(List<PackageLoader.Package> pkgs) {
+ void packagesUpdated(List<PackageLoader.Package> pkgs) {
MDCLoggingContext.setCore(core);
try {
for (PackageLoader.Package pkgInfo : pkgs) {
@@ -73,7 +71,7 @@ public class PackageListeners {
}
}
- private synchronized void invokeListeners(PackageLoader.Package pkg) {
+ private void invokeListeners(PackageLoader.Package pkg) {
for (Reference<Listener> ref : listeners) {
Listener listener = ref.get();
if(listener == null) continue;
@@ -84,7 +82,7 @@ public class PackageListeners {
}
public List<Listener> getListeners() {
- List<Listener> result = new ArrayList<>();
+ List<Listener> result = new ArrayList<>(listeners.size());
for (Reference<Listener> ref : listeners) {
Listener l = ref.get();
if (l != null) {
diff --git a/solr/core/src/java/org/apache/solr/pkg/PackageLoader.java b/solr/core/src/java/org/apache/solr/pkg/PackageLoader.java
index 7dcc968..d944668 100644
--- a/solr/core/src/java/org/apache/solr/pkg/PackageLoader.java
+++ b/solr/core/src/java/org/apache/solr/pkg/PackageLoader.java
@@ -26,6 +26,7 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.ParWork;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
@@ -132,8 +133,13 @@ public class PackageLoader implements Closeable {
Package p = packageClassLoaders.get(pkg);
if (p != null) {
List<Package> l = Collections.singletonList(p);
- for (SolrCore core : coreContainer.getCores()) {
- core.getPackageListeners().packagesUpdated(l);
+ try (ParWork work = new ParWork(this)) {
+ for (SolrCore core : coreContainer.getCores()) {
+ work.collect(() -> {
+ core.getPackageListeners().packagesUpdated(l);
+ });
+ }
+ work.collect("packageListeners");
}
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 3ff7100..8e592ab 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -64,16 +64,19 @@ public class ParWork implements Closeable {
protected final static ThreadLocal<ExecutorService> THREAD_LOCAL_EXECUTOR = new ThreadLocal<>();
private final boolean requireAnotherThread;
- private Set<Object> collectSet = null;
+ private volatile Set<Object> collectSet = null;
private static volatile ThreadPoolExecutor EXEC;
- private synchronized static ThreadPoolExecutor getEXEC() {
+ private static ThreadPoolExecutor getEXEC() {
if (EXEC == null) {
- EXEC = (ThreadPoolExecutor) getParExecutorService(0,
- Math.max(Integer.getInteger("solr.per_thread_exec.min_threads", 3), Integer.getInteger("solr.per_thread_exec.max_threads", Runtime.getRuntime().availableProcessors() / 3)), 15000);
+ synchronized (ParWork.class) {
+ if (EXEC == null) {
+ EXEC = (ThreadPoolExecutor) getParExecutorService(5, 15000);
+ }
+ }
}
- return EXEC;
+ return EXEC;
}
@@ -243,8 +246,11 @@ public class ParWork implements Closeable {
}
public void collect(Object object) {
+ if (object == null) {
+ return;
+ }
if (collectSet == null) {
- collectSet = new HashSet<>(64);
+ collectSet = ConcurrentHashMap.newKeySet(32);
}
collectSet.add(object);
@@ -255,8 +261,9 @@ public class ParWork implements Closeable {
* used to identify it.
*/
public void collect(Callable<?> callable) {
+
if (collectSet == null) {
- collectSet = new HashSet<>();
+ collectSet = ConcurrentHashMap.newKeySet(32);
}
collectSet.add(callable);
}
@@ -266,8 +273,11 @@ public class ParWork implements Closeable {
* used to identify it.
*/
public void collect(Runnable runnable) {
+ if (runnable == null) {
+ return;
+ }
if (collectSet == null) {
- collectSet = new HashSet<>();
+ collectSet = ConcurrentHashMap.newKeySet(32);
}
collectSet.add(runnable);
}
@@ -616,7 +626,7 @@ public class ParWork implements Closeable {
Integer maxThreads;
minThreads = Integer.getInteger("solr.per_thread_exec.min_threads", 3);
maxThreads = Integer.getInteger("solr.per_thread_exec.max_threads", Runtime.getRuntime().availableProcessors() / 3);
- exec = getExecutorService(0, Math.max(minThreads, maxThreads), 1); // keep alive directly affects how long a worker might
+ exec = getExecutorService(Math.max(minThreads, maxThreads)); // keep alive directly affects how long a worker might
// be stuck in poll without an enqueue on shutdown
THREAD_LOCAL_EXECUTOR.set(exec);
}
@@ -624,20 +634,15 @@ public class ParWork implements Closeable {
return exec;
}
- public static ExecutorService getParExecutorService(int corePoolSize, int maximumPoolSize, int keepAliveTime) {
+ public static ExecutorService getParExecutorService(int corePoolSize, int keepAliveTime) {
ThreadPoolExecutor exec;
exec = new ParWorkExecutor("ParWork-" + Thread.currentThread().getName(),
corePoolSize, Integer.MAX_VALUE, keepAliveTime);
return exec;
}
- public static ExecutorService getExecutorService(int corePoolSize, int maximumPoolSize, int keepAliveTime) {
- return new ParWorkExecService(getEXEC());
- }
-
- private static Integer getMaxPoolSize() {
- return Integer.getInteger("solr.maxThreadExecPoolSize",
- (int) Math.max(4, Math.round(Runtime.getRuntime().availableProcessors() / 3)));
+ public static ExecutorService getExecutorService(int maximumPoolSize) {
+ return new ParWorkExecService(getEXEC(), maximumPoolSize);
}
private void handleObject(String label, AtomicReference<Throwable> exception, final TimeTracker workUnitTracker, Object object) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
index 6962ba1..d4c7a8f 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
@@ -27,19 +27,23 @@ public class ParWorkExecService implements ExecutorService {
private static final int MAX_AVAILABLE = Math.max(ParWork.PROC_COUNT / 2, 3);
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
- private final Phaser phaser = new Phaser(1) {
- @Override
- protected boolean onAdvance(int phase, int parties) {
- return false;
- }
- };
-
private final ExecutorService service;
+ private final int maxSize;
private volatile boolean terminated;
private volatile boolean shutdown;
public ParWorkExecService(ExecutorService service) {
+ this(service, -1);
+ }
+
+
+ public ParWorkExecService(ExecutorService service, int maxSize) {
assert service != null;
+ if (maxSize == -1) {
+ this.maxSize = MAX_AVAILABLE;
+ } else {
+ this.maxSize = maxSize;
+ }
this.service = service;
}
@@ -69,19 +73,12 @@ public class ParWorkExecService implements ExecutorService {
public boolean awaitTermination(long l, TimeUnit timeUnit)
throws InterruptedException {
while (available.hasQueuedThreads()) {
- Thread.sleep(50);
+ Thread.sleep(100);
}
terminated = true;
return true;
}
- public void awaitOutstanding(long l, TimeUnit timeUnit)
- throws InterruptedException {
- while (available.hasQueuedThreads()) {
- Thread.sleep(50);
- }
- }
-
@Override
public <T> Future<T> submit(Callable<T> callable) {
return doSubmit(callable, false);
@@ -102,7 +99,7 @@ public class ParWorkExecService implements ExecutorService {
return CompletableFuture.completedFuture(callable.call());
}
} else {
- // available.acquireUninterruptibly();
+ available.acquireUninterruptibly();
}
Future<T> future = service.submit(callable);
return new Future<T>() {
@@ -153,9 +150,6 @@ public class ParWorkExecService implements ExecutorService {
@Override
public <T> Future<T> submit(Runnable runnable, T t) {
- if (shutdown || terminated) {
- throw new RejectedExecutionException();
- }
boolean success = checkLoad();
if (success) {
success = available.tryAcquire();
@@ -183,9 +177,6 @@ public class ParWorkExecService implements ExecutorService {
}
public Future<?> doSubmit(Runnable runnable, boolean requiresAnotherThread) {
-// if (shutdown || terminated) {
-// throw new RejectedExecutionException();
-// }
if (!requiresAnotherThread) {
boolean success = checkLoad();
if (success) {
@@ -196,7 +187,7 @@ public class ParWorkExecService implements ExecutorService {
return CompletableFuture.completedFuture(null);
}
} else {
- // available.acquireUninterruptibly();
+ available.acquireUninterruptibly();
}
Future<?> future = service.submit(runnable);
@@ -248,9 +239,7 @@ public class ParWorkExecService implements ExecutorService {
public <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> collection)
throws InterruptedException {
-// if (shutdown || terminated) {
-// throw new RejectedExecutionException();
-// }
+
List<Future<T>> futures = new ArrayList<>(collection.size());
for (Callable c : collection) {
futures.add(submit(c));
@@ -295,11 +284,7 @@ public class ParWorkExecService implements ExecutorService {
success = available.tryAcquire();
}
if (!success) {
- try {
- runnable.run();
- } finally {
- available.release();
- }
+ runnable.run();
return;
}
service.execute(new Runnable() {
@@ -316,7 +301,7 @@ public class ParWorkExecService implements ExecutorService {
}
public Integer getMaximumPoolSize() {
- return MAX_AVAILABLE;
+ return maxSize;
}
public boolean checkLoad() {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index dbacac6..10b3234 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -19,6 +19,7 @@ package org.apache.solr.common.cloud;
import org.apache.commons.io.FileUtils;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.common.ParWork;
+import org.apache.solr.common.ParWorkExecService;
import org.apache.solr.common.ParWorkExecutor;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.StringUtils;
@@ -96,9 +97,9 @@ public class SolrZkClient implements Closeable {
private final ConnectionManager connManager;
- private final ExecutorService zkCallbackExecutor = new ParWorkExecutor("ZkCallback", 1);
+ private final ExecutorService zkCallbackExecutor = ParWork.getExecutorService( 1);
- private final ExecutorService zkConnManagerCallbackExecutor = new ParWorkExecutor("zkConnectionManagerCallback", 1);
+ private final ExecutorService zkConnManagerCallbackExecutor = ParWork.getExecutorService( 1);
private volatile boolean isClosed = false;