You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/09/28 16:58:18 UTC
[lucene-solr] 01/03: @868 Thread management 2.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 28f6f7d597a30e37e31aa4f11f1585864becbe04
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sun Sep 27 15:36:21 2020 -0500
@868 Thread management 2.
---
.../src/java/org/apache/solr/cloud/Overseer.java | 9 +-
.../apache/solr/cloud/OverseerElectionContext.java | 6 +-
.../org/apache/solr/metrics/SolrMetricManager.java | 11 +-
.../org/apache/solr/search/TestRealTimeGet.java | 4 -
.../src/java/org/apache/solr/common/ParWork.java | 94 ++++++++-----
.../org/apache/solr/common/ParWorkExecutor.java | 12 +-
.../apache/solr/common/PerThreadExecService.java | 152 ++-------------------
.../java/org/apache/solr/common/SolrThread.java | 51 +++++++
.../org/apache/solr/common/util/JavaBinCodec.java | 4 +-
.../solr/common/util/SolrQueuedThreadPool.java | 13 +-
.../src/java/org/apache/solr/SolrTestCase.java | 8 +-
11 files changed, 157 insertions(+), 207 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index ca3fe73..e44fba3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -53,6 +53,7 @@ import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrCloseable;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrThread;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ConnectionManager;
import org.apache.solr.common.cloud.DocCollection;
@@ -601,7 +602,7 @@ public class Overseer implements SolrCloseable {
}
}
- public static class OverseerThread extends Thread implements Closeable {
+ public static class OverseerThread extends SolrThread implements Closeable {
protected volatile boolean isClosed;
private final Closeable thread;
@@ -613,11 +614,7 @@ public class Overseer implements SolrCloseable {
@Override
public void run() {
- try {
- super.run();
- } finally {
- //ParWork.closeMyPerThreadExecutor(true);
- }
+ super.run();
}
@Override
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
index 1ec1e23..f36e203 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
@@ -49,7 +49,7 @@ final class OverseerElectionContext extends ShardLeaderElectionContextBase {
@Override
void runLeaderProcess(ElectionContext context, boolean weAreReplacement, int pauseBeforeStartMs) throws KeeperException,
InterruptedException, IOException {
- if (isClosed() || !zkClient.isConnected() || overseer.isDone()) {
+ if (isClosed() || overseer.isDone()) {
return;
}
@@ -77,7 +77,7 @@ final class OverseerElectionContext extends ShardLeaderElectionContextBase {
log.info("Bailing on becoming leader, we are closed");
return;
}
- if (!isClosed() && !overseer.getZkController().getCoreContainer().isShutDown() && !overseer.isDone() && (overseer.getUpdaterThread() == null || !overseer.getUpdaterThread().isAlive())) {
+ if (!isClosed() && !overseer.isDone() && (overseer.getUpdaterThread() == null || !overseer.getUpdaterThread().isAlive())) {
try {
overseer.start(id, context);
} finally {
@@ -165,7 +165,7 @@ final class OverseerElectionContext extends ShardLeaderElectionContextBase {
@Override
public boolean isClosed() {
- return isClosed || overseer.getCoreContainer().isShutDown() || zkClient.isClosed() || overseer.getCoreContainer().getZkController().isClosed();
+ return isClosed || !zkClient.isConnected();
}
}
diff --git a/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java b/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java
index c20896b..77257c2 100644
--- a/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java
+++ b/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java
@@ -101,6 +101,7 @@ public class SolrMetricManager {
* system properties. This registry is shared between instances of {@link SolrMetricManager}.
*/
public static final String JVM_REGISTRY = REGISTRY_NAME_PREFIX + SolrInfoBean.Group.jvm.toString();
+ public static final PluginInfo[] PLUGIN_INFOS_EMPTY = new PluginInfo[0];
private final ConcurrentMap<String, MetricRegistry> registries = new ConcurrentHashMap<>(32);
@@ -568,9 +569,9 @@ public class SolrMetricManager {
*/
public void registerAll(String registry, MetricSet metrics, boolean force, String... metricPath) {
MetricRegistry metricRegistry = registry(registry);
- try (ParWork work = new ParWork(this)) {
+ // try (ParWork work = new ParWork(this)) {
for (Map.Entry<String,Metric> entry : metrics.getMetrics().entrySet()) {
- work.collect("registerMetric-" + entry.getKey(), () ->{
+ // work.collect("registerMetric-" + entry.getKey(), () ->{
String fullName = mkName(entry.getKey(), metricPath);
try {
metricRegistry.register(fullName, entry.getValue());
@@ -582,9 +583,9 @@ public class SolrMetricManager {
log.warn("Metric already registered: " + fullName);
}
}
- });
+ // });
}
- }
+ // }
}
/**
@@ -1249,7 +1250,7 @@ public class SolrMetricManager {
Map<String, Object> defaultInitArgs) {
List<PluginInfo> result = new ArrayList<>();
if (pluginInfos == null) {
- pluginInfos = new PluginInfo[0];
+ pluginInfos = PLUGIN_INFOS_EMPTY;
}
for (PluginInfo info : pluginInfos) {
String groupAttr = info.attributes.get("group");
diff --git a/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java b/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
index 2a05b4b..2748ac5 100644
--- a/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
+++ b/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
@@ -686,8 +686,6 @@ public class TestRealTimeGet extends TestRTGBase {
} catch (Throwable e) {
operations.set(-1L);
throw new RuntimeException(e);
- } finally {
- ParWork.closeMyPerThreadExecutor();
}
}
};
@@ -765,8 +763,6 @@ public class TestRealTimeGet extends TestRTGBase {
} catch (Throwable e) {
operations.set(-1L);
throw new RuntimeException(e);
- } finally {
- ParWork.closeMyPerThreadExecutor();
}
}
};
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index fc1bd2a..0efc22c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -48,6 +48,7 @@ import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.OrderedExecutor;
import org.apache.solr.common.util.SysStats;
import org.apache.zookeeper.KeeperException;
+import org.eclipse.jetty.util.BlockingArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,7 +79,9 @@ public class ParWork implements Closeable {
if (EXEC == null) {
synchronized (ParWork.class) {
if (EXEC == null) {
- EXEC = (ThreadPoolExecutor) getParExecutorService("RootExec", Integer.getInteger("solr.rootSharedThreadPoolCoreSize", 250), Integer.MAX_VALUE, 30000, new SynchronousQueue<>());
+ EXEC = (ThreadPoolExecutor) getParExecutorService("RootExec",
+ Integer.getInteger("solr.rootSharedThreadPoolCoreSize", 250), Integer.MAX_VALUE, 30000,
+ new BlockingArrayQueue(1024));
((ParWorkExecutor)EXEC).enableCloseLock();
}
}
@@ -111,21 +114,6 @@ public class ParWork implements Closeable {
return sysStats;
}
- public static void closeMyPerThreadExecutor() {
- closeMyPerThreadExecutor(false);
- }
-
- public static void closeMyPerThreadExecutor(boolean unlockClose) {
- PerThreadExecService exec = (PerThreadExecService) THREAD_LOCAL_EXECUTOR.get();
- if (exec != null) {
- if (unlockClose) {
- exec.closeLock(false);
- }
- ExecutorUtil.shutdownAndAwaitTermination(exec);
- THREAD_LOCAL_EXECUTOR.set(null);
- }
- }
-
private static class WorkUnit {
private final Set<ParObject> objects;
private final TimeTracker tracker;
@@ -485,24 +473,33 @@ public class ParWork implements Closeable {
}
public static ExecutorService getMyPerThreadExecutor() {
- // if (executor != null) return executor;
- ExecutorService exec = THREAD_LOCAL_EXECUTOR.get();
- if (exec == null) {
- if (log.isDebugEnabled()) {
- log.debug("Starting a new executor");
- }
+ Thread thread = Thread.currentThread();
- Integer minThreads;
- Integer maxThreads;
- minThreads = 3;
- maxThreads = PROC_COUNT;
- exec = getExecutorService(Math.max(minThreads, maxThreads)); // keep alive directly affects how long a worker might
- ((PerThreadExecService)exec).closeLock(true);
- // be stuck in poll without an enqueue on shutdown
- THREAD_LOCAL_EXECUTOR.set(exec);
+ ExecutorService service = null;
+ if (thread instanceof SolrThread) {
+ service = ((SolrThread) thread).getExecutorService();
}
- return exec;
+ if (service == null) {
+ ExecutorService exec = THREAD_LOCAL_EXECUTOR.get();
+ if (exec == null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Starting a new executor");
+ }
+
+ Integer minThreads;
+ Integer maxThreads;
+ minThreads = 4;
+ maxThreads = PROC_COUNT / 2;
+ exec = getExecutorService(Math.max(minThreads, maxThreads)); // keep alive directly affects how long a worker might
+ // ((PerThreadExecService)exec).closeLock(true);
+ // be stuck in poll without an enqueue on shutdown
+ THREAD_LOCAL_EXECUTOR.set(exec);
+ }
+ service = exec;
+ }
+
+ return service;
}
public static ExecutorService getParExecutorService(String name, int corePoolSize, int maxPoolSize, int keepAliveTime, BlockingQueue queue) {
@@ -703,13 +700,44 @@ public class ParWork implements Closeable {
public abstract Object call() throws Exception;
}
- public static class SolrFutureTask extends FutureTask {
- public SolrFutureTask(Callable callable) {
+ public static class SolrFutureTask extends FutureTask implements SolrThread.CreateThread {
+
+ private final boolean callerThreadAllowed;
+ private final SolrThread createThread;
+
+ public SolrFutureTask(Callable callable, boolean callerThreadAllowed) {
super(callable);
+ this.callerThreadAllowed = callerThreadAllowed;
+ Thread thread = Thread.currentThread();
+ if (thread instanceof SolrThread) {
+ this.createThread = (SolrThread) Thread.currentThread();
+ } else {
+ this.createThread = null;
+ }
}
public SolrFutureTask(Runnable runnable, Object value) {
+ this(runnable, value, true);
+ }
+
+ public SolrFutureTask(Runnable runnable, Object value, boolean callerThreadAllowed) {
super(runnable, value);
+ this.callerThreadAllowed = callerThreadAllowed;
+ Thread thread = Thread.currentThread();
+ if (thread instanceof SolrThread) {
+ this.createThread = (SolrThread) Thread.currentThread();
+ } else {
+ this.createThread = null;
+ }
+ }
+
+ public boolean isCallerThreadAllowed() {
+ return callerThreadAllowed;
+ }
+
+ @Override
+ public SolrThread getCreateThread() {
+ return createThread;
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
index f3768d1..8383e76 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
@@ -91,16 +91,8 @@ public class ParWorkExecutor extends ThreadPoolExecutor {
group = (s != null)? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
- Thread t = new Thread(group,
- name + threadNumber.getAndIncrement()) {
- public void run() {
- try {
- r.run();
- } finally {
- // ParWork.closeMyPerThreadExecutor(true);
- }
- }
- };
+ SolrThread t = new SolrThread(group, r,
+ name + threadNumber.getAndIncrement());
t.setDaemon(true);
return t;
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java b/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java
index f8c175d..c913269 100644
--- a/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java
@@ -39,74 +39,11 @@ public class PerThreadExecService extends AbstractExecutorService {
private final Object awaitTerminate = new Object();
-// private final BlockingArrayQueue<Runnable> workQueue = new BlockingArrayQueue<>(30, 0);
-// private volatile Worker worker;
-// private volatile Future<?> workerFuture;
-
private CloseTracker closeTracker;
private SysStats sysStats = ParWork.getSysStats();
private volatile boolean closeLock;
-// private class Worker implements Runnable {
-//
-// Worker() {
-// // setName("ParExecWorker");
-// }
-//
-// @Override
-// public void run() {
-// while (!terminated && !Thread.currentThread().isInterrupted()) {
-// Runnable runnable = null;
-// try {
-// runnable = workQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
-// } catch (InterruptedException e) {
-// ParWork.propagateInterrupt(e);
-// return;
-// }
-// if (runnable == null) {
-// running.decrementAndGet();
-// synchronized (awaitTerminate) {
-// awaitTerminate.notifyAll();
-// }
-// return;
-// }
-//
-// if (runnable instanceof ParWork.SolrFutureTask) {
-//
-// } else {
-//
-// try {
-// boolean success = available.tryAcquire();
-// // I think if we wait here for available instead of running in caller thread
-// // this is why we could not use the per thread executor in the stream classes
-// // this means order cannot matter, but it should generally not matter
-// if (!success) {
-// runIt(runnable, true, true, false);
-// return;
-// }
-// } catch (Exception e) {
-// ParWork.propagateInterrupt(e);
-// running.decrementAndGet();
-// synchronized (awaitTerminate) {
-// awaitTerminate.notifyAll();
-// }
-// return;
-// }
-//
-// }
-//
-// Runnable finalRunnable = runnable;
-// service.execute(new Runnable() {
-// @Override
-// public void run() {
-// runIt(finalRunnable, true, false, false);
-// }
-// });
-// }
-// }
-// }
-
public PerThreadExecService(ExecutorService service) {
this(service, -1);
}
@@ -120,7 +57,6 @@ public class PerThreadExecService extends AbstractExecutorService {
assert (closeTracker = new CloseTracker()) != null;
this.noCallerRunsAllowed = noCallerRunsAllowed;
this.noCallerRunsAvailableLimit = noCallerRunsAvailableLimit;
- //assert ObjectReleaseTracker.track(this);
if (maxSize == -1) {
this.maxSize = MAX_AVAILABLE;
} else {
@@ -132,55 +68,26 @@ public class PerThreadExecService extends AbstractExecutorService {
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
if (noCallerRunsAllowed) {
- return (RunnableFuture) new ParWork.SolrFutureTask(runnable, value);
+ return (RunnableFuture) new ParWork.SolrFutureTask(runnable, value, false);
}
- return new FutureTask(runnable, value);
-
+ return (RunnableFuture) new ParWork.SolrFutureTask(runnable, value);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
if (noCallerRunsAllowed || callable instanceof ParWork.NoLimitsCallable) {
- return (RunnableFuture) new ParWork.SolrFutureTask(callable);
+ return (RunnableFuture) new ParWork.SolrFutureTask(callable, false);
}
- return new FutureTask(callable);
+ return (RunnableFuture) new ParWork.SolrFutureTask(callable, true);
}
@Override
public void shutdown() {
- if (closeLock) {
- throw new IllegalCallerException();
- }
+// if (closeLock) {
+// throw new IllegalCallerException();
+// }
assert ObjectReleaseTracker.release(this);
- // assert closeTracker.close();
this.shutdown = true;
- // worker.interrupt();
- // workQueue.clear();
-// try {
-// workQueue.offer(new Runnable() {
-// @Override
-// public void run() {
-// // noop to wake from take
-// }
-// });
-// workQueue.offer(new Runnable() {
-// @Override
-// public void run() {
-// // noop to wake from take
-// }
-// });
-// workQueue.offer(new Runnable() {
-// @Override
-// public void run() {
-// // noop to wake from take
-// }
-// });
-
-
- // workerFuture.cancel(true);
-// } catch (NullPointerException e) {
-// // okay
-// }
}
@Override
@@ -208,7 +115,7 @@ public class PerThreadExecService extends AbstractExecutorService {
throw new RuntimeException("Timeout");
}
- //zaa System.out.println("WAIT : " + workQueue.size() + " " + available.getQueueLength() + " " + workQueue.toString());
+ // System.out.println("WAIT : " + workQueue.size() + " " + available.getQueueLength() + " " + workQueue.toString());
synchronized (awaitTerminate) {
awaitTerminate.wait(500);
}
@@ -228,7 +135,7 @@ public class PerThreadExecService extends AbstractExecutorService {
throw new RejectedExecutionException(closeTracker.getCloseStack());
}
running.incrementAndGet();
- if (runnable instanceof ParWork.SolrFutureTask) {
+ if (runnable instanceof ParWork.SolrFutureTask && !((ParWork.SolrFutureTask) runnable).isCallerThreadAllowed()) {
if (noCallerRunsAvailableLimit) {
try {
available.acquire();
@@ -237,13 +144,10 @@ public class PerThreadExecService extends AbstractExecutorService {
}
}
try {
- service.execute(new Runnable() {
- @Override
- public void run() {
- runIt(runnable, noCallerRunsAvailableLimit, false);
- if (noCallerRunsAvailableLimit) {
- available.release();
- }
+ service.execute(() -> {
+ runIt(runnable, noCallerRunsAvailableLimit, false);
+ if (noCallerRunsAvailableLimit) {
+ available.release();
}
});
} catch (Exception e) {
@@ -270,41 +174,13 @@ public class PerThreadExecService extends AbstractExecutorService {
Runnable finalRunnable = runnable;
try {
- service.execute(new Runnable() {
- @Override
- public void run() {
- runIt(finalRunnable, true, false);
- }
- });
+ service.execute(() -> runIt(finalRunnable, true, false));
} catch (Exception e) {
running.decrementAndGet();
synchronized (awaitTerminate) {
awaitTerminate.notifyAll();
}
}
-
-// boolean success = this.workQueue.offer(runnable);
-// if (!success) {
-// // log.warn("No room in the queue, running in caller thread {} {} {} {}", workQueue.size(), isShutdown(), isTerminated(), worker.isAlive());
-// try {
-// runnable.run();
-// } finally {
-// running.decrementAndGet();
-// synchronized (awaitTerminate) {
-// awaitTerminate.notifyAll();
-// }
-// }
-// } else {
-// if (worker == null) {
-// synchronized (this) {
-// if (worker == null) {
-// worker = new Worker();
-//
-// workerFuture = ParWork.getEXEC().submit(worker);
-// }
-// }
-// }
-// }
}
private void runIt(Runnable runnable, boolean acquired, boolean alreadyShutdown) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/SolrThread.java b/solr/solrj/src/java/org/apache/solr/common/SolrThread.java
new file mode 100644
index 0000000..998a0e6
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/SolrThread.java
@@ -0,0 +1,51 @@
+package org.apache.solr.common;
+
+import java.util.concurrent.ExecutorService;
+
+public class SolrThread extends Thread {
+
+ private ExecutorService executorService;
+
+ public SolrThread(ThreadGroup group, Runnable r, String name) {
+ super(group, r, name);
+
+ Thread createThread = Thread.currentThread();
+ if (createThread instanceof SolrThread) {
+ ExecutorService service = ((SolrThread) createThread).getExecutorService();
+ if (service == null) {
+ createExecutorService();
+ } else {
+ setExecutorService(service);
+ }
+ }
+
+ }
+
+ public void run() {
+ super.run();
+ }
+
+ private void setExecutorService(ExecutorService service) {
+ this.executorService = service;
+ }
+
+ private void createExecutorService() {
+ Integer minThreads;
+ Integer maxThreads;
+ minThreads = 4;
+ maxThreads = ParWork.PROC_COUNT / 2;
+ this.executorService = ParWork.getExecutorService(Math.max(minThreads, maxThreads));
+ }
+
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public static SolrThread getCurrentThread() {
+ return (SolrThread) currentThread();
+ }
+
+ public interface CreateThread {
+ SolrThread getCreateThread();
+ }
+}
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java b/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java
index 901baa9..9d1c08b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java
@@ -920,8 +920,8 @@ public class JavaBinCodec implements PushWriter {
}
}
- protected final static ThreadLocal<CharArr> THREAD_LOCAL_ARR = new ThreadLocal<>();
- protected final static ThreadLocal<ByteBuffer> THREAD_LOCAL_BRR = new ThreadLocal<>();
+ public final static ThreadLocal<CharArr> THREAD_LOCAL_ARR = new ThreadLocal<>();
+ public final static ThreadLocal<ByteBuffer> THREAD_LOCAL_BRR = new ThreadLocal<>();
public static ByteBuffer getByteArr(int sz, boolean resize) {
ByteBuffer brr = THREAD_LOCAL_BRR.get();
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
index 92b45c6..a200b6f 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
@@ -98,7 +98,7 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
public SolrQueuedThreadPool(String name) {
this(name, Integer.MAX_VALUE, Integer.getInteger("solr.minContainerThreads", 250),
- 5000, 0, // no reserved executor threads - we can process requests after shutdown or some race - we try to limit without threadpool limits no anyway
+ 30000, 0, // no reserved executor threads - we can process requests after shutdown or some race - we try to limit without threadpool limits no anyway
null, -1, null,
new SolrNamedThreadFactory(name));
this.name = name;
@@ -714,7 +714,16 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
@Override
public void run() {
- runnable.run();
+ try {
+ runnable.run();
+ } finally {
+ cleanupThreadLocals();
+ }
+ }
+
+ private void cleanupThreadLocals() {
+ JavaBinCodec.THREAD_LOCAL_ARR.remove();
+ JavaBinCodec.THREAD_LOCAL_BRR.remove();
}
}
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
index 2dc1e15..a4c3c78 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
@@ -195,7 +195,7 @@ public class SolrTestCase extends LuceneTestCase {
testStartTime = System.nanoTime();
- testExecutor = ParWork.getMyPerThreadExecutor();
+ testExecutor = new PerThreadExecService(ParWork.getRootSharedExecutor(), 12, true, false);
((PerThreadExecService) testExecutor).closeLock(true);
// stop zkserver threads that can linger
//interruptThreadsOnTearDown("nioEventLoopGroup", false);
@@ -285,9 +285,9 @@ public class SolrTestCase extends LuceneTestCase {
// unlimited - System.setProperty("solr.maxContainerThreads", "300");
System.setProperty("solr.lowContainerThreadsThreshold", "-1");
- System.setProperty("solr.minContainerThreads", "4");
+ System.setProperty("solr.minContainerThreads", "8");
System.setProperty("solr.rootSharedThreadPoolCoreSize", "16");
- System.setProperty("solr.minHttp2ClientThreads", "4");
+ System.setProperty("solr.minHttp2ClientThreads", "6");
ScheduledTriggers.DEFAULT_COOLDOWN_PERIOD_SECONDS = 1;
@@ -436,7 +436,7 @@ public class SolrTestCase extends LuceneTestCase {
SysStats.getSysStats().stopMonitor();
- //ParWork.closeMyPerThreadExecutor(true);
+ // testExecutor.shutdown();
ParWork.shutdownRootSharedExec();
AlreadyClosedException lastAlreadyClosedExp = CloseTracker.lastAlreadyClosedEx;