You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/09/01 14:13:29 UTC
[lucene-solr] branch reference_impl_dev updated: @665 Executors,
a place for everything and everything in it's place.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new 470aa15 @665 Executors, a place for everything and everything in it's place.
470aa15 is described below
commit 470aa15f5dd75f9f04ee06e0c468d220ba0fc91f
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Tue Sep 1 09:12:53 2020 -0500
@665 Executors, a place for everything and everything in it's place.
---
.../src/java/org/apache/solr/cloud/Overseer.java | 2 +-
.../org/apache/solr/search/TestRealTimeGet.java | 4 +-
.../src/java/org/apache/solr/common/ParWork.java | 8 +--
.../org/apache/solr/common/ParWorkExecutor.java | 2 +-
.../solr/common/cloud/ConnectionManager.java | 3 +-
.../org/apache/solr/common/util/ExecutorUtil.java | 6 ++
.../solr/common/util/SolrQueuedThreadPool.java | 76 ++++++++++++++--------
.../solrj/impl/CloudHttp2SolrClientTest.java | 3 +-
.../src/java/org/apache/solr/SolrTestCase.java | 14 ++--
.../solr/cloud/AbstractFullDistribZkTestBase.java | 14 ----
.../org/apache/solr/cloud/SolrCloudTestCase.java | 5 --
.../java/org/apache/solr/cloud/ZkTestServer.java | 14 +---
12 files changed, 75 insertions(+), 76 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index e1d6a56..22c176a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -555,7 +555,7 @@ public class Overseer implements SolrCloseable {
try {
super.run();
} finally {
- ParWork.closeExecutor();
+ ParWork.closeMyPerThreadExecutor();
}
}
diff --git a/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java b/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
index 527f027..bf8d43f 100644
--- a/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
+++ b/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
@@ -685,7 +685,7 @@ public class TestRealTimeGet extends TestRTGBase {
operations.set(-1L);
throw new RuntimeException(e);
} finally {
- ParWork.closeExecutor();
+ ParWork.closeMyPerThreadExecutor();
}
}
};
@@ -764,7 +764,7 @@ public class TestRealTimeGet extends TestRTGBase {
operations.set(-1L);
throw new RuntimeException(e);
} finally {
- ParWork.closeExecutor();
+ ParWork.closeMyPerThreadExecutor();
}
}
};
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 7b99b45..e16bf5f 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -85,7 +85,7 @@ public class ParWork implements Closeable {
}
- public static void shutdownExec() {
+ public static void shutdownRootSharedExec() {
synchronized (ParWork.class) {
if (EXEC != null) {
((ParWorkExecutor)EXEC).closeLock(false);
@@ -105,11 +105,11 @@ public class ParWork implements Closeable {
return sysStats;
}
- public static void closeExecutor() {
- closeExecutor(true);
+ public static void closeMyPerThreadExecutor() {
+ closeMyPerThreadExecutor(true);
}
- public static void closeExecutor(boolean unlockClose) {
+ public static void closeMyPerThreadExecutor(boolean unlockClose) {
PerThreadExecService exec = (PerThreadExecService) THREAD_LOCAL_EXECUTOR.get();
if (exec != null) {
if (unlockClose) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
index df6ad7f..22d5d17 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
@@ -95,7 +95,7 @@ public class ParWorkExecutor extends ThreadPoolExecutor {
try {
r.run();
} finally {
- ParWork.closeExecutor();
+ ParWork.closeMyPerThreadExecutor();
}
}
};
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
index d1a54bb..94da4d3 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
@@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory;
import static org.apache.zookeeper.Watcher.Event.KeeperState.AuthFailed;
import static org.apache.zookeeper.Watcher.Event.KeeperState.Disconnected;
import static org.apache.zookeeper.Watcher.Event.KeeperState.Expired;
-import static org.apache.zookeeper.Watcher.Event.KeeperState.fromInt;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
@@ -331,7 +330,7 @@ public class ConnectionManager implements Watcher, Closeable {
} while (!isClosed() || Thread.currentThread().isInterrupted());
} finally {
ParWork
- .closeExecutor(); // we are using the root exec directly, let's just make sure it's closed here to avoid a slight delay leak
+ .closeMyPerThreadExecutor(); // we are using the root exec directly, let's just make sure it's closed here to avoid a slight delay leak
}
log.info("zkClient Connected: {}", connected);
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
index 369d4b4..0eeb9e7 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
@@ -33,6 +33,7 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
@@ -81,8 +82,13 @@ public class ExecutorUtil {
boolean shutdown = false;
// if interrupted, we still wait a short time for thread stoppage, but then quickly bail
TimeOut interruptTimeout = new TimeOut(3000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+ TimeOut shutdownTimeout = new TimeOut(30000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
boolean interrupted = false;
do {
+ if (shutdownTimeout.hasTimedOut()) {
+ throw new RuntimeException("Timeout waiting for executor to shutdown");
+ }
+
try {
// Wait a while for existing tasks to terminate
shutdown = pool.awaitTermination(30, TimeUnit.SECONDS);
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
index 934fd5c..6bf7538 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
@@ -713,16 +713,7 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
}
- Thread thread = new Thread(group, "") {
- @Override
- public void run() {
- try {
- runnable.run();
- } finally {
- ParWork.closeExecutor();
- }
- }
- };
+ Thread thread = new MyThread(group, runnable);
thread.setDaemon(isDaemon());
thread.setPriority(getThreadsPriority());
thread.setName(_name + "-" + thread.getId());
@@ -772,23 +763,7 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
if (isDetailedDump())
{
- threads.add(new Dumpable()
- {
- @Override
- public void dump(Appendable out, String indent) throws IOException
- {
- if (StringUtil.isBlank(known))
- Dumpable.dumpObjects(out, indent, String.format(Locale.ROOT,"%s %s %s %d", thread.getId(), thread.getName(), thread.getState(), thread.getPriority()), (Object[])trace);
- else
- Dumpable.dumpObjects(out, indent, String.format(Locale.ROOT,"%s %s %s %s %d", thread.getId(), thread.getName(), known, thread.getState(), thread.getPriority()));
- }
-
- @Override
- public String dump()
- {
- return null;
- }
- });
+ threads.add(new MyDumpable(known, thread, trace));
}
else
{
@@ -845,7 +820,7 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
log.error("Error in Jetty thread pool thread", error);
this.error = error;
} finally {
- ParWork.closeExecutor();
+ ParWork.closeMyPerThreadExecutor();
}
synchronized (notify) {
@@ -912,6 +887,51 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
return null;
}
+ private static class MyThread extends Thread {
+ private final Runnable runnable;
+
+ public MyThread(ThreadGroup group, Runnable runnable) {
+ super(group, "");
+ this.runnable = runnable;
+ }
+
+ @Override
+ public void run() {
+ try {
+ runnable.run();
+ } finally {
+ ParWork.closeMyPerThreadExecutor();
+ }
+ }
+ }
+
+ private static class MyDumpable implements Dumpable {
+ private final String known;
+ private final Thread thread;
+ private final StackTraceElement[] trace;
+
+ public MyDumpable(String known, Thread thread, StackTraceElement[] trace) {
+ this.known = known;
+ this.thread = thread;
+ this.trace = trace;
+ }
+
+ @Override
+ public void dump(Appendable out, String indent) throws IOException
+ {
+ if (StringUtil.isBlank(known))
+ Dumpable.dumpObjects(out, indent, String.format(Locale.ROOT,"%s %s %s %d", thread.getId(), thread.getName(), thread.getState(), thread.getPriority()), (Object[]) trace);
+ else
+ Dumpable.dumpObjects(out, indent, String.format(Locale.ROOT,"%s %s %s %s %d", thread.getId(), thread.getName(), known, thread.getState(), thread.getPriority()));
+ }
+
+ @Override
+ public String dump()
+ {
+ return null;
+ }
+ }
+
private class Runner implements Runnable
{
CountDownLatch latch = new CountDownLatch(1);
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
index 51eba5f0..e43c08e 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
@@ -137,13 +137,12 @@ public class CloudHttp2SolrClientTest extends SolrCloudTestCase {
zkBasedCloudSolrClient = null;
}
}
- super.tearDown();
-
// clear the shared collection before next test run
final CloudHttp2SolrClient solrClient = cluster.getSolrClient();
if (CollectionAdminRequest.listCollections(solrClient).contains(COLLECTION)) {
solrClient.deleteByQuery(COLLECTION, "*:*");
}
+ super.tearDown();
}
private void createTestCollectionIfNeeded() throws IOException, SolrServerException {
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
index 925f138..cf89ea1 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
@@ -122,6 +122,8 @@ public class SolrTestCase extends LuceneTestCase {
protected volatile static ExecutorService testExecutor;
+ protected static volatile SolrQueuedThreadPool qtp;
+
@Rule
public TestRule solrTestRules =
RuleChain.outerRule(new SystemPropertiesRestoreRule()).around(new SolrTestWatcher());
@@ -434,13 +436,17 @@ public class SolrTestCase extends LuceneTestCase {
log.info("@After Class ------------------------------------------------------");
try {
- ParWork.closeExecutor(true);
-
- ParWork.shutdownExec();
-
+ SolrQueuedThreadPool fqtp = qtp;
+ if (fqtp != null) {
+ fqtp.close();
+ qtp = null;
+ }
SysStats.getSysStats().stopMonitor();
+ ParWork.closeMyPerThreadExecutor(true);
+ ParWork.shutdownRootSharedExec();
+
if (!failed && suiteFailureMarker.wasSuccessful() ) {
String object = null;
// if the tests passed, make sure everything was closed / released
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
index 59841c3..73cacf0 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
@@ -119,8 +119,6 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static Path confDir;
- private static SolrQueuedThreadPool qtp;
-
@BeforeClass
public static void beforeFullSolrCloudTest() throws IOException {
qtp = getQtp();
@@ -135,12 +133,8 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
@Before
public void setup() throws IOException {
-
-
}
-
-
private static void copyConfigFileToTmpConf(Path confDir, String file) throws IOException {
Files.copy(Paths.get(SolrTestCaseJ4.TEST_HOME(), "collection1", "conf", file),
Paths.get(confDir.toString(), file), StandardCopyOption.REPLACE_EXISTING);
@@ -329,14 +323,6 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
@AfterClass
public static void afterClass() throws Exception {
- if (qtp != null) {
-
- qtp.close();
- qtp = null;
- }
-
- System.clearProperty("solrcloud.update.delay");
- System.clearProperty("genericCoreNodeNames");
}
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
index df8b7eb..8c6e4be 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
@@ -90,7 +90,6 @@ public class SolrCloudTestCase extends SolrTestCase {
public static final int DEFAULT_TIMEOUT = 15; // this is SECONDS, not MILLIS
public static final TimeUnit DEFAULT_TIMEOUT_UNIT = TimeUnit.SECONDS;
- private static volatile SolrQueuedThreadPool qtp;
private static class Config {
final String name;
@@ -303,10 +302,6 @@ public class SolrCloudTestCase extends SolrTestCase {
cluster = null;
}
}
- if (qtp != null) {
- qtp.stop();
- qtp = null;
- }
}
@Before
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
index dc7393a..78bf83a 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
@@ -30,21 +30,17 @@ import java.net.UnknownHostException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.JMException;
@@ -53,21 +49,13 @@ import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkMaintenanceUtils;
-import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.util.CloseTracker;
import org.apache.solr.common.util.ObjectReleaseTracker;
-import org.apache.solr.common.util.TimeOut;
-import org.apache.solr.common.util.TimeSource;
-import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Op;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeperExposed;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.jmx.ManagedUtil;
import org.apache.zookeeper.server.NIOServerCnxn;
@@ -588,7 +576,7 @@ public class ZkTestServer implements Closeable {
log.error("zkServer error", t);
}
} finally {
- ParWork.closeExecutor();
+ ParWork.closeMyPerThreadExecutor();
}
}
};