You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/02/10 01:43:48 UTC
[lucene-solr] 04/09: @1331 Work on cleaning up.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 8d79a83cfe1dd0c80bb8429538788e8e8a8f3587
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Feb 4 08:00:46 2021 -0600
@1331 Work on cleaning up.
---
.../apache/solr/cloud/OverseerTaskProcessor.java | 2 +-
.../java/org/apache/solr/cloud/StatePublisher.java | 2 +-
.../java/org/apache/solr/cloud/ZkController.java | 8 +-
.../java/org/apache/solr/core/CoreContainer.java | 4 +-
.../src/java/org/apache/solr/core/PluginBag.java | 4 +-
.../src/java/org/apache/solr/core/SolrCore.java | 8 +-
.../org/apache/solr/core/SolrResourceLoader.java | 4 +-
.../apache/solr/handler/ReplicationHandler.java | 2 +-
.../org/apache/solr/metrics/SolrMetricManager.java | 13 +-
.../java/org/apache/solr/request/SimpleFacets.java | 4 +-
.../apache/solr/security/AuditLoggerPlugin.java | 2 +-
.../solr/security/MultiDestinationAuditLogger.java | 10 ++
.../solr/security/SolrLogAuditLoggerPlugin.java | 10 ++
.../apache/solr/servlet/SolrLifcycleListener.java | 4 +-
.../apache/solr/servlet/SolrRequestParsers.java | 2 +-
.../apache/solr/servlet/SolrShutdownHandler.java | 4 +-
.../apache/solr/update/DirectUpdateHandler2.java | 2 +-
.../org/apache/solr/update/UpdateShardHandler.java | 2 +-
.../java/org/apache/solr/update/VersionInfo.java | 2 +-
.../processor/DistributedZkUpdateProcessor.java | 2 +-
.../solr/util/plugin/AbstractPluginLoader.java | 2 +-
.../solr/security/CallbackAuditLoggerPlugin.java | 10 ++
.../solr/security/MockAuditLoggerPlugin.java | 10 ++
.../org/apache/solr/util/OrderedExecutorTest.java | 16 +-
.../client/solrj/impl/CloudHttp2SolrClient.java | 2 +-
.../solr/client/solrj/io/SolrClientCache.java | 2 +-
.../java/org/apache/solr/cloud/ActionThrottle.java | 6 +-
.../src/java/org/apache/solr/common/ParWork.java | 165 ++++++++++++---------
.../org/apache/solr/common/ParWorkExecutor.java | 23 +--
.../apache/solr/common/PerThreadExecService.java | 57 +++----
.../org/apache/solr/common/cloud/SolrZkClient.java | 5 +-
.../apache/solr/common/cloud/ZkStateReader.java | 2 +-
.../apache/solr/common/util/OrderedExecutor.java | 17 +--
.../apache/solr/BaseDistributedSearchTestCase.java | 4 +-
.../solr/cloud/AbstractFullDistribZkTestBase.java | 4 +-
.../apache/solr/cloud/MiniSolrCloudCluster.java | 4 +-
.../src/resources/logconf/log4j2-std-debug.xml | 4 +-
versions.props | 20 +--
38 files changed, 266 insertions(+), 178 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
index 06c4082..f5b00d2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
@@ -155,7 +155,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
AtomicBoolean sessionExpired = new AtomicBoolean();
AtomicBoolean interrupted = new AtomicBoolean();
// TODO: async
- try (ParWork work = new ParWork(this, true, true)) {
+ try (ParWork work = new ParWork(this, true, false)) {
for (Map.Entry<String, QueueEvent> entry : entrySet) {
work.collect("cleanWorkQueue", () -> {
try {
diff --git a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
index 6b89e65..476cd84 100644
--- a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
@@ -70,7 +70,7 @@ public class StatePublisher implements Closeable {
@Override
public void run() {
- ActionThrottle throttle = new ActionThrottle("StatePublisherWorker", 50);
+ ActionThrottle throttle = new ActionThrottle("StatePublisherWorker", 0);
while (!terminated && !zkStateReader.getZkClient().isClosed()) {
if (!zkStateReader.getZkClient().isConnected()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index e752259..6e3b649 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -523,7 +523,7 @@ public class ZkController implements Closeable, Runnable {
// notify any other objects that need to know when the session was re-connected
// the OnReconnect operation can be expensive per listener, so do that async in the background
- try (ParWork work = new ParWork(this, true, true)) {
+ try (ParWork work = new ParWork(this, true, false)) {
reconnectListeners.forEach(listener -> {
try {
work.collect(new OnReconnectNotifyAsync(listener));
@@ -554,7 +554,7 @@ public class ZkController implements Closeable, Runnable {
});
zkClient.setDisconnectListener(() -> {
- try (ParWork worker = new ParWork("disconnected", true, true)) {
+ try (ParWork worker = new ParWork("disconnected", true, false)) {
worker.collect(ZkController.this.overseer);
worker.collect(leaderElectors.values());
// I don't think so...
@@ -593,7 +593,7 @@ public class ZkController implements Closeable, Runnable {
ParWork.propagateInterrupt("Error Removing ephemeral live node. Continuing to close CoreContainer", e);
}
- try (ParWork closer = new ParWork(this, true, true)) {
+ try (ParWork closer = new ParWork(this, true, false)) {
closer.collect("replicateFromLeaders", replicateFromLeaders);
closer.collect(leaderElectors);
@@ -621,7 +621,7 @@ public class ZkController implements Closeable, Runnable {
this.isShutdownCalled = true;
this.isClosed = true;
- try (ParWork closer = new ParWork(this, true, true)) {
+ try (ParWork closer = new ParWork(this, true, false)) {
closer.collect(leaderElectors);
closer.collect(sysPropsCacher);
closer.collect(cloudManager);
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 8d3a844..e5e3b5f 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -754,7 +754,7 @@ public class CoreContainer implements Closeable {
}
}
- try (ParWork work = new ParWork(this, false, true)) {
+ try (ParWork work = new ParWork(this, false, false)) {
boolean enableMetrics = Boolean.getBoolean("solr.enableMetrics");
if (enableMetrics) {
@@ -1120,7 +1120,7 @@ public class CoreContainer implements Closeable {
}
}
- try (ParWork closer = new ParWork(this, true, true)) {
+ try (ParWork closer = new ParWork(this, true, false)) {
closer.collect("replayUpdateExec", () -> {
replayUpdatesExecutor.shutdownAndAwaitTermination();
diff --git a/solr/core/src/java/org/apache/solr/core/PluginBag.java b/solr/core/src/java/org/apache/solr/core/PluginBag.java
index 1d6cd81..14db235 100644
--- a/solr/core/src/java/org/apache/solr/core/PluginBag.java
+++ b/solr/core/src/java/org/apache/solr/core/PluginBag.java
@@ -286,7 +286,7 @@ public class PluginBag<T> implements AutoCloseable {
*/
void init(Map<String, T> defaults, SolrCore solrCore, Collection<PluginInfo> infos) {
core = solrCore;
- try (ParWork parWork = new ParWork(this, false, true)) {
+ try (ParWork parWork = new ParWork(this, false, false)) {
for (PluginInfo info : infos) {
parWork.collect("", new CreateAndPutRequestHandler(info));
}
@@ -332,7 +332,7 @@ public class PluginBag<T> implements AutoCloseable {
*/
@Override
public void close() {
- try (ParWork worker = new ParWork(this, false, true)) {
+ try (ParWork worker = new ParWork(this, false, false)) {
registry.forEach((s, tPluginHolder) -> {
worker.collect(tPluginHolder);
});
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 55ec855..2856347 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -1839,7 +1839,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
searcherReadyLatch.countDown();
- try (ParWork closer = new ParWork(this, true, true)) {
+ try (ParWork closer = new ParWork(this, true, false)) {
List<Callable<Object>> closeHookCalls = new ArrayList<>();
if (closeHooks != null) {
@@ -2747,7 +2747,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
if (currSearcher == null) {
future = searcherExecutor.submit(() -> {
- try (ParWork work = new ParWork(this, false, true)) {
+ try (ParWork work = new ParWork(this, false, false)) {
for (SolrEventListener listener : firstSearcherListeners) {
work.collect("fistSearcherListeners", () -> {
listener.newSearcher(newSearcher, null);
@@ -2760,7 +2760,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
if (currSearcher != null) {
future = searcherExecutor.submit(() -> {
- try (ParWork work = new ParWork(this, true, true)) {
+ try (ParWork work = new ParWork(this, true, false)) {
for (SolrEventListener listener : newSearcherListeners) {
work.collect("newSearcherListeners", () -> {
listener.newSearcher(newSearcher, null);
@@ -3501,7 +3501,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
}
//some files in conf directory may have other than managedschema, overlay, params
- try (ParWork worker = new ParWork("ConfListeners", true, true)) {
+ try (ParWork worker = new ParWork("ConfListeners", true, false)) {
if (cc.isShutDown()) return;
core.confListeners.forEach(runnable -> {
diff --git a/solr/core/src/java/org/apache/solr/core/SolrResourceLoader.java b/solr/core/src/java/org/apache/solr/core/SolrResourceLoader.java
index 039be98..9f4b7ad 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrResourceLoader.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrResourceLoader.java
@@ -845,7 +845,7 @@ public class SolrResourceLoader implements ResourceLoader, Closeable {
*/
public void inform(SolrCore core) {
while (waitingForCore.size() > 0) {
- try (ParWork worker = new ParWork(this, false, true)) {
+ try (ParWork worker = new ParWork(this, false, false)) {
waitingForCore.forEach(aware -> {
worker.collect("informSolrCore", ()-> {
try {
@@ -866,7 +866,7 @@ public class SolrResourceLoader implements ResourceLoader, Closeable {
*/
public void inform(ResourceLoader loader) {
while (waitingForResources.size() > 0) {
- try (ParWork worker = new ParWork(this, false, true)) {
+ try (ParWork worker = new ParWork(this, false, false)) {
waitingForResources.forEach(r -> {
worker.collect("informResourceLoader", ()-> {
try {
diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
index a0500dc..d8b9fdb 100644
--- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
@@ -1360,7 +1360,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
}
}
}
- log.info("Commits will be reserved for {} ms", reserveCommitDuration);
+ if (log.isDebugEnabled()) log.debug("Commits will be reserved for {} ms", reserveCommitDuration);
}
// check master or slave is enabled
diff --git a/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java b/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java
index 51ca9a1..c545b8e 100644
--- a/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java
+++ b/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java
@@ -1001,9 +1001,10 @@ public class SolrMetricManager {
});
}
-
- try (ParWork worker = new ParWork(this)) {
- worker.collect(calls);
+ if (calls.size() > 0) {
+ try (ParWork worker = new ParWork(this, false, false)) {
+ worker.collect(calls);
+ }
}
}
@@ -1207,8 +1208,10 @@ public class SolrMetricManager {
} finally {
reportersLock.unlock();
}
- try (ParWork closer = new ParWork(this, true)) {
- closer.collect("MetricReporters", closeReporters);
+ if (closeReporters.size() > 0) {
+ try (ParWork closer = new ParWork(this, true, false)) {
+ closer.collect("MetricReporters", closeReporters);
+ }
}
return removed;
}
diff --git a/solr/core/src/java/org/apache/solr/request/SimpleFacets.java b/solr/core/src/java/org/apache/solr/request/SimpleFacets.java
index 9b39019..4884ca7 100644
--- a/solr/core/src/java/org/apache/solr/request/SimpleFacets.java
+++ b/solr/core/src/java/org/apache/solr/request/SimpleFacets.java
@@ -814,7 +814,7 @@ public class SimpleFacets {
// fdebugParent.putInfoItem("maxThreads", maxThreads);
// }
List<Callable<NamedList>> calls = new ArrayList<>(facetFs.length);
- try (ParWork worker = new ParWork(this)) {
+ try {
//Loop over fields; submit to executor, keeping the future
for (String f : facetFs) {
@@ -865,7 +865,7 @@ public class SimpleFacets {
throw (RuntimeException) e;
}
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Error while processing facet fields: " + e.toString(), ee);
+ "Error while processing facet fields: " + ee.toString(), ee);
}
return res;
diff --git a/solr/core/src/java/org/apache/solr/security/AuditLoggerPlugin.java b/solr/core/src/java/org/apache/solr/security/AuditLoggerPlugin.java
index b8b5659..181df44 100644
--- a/solr/core/src/java/org/apache/solr/security/AuditLoggerPlugin.java
+++ b/solr/core/src/java/org/apache/solr/security/AuditLoggerPlugin.java
@@ -53,7 +53,7 @@ import org.slf4j.LoggerFactory;
* @since 8.1.0
* @lucene.experimental
*/
-public abstract class AuditLoggerPlugin extends ParWork.NoLimitsCallable implements Closeable, SolrInfoBean {
+public abstract class AuditLoggerPlugin extends ParWork.ParWorkCallableBase implements Closeable, SolrInfoBean {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String PARAM_EVENT_TYPES = "eventTypes";
static final String PARAM_ASYNC = "async";
diff --git a/solr/core/src/java/org/apache/solr/security/MultiDestinationAuditLogger.java b/solr/core/src/java/org/apache/solr/security/MultiDestinationAuditLogger.java
index 2f59d04..d2c8b8f 100644
--- a/solr/core/src/java/org/apache/solr/security/MultiDestinationAuditLogger.java
+++ b/solr/core/src/java/org/apache/solr/security/MultiDestinationAuditLogger.java
@@ -144,4 +144,14 @@ public class MultiDestinationAuditLogger extends AuditLoggerPlugin implements Re
public String getName() {
return "MultiDestinationAuditLogger";
}
+
+ @Override
+ public boolean isCallerThreadAllowed() {
+ return true;
+ }
+
+ @Override
+ public String getLabel() {
+ return "mAuditLogger";
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/security/SolrLogAuditLoggerPlugin.java b/solr/core/src/java/org/apache/solr/security/SolrLogAuditLoggerPlugin.java
index 2aec651..2e39237 100644
--- a/solr/core/src/java/org/apache/solr/security/SolrLogAuditLoggerPlugin.java
+++ b/solr/core/src/java/org/apache/solr/security/SolrLogAuditLoggerPlugin.java
@@ -83,4 +83,14 @@ public class SolrLogAuditLoggerPlugin extends AuditLoggerPlugin {
public String getName() {
return "SolrLogAuditLoggerPlugin";
}
+
+ @Override
+ public boolean isCallerThreadAllowed() {
+ return true;
+ }
+
+ @Override
+ public String getLabel() {
+ return "AuditLogger";
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrLifcycleListener.java b/solr/core/src/java/org/apache/solr/servlet/SolrLifcycleListener.java
index dee12a3..8f071b0 100644
--- a/solr/core/src/java/org/apache/solr/servlet/SolrLifcycleListener.java
+++ b/solr/core/src/java/org/apache/solr/servlet/SolrLifcycleListener.java
@@ -49,7 +49,7 @@ public class SolrLifcycleListener extends AbstractLifeCycle.AbstractLifeCycleLis
@Override
public void lifeCycleStopping(LifeCycle event) {
log.info("Solr is stopping, call ZkController#disconnect");
- try (ParWork work = new ParWork(this, true, true)) {
+ try (ParWork work = new ParWork(this, true, false)) {
for (Runnable run : shutdowns) {
work.collect("shutdown", () -> run.run());
}
@@ -60,7 +60,7 @@ public class SolrLifcycleListener extends AbstractLifeCycle.AbstractLifeCycleLis
@Override
public void lifeCycleStopped(LifeCycle event) {
log.info("Solr is stopped, call shutdown");
- try (ParWork work = new ParWork(this, true, true)) {
+ try (ParWork work = new ParWork(this, true, false)) {
for (Runnable run : stopped) {
work.collect("stopped", () -> run.run());
}
diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrRequestParsers.java b/solr/core/src/java/org/apache/solr/servlet/SolrRequestParsers.java
index 679b17d..a0b81a1 100644
--- a/solr/core/src/java/org/apache/solr/servlet/SolrRequestParsers.java
+++ b/solr/core/src/java/org/apache/solr/servlet/SolrRequestParsers.java
@@ -605,7 +605,7 @@ public class SolrRequestParsers {
log.warn("Errors deleting multipart tmp files", e);
return;
}
- try (ParWork work = new ParWork("", true, false)) {
+ try (ParWork work = new ParWork("", true)) {
for (Part part : parts) {
work.collect("", () -> {
try {
diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrShutdownHandler.java b/solr/core/src/java/org/apache/solr/servlet/SolrShutdownHandler.java
index 0e27969..e1d4440 100644
--- a/solr/core/src/java/org/apache/solr/servlet/SolrShutdownHandler.java
+++ b/solr/core/src/java/org/apache/solr/servlet/SolrShutdownHandler.java
@@ -71,7 +71,7 @@ public class SolrShutdownHandler extends HandlerWrapper implements Graceful {
@Override
public synchronized Void get() throws InterruptedException, ExecutionException {
synchronized (SolrShutdownHandler.class) {
- try (ParWork work = new ParWork(this, true, true)) {
+ try (ParWork work = new ParWork(this, true, false)) {
for (Runnable run : shutdowns) {
work.collect("shutdown", () -> run.run());
}
@@ -84,7 +84,7 @@ public class SolrShutdownHandler extends HandlerWrapper implements Graceful {
@Override
public synchronized Void get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
synchronized (SolrShutdownHandler.class) {
- try (ParWork work = new ParWork(this, true, true)) {
+ try (ParWork work = new ParWork(this, true, false)) {
for (Runnable run : shutdowns) {
work.collect("shutdown", () -> run.run());
}
diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index 11af6e5..c6a767a 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -828,7 +828,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
@Override
public void close() throws IOException {
if (log.isDebugEnabled()) log.debug("closing {}", this);
- try (ParWork closer = new ParWork(this, true, true)) {
+ try (ParWork closer = new ParWork(this, true, false)) {
closer.collect(commitTracker);
closer.collect(softCommitTracker);
}
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
index 0f7800e..45b7891 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
@@ -239,7 +239,7 @@ public class UpdateShardHandler implements SolrInfoBean {
if (searchOnlyClient != null) searchOnlyClient.disableCloseLock();
- try (ParWork closer = new ParWork(this, true, true)) {
+ try (ParWork closer = new ParWork(this, true, false)) {
closer.collect("", () -> {
HttpClientUtil.close(defaultClient);
return defaultClient;
diff --git a/solr/core/src/java/org/apache/solr/update/VersionInfo.java b/solr/core/src/java/org/apache/solr/update/VersionInfo.java
index edf3f55..2b9f2ec 100644
--- a/solr/core/src/java/org/apache/solr/update/VersionInfo.java
+++ b/solr/core/src/java/org/apache/solr/update/VersionInfo.java
@@ -292,7 +292,7 @@ public class VersionInfo {
vs.createWeight(funcContext, searcher);
List<LeafReaderContext> leaves = searcher.getTopReaderContext().leaves();
Set<Long> maxVersions = ConcurrentHashMap.newKeySet(leaves.size());
- try (ParWork work = new ParWork("maxVersion", false, true)) {
+ try (ParWork work = new ParWork("maxVersion", false, false)) {
for (LeafReaderContext ctx : leaves) {
work.collect("", () -> {
try {
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 68ce680..9cde4ae 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -165,7 +165,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
public void processCommit(CommitUpdateCommand cmd) throws IOException {
if (log.isDebugEnabled()) log.debug("processCommit - start commit isLeader={} commit_end_point={} replicaType={}", isLeader, req.getParams().get(COMMIT_END_POINT), replicaType);
- try (ParWork worker = new ParWork(this, false, true)) {
+ try (ParWork worker = new ParWork(this, false, false)) {
clusterState = zkController.getClusterState();
assert TestInjection.injectFailUpdateRequests();
diff --git a/solr/core/src/java/org/apache/solr/util/plugin/AbstractPluginLoader.java b/solr/core/src/java/org/apache/solr/util/plugin/AbstractPluginLoader.java
index 52396b8..760a5b6 100644
--- a/solr/core/src/java/org/apache/solr/util/plugin/AbstractPluginLoader.java
+++ b/solr/core/src/java/org/apache/solr/util/plugin/AbstractPluginLoader.java
@@ -150,7 +150,7 @@ public abstract class AbstractPluginLoader<T>
XPath xpath = loader.getXPath();
if (nodes !=null ) {
for (int i=0; i<nodes.size(); i++) {
- try (ParWork parWork = new ParWork(this, false, true)) {
+ try (ParWork parWork = new ParWork(this, false, false)) {
NodeInfo node = nodes.get(i);
String name = null;
diff --git a/solr/core/src/test/org/apache/solr/security/CallbackAuditLoggerPlugin.java b/solr/core/src/test/org/apache/solr/security/CallbackAuditLoggerPlugin.java
index 8f17714..941ef40 100644
--- a/solr/core/src/test/org/apache/solr/security/CallbackAuditLoggerPlugin.java
+++ b/solr/core/src/test/org/apache/solr/security/CallbackAuditLoggerPlugin.java
@@ -94,4 +94,14 @@ public class CallbackAuditLoggerPlugin extends AuditLoggerPlugin {
public String getName() {
return "CallbackAuditLoggerPlugin";
}
+
+ @Override
+ public boolean isCallerThreadAllowed() {
+ return true;
+ }
+
+ @Override
+ public String getLabel() {
+ return getName();
+ }
}
diff --git a/solr/core/src/test/org/apache/solr/security/MockAuditLoggerPlugin.java b/solr/core/src/test/org/apache/solr/security/MockAuditLoggerPlugin.java
index b415fe3..aec30eb 100644
--- a/solr/core/src/test/org/apache/solr/security/MockAuditLoggerPlugin.java
+++ b/solr/core/src/test/org/apache/solr/security/MockAuditLoggerPlugin.java
@@ -59,4 +59,14 @@ public class MockAuditLoggerPlugin extends AuditLoggerPlugin {
public String getName() {
return "MockAuditLoggerPlugin";
}
+
+ @Override
+ public boolean isCallerThreadAllowed() {
+ return true;
+ }
+
+ @Override
+ public String getLabel() {
+ return getName();
+ }
}
diff --git a/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java b/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
index 4fc7e6f..8b9c945 100644
--- a/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
+++ b/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
@@ -85,7 +85,7 @@ public class OrderedExecutorTest extends SolrTestCase {
});
// BBB doesn't care about the latch, but because it uses the same lockId, it's blocked on AAA
// so we execute it in a background thread...
- Future<?> future = getTestExecutor().submit(new MyNoLimitsCallable(orderedExecutor, lockId, events));
+ Future<?> future = getTestExecutor().submit(new MyParWorkCallableBase(orderedExecutor, lockId, events));
// now if we release the latchAAA, AAA should be garunteed to fire first, then BBB
latchAAA.countDown();
try {
@@ -232,12 +232,12 @@ public class OrderedExecutorTest extends SolrTestCase {
final AtomicInteger value = new AtomicInteger();
}
- private static class MyNoLimitsCallable extends ParWork.NoLimitsCallable {
+ private static class MyParWorkCallableBase extends ParWork.ParWorkCallableBase {
private final OrderedExecutor orderedExecutor;
private final Integer lockId;
private final BlockingQueue<String> events;
- public MyNoLimitsCallable(OrderedExecutor orderedExecutor, Integer lockId, BlockingQueue<String> events) {
+ public MyParWorkCallableBase(OrderedExecutor orderedExecutor, Integer lockId, BlockingQueue<String> events) {
this.orderedExecutor = orderedExecutor;
this.lockId = lockId;
this.events = events;
@@ -249,5 +249,15 @@ public class OrderedExecutorTest extends SolrTestCase {
});
return null;
}
+
+ @Override
+ public boolean isCallerThreadAllowed() {
+ return false;
+ }
+
+ @Override
+ public String getLabel() {
+ return "orderedExecutor";
+ }
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
index 359a4d8..ba89b84 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
@@ -170,7 +170,7 @@ public class CloudHttp2SolrClient extends BaseCloudSolrClient {
@Override
public void close() throws IOException {
- try (ParWork closer = new ParWork(this, true, true)) {
+ try (ParWork closer = new ParWork(this, true, false)) {
closer.collect(stateProvider);
closer.collect(lbClient);
if (clientIsInternal && myClient!=null) {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
index 5762f3e..8d0ad42 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
@@ -109,7 +109,7 @@ public class SolrClientCache implements Serializable, Closeable {
public void close() {
synchronized (this) {
- try (ParWork closer = new ParWork(this, false, true)) {
+ try (ParWork closer = new ParWork(this, false, false)) {
for (Map.Entry<String,SolrClient> entry : solrClients.entrySet()) {
closer.collect("solrClient", entry.getValue());
}
diff --git a/solr/solrj/src/java/org/apache/solr/cloud/ActionThrottle.java b/solr/solrj/src/java/org/apache/solr/cloud/ActionThrottle.java
index f2d5ead..eca5bef 100644
--- a/solr/solrj/src/java/org/apache/solr/cloud/ActionThrottle.java
+++ b/solr/solrj/src/java/org/apache/solr/cloud/ActionThrottle.java
@@ -64,13 +64,13 @@ public class ActionThrottle {
}
public void minimumWaitBetweenActions() {
- if (lastActionStartedAt == null) {
+ if (lastActionStartedAt == null || minMsBetweenActions == 0) {
return;
}
long diff = timeSource.getTimeNs() - lastActionStartedAt;
int diffMs = (int) TimeUnit.MILLISECONDS.convert(diff, TimeUnit.NANOSECONDS);
long minNsBetweenActions = TimeUnit.NANOSECONDS.convert(minMsBetweenActions, TimeUnit.MILLISECONDS);
- log.debug("The last {} attempt started {}ms ago.", name, diffMs);
+ if (log.isDebugEnabled()) log.debug("The last {} attempt started {}ms ago.", name, diffMs);
int sleep = 0;
if (diffMs > 0 && diff < minNsBetweenActions) {
@@ -80,7 +80,7 @@ public class ActionThrottle {
}
if (sleep > 0) {
- log.info("Throttling {} attempts - waiting for {}ms", name, sleep);
+ if (log.isDebugEnabled()) log.debug("Throttling {} attempts - waiting for {}ms", name, sleep);
try {
timeSource.sleep(sleep);
} catch (InterruptedException e) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index acc1754..b3be9cb 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -58,7 +58,7 @@ import org.slf4j.LoggerFactory;
*/
public class ParWork implements Closeable {
public static final int PROC_COUNT = ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors();
- public static final String ROOT_EXEC_NAME = "Root";
+ public static final String ROOT_EXEC_NAME = "SOLR";
private static final String WORK_WAS_INTERRUPTED = "Work was interrupted!";
private static final String RAN_INTO_AN_ERROR_WHILE_DOING_WORK =
@@ -67,7 +67,7 @@ public class ParWork implements Closeable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected final static ThreadLocal<ExecutorService> THREAD_LOCAL_EXECUTOR = new ThreadLocal<>();
- private final boolean requireAnotherThread;
+ private final boolean callerThreadAllowed;
private final String rootLabel;
private final Queue<ParObject> collectSet = new LinkedList<>();
@@ -215,12 +215,12 @@ public class ParWork implements Closeable {
public ParWork(Object object, boolean ignoreExceptions) {
- this(object, ignoreExceptions, false);
+ this(object, ignoreExceptions, true);
}
- public ParWork(Object object, boolean ignoreExceptions, boolean requireAnotherThread) {
+ public ParWork(Object object, boolean ignoreExceptions, boolean callerThreadAllowed) {
this.ignoreExceptions = ignoreExceptions;
- this.requireAnotherThread = requireAnotherThread;
+ this.callerThreadAllowed = callerThreadAllowed;
this.rootLabel = object instanceof String ?
(String) object : object.getClass().getSimpleName();
assert (tracker = new TimeTracker(object, object == null ? "NullObject" : object.getClass().getName())) != null;
@@ -262,7 +262,7 @@ public class ParWork implements Closeable {
public void addCollect() {
if (collectSet.isEmpty()) {
- if (log.isDebugEnabled()) log.debug("No work collected to submit");
+ //if (log.isDebugEnabled()) log.debug("No work collected to submit", new RuntimeException());
return;
}
try {
@@ -306,10 +306,6 @@ public class ParWork implements Closeable {
}
private void add(Collection<ParObject> objects) {
- if (log.isDebugEnabled()) {
- log.debug("add(String objects={}, objects");
- }
-
Set<ParObject> wuObjects = new HashSet<>(objects);
WorkUnit workUnit = new WorkUnit(wuObjects, tracker);
@@ -334,9 +330,6 @@ public class ParWork implements Closeable {
@Override
public void close() {
- if (log.isDebugEnabled()) {
- log.debug("close() - start");
- }
addCollect();
@@ -355,7 +348,7 @@ public class ParWork implements Closeable {
AtomicReference<Throwable> exception = new AtomicReference<>();
try {
for (WorkUnit workUnit : workUnits) {
- if (log.isDebugEnabled()) log.debug("Process workunit {} {}", rootLabel, workUnit.objects);
+ if (log.isTraceEnabled()) log.trace("Process workunit {} {}", rootLabel, workUnit.objects);
TimeTracker workUnitTracker = null;
assert (workUnitTracker = workUnit.tracker.startSubClose(workUnit)) != null;
try {
@@ -369,16 +362,11 @@ public class ParWork implements Closeable {
for (ParObject object : objects) {
- if (object == null)
- continue;
+ if (object == null) continue;
TimeTracker finalWorkUnitTracker = workUnitTracker;
- if (requireAnotherThread) {
- closeCalls.add(new ParWorkNoLimitsCallable(exception, finalWorkUnitTracker, object));
- } else {
- closeCalls.add(new ParWorkCallable(exception, finalWorkUnitTracker, object));
- }
+ closeCalls.add(new ParWorkCallable(exception, finalWorkUnitTracker, object, callerThreadAllowed));
}
if (closeCalls.size() > 0) {
@@ -449,10 +437,6 @@ public class ParWork implements Closeable {
throw new RuntimeException(exp);
}
}
-
- if (log.isDebugEnabled()) {
- log.debug("close() - end");
- }
}
public static ExecutorService getMyPerThreadExecutor() {
@@ -466,8 +450,8 @@ public class ParWork implements Closeable {
if (service == null) {
ExecutorService exec = THREAD_LOCAL_EXECUTOR.get();
if (exec == null) {
- if (log.isDebugEnabled()) {
- log.debug("Starting a new executor");
+ if (log.isTraceEnabled()) {
+ log.trace("Starting a new executor");
}
Integer minThreads;
@@ -487,7 +471,7 @@ public class ParWork implements Closeable {
public static ExecutorService getParExecutorService(String name, int corePoolSize, int maxPoolSize, int keepAliveTime, BlockingQueue queue) {
ThreadPoolExecutor exec;
- exec = new ParWorkExecutor(name + "-" + Thread.currentThread().getName(),
+ exec = new ParWorkExecutor(name,
corePoolSize, maxPoolSize, keepAliveTime, queue);
return exec;
}
@@ -674,50 +658,85 @@ public class ParWork implements Closeable {
}
}
- public static abstract class NoLimitsCallable<V> implements Callable<Object> {
+ public static abstract class ParWorkCallableBase<V> implements Callable<Object> {
@Override
public abstract Object call() throws Exception;
+
+ public abstract boolean isCallerThreadAllowed();
+
+
+ public abstract String getLabel();
}
- public static class SolrFutureTask extends FutureTask implements SolrThread.CreateThread {
+ public static class SolrFutureTask extends FutureTask {
private final boolean callerThreadAllowed;
- private final SolrThread createThread;
+ //private final SolrThread createThread;
+ private final boolean callerThreadUsesAvailableLimit;
+ private String label;
+
+ public SolrFutureTask(String label, Callable callable) {
+ super(callable);
+ label = label;
+ callerThreadAllowed = true;
+ callerThreadUsesAvailableLimit = false;
+ }
+
+ public SolrFutureTask(Callable callable) {
+ this(callable, true);
+ }
public SolrFutureTask(Callable callable, boolean callerThreadAllowed) {
super(callable);
- this.callerThreadAllowed = callerThreadAllowed;
- Thread thread = Thread.currentThread();
- if (thread instanceof SolrThread) {
- this.createThread = (SolrThread) Thread.currentThread();
- } else {
- this.createThread = null;
+ if (callable instanceof ParWork.ParWorkCallableBase) {
+ this.label = ((ParWorkCallableBase<?>) callable).getLabel();
}
+ this.callerThreadAllowed = callerThreadAllowed;
+ this.callerThreadUsesAvailableLimit = false;
+// Thread thread = Thread.currentThread();
+// if (thread instanceof SolrThread) {
+// this.createThread = (SolrThread) Thread.currentThread();
+// } else {
+// this.createThread = null;
+// }
+ }
+
+ public String getLabel() {
+ return label;
}
public SolrFutureTask(Runnable runnable, Object value) {
- this(runnable, value, true);
+ this(runnable, value, true, false);
}
public SolrFutureTask(Runnable runnable, Object value, boolean callerThreadAllowed) {
+ this(runnable, value, callerThreadAllowed, false);
+ }
+
+ public SolrFutureTask(Runnable runnable, Object value, boolean callerThreadAllowed, boolean callerThreadUsesAvailableLimit) {
super(runnable, value);
this.callerThreadAllowed = callerThreadAllowed;
- Thread thread = Thread.currentThread();
- if (thread instanceof SolrThread) {
- this.createThread = (SolrThread) Thread.currentThread();
- } else {
- this.createThread = null;
- }
+ this.callerThreadUsesAvailableLimit = callerThreadUsesAvailableLimit;
+// Thread thread = Thread.currentThread();
+// if (thread instanceof SolrThread) {
+// this.createThread = (SolrThread) Thread.currentThread();
+// } else {
+// this.createThread = null;
+// }
}
public boolean isCallerThreadAllowed() {
return callerThreadAllowed;
}
- @Override
- public SolrThread getCreateThread() {
- return createThread;
+ public boolean callerThreadUsesAvailableLimit() {
+ return callerThreadUsesAvailableLimit;
}
+
+// @Override
+// public SolrThread getCreateThread() {
+// return createThread;
+// }
}
private static class ParObject {
@@ -725,42 +744,50 @@ public class ParWork implements Closeable {
Object object;
}
- private class ParWorkNoLimitsCallable extends NoLimitsCallable<Object> {
+ public class ParWorkCallable extends ParWorkCallableBase<Object> {
private final AtomicReference<Throwable> exception;
private final TimeTracker finalWorkUnitTracker;
private final ParObject object;
- public ParWorkNoLimitsCallable(AtomicReference<Throwable> exception, TimeTracker finalWorkUnitTracker, ParObject object) {
- this.exception = exception;
- this.finalWorkUnitTracker = finalWorkUnitTracker;
- this.object = object;
+ private final boolean callerThreadAllowed;
+ private final boolean callerThreadUsesAvailableLimit;
+
+ public ParWorkCallable() {
+ this.callerThreadAllowed = false;
+ this.exception = null;
+ this.finalWorkUnitTracker = null;
+ this.object = null;
+ callerThreadUsesAvailableLimit = true;
}
- @Override
- public Object call() {
- try {
- handleObject(exception, finalWorkUnitTracker, object);
- } catch (Throwable t) {
- log.error(RAN_INTO_AN_ERROR_WHILE_DOING_WORK, t);
- if (exception.get() == null) {
- exception.set(t);
- }
- }
- return object;
+ public ParWorkCallable(boolean callerThreadAllowed, boolean callerThreadUsesAvailableLimit) {
+ this(null, null, null, callerThreadAllowed, callerThreadUsesAvailableLimit);
}
- }
- private class ParWorkCallable implements Callable<Object> {
- private final AtomicReference<Throwable> exception;
- private final TimeTracker finalWorkUnitTracker;
- private final ParObject object;
+ public ParWorkCallable(AtomicReference<Throwable> exception, TimeTracker finalWorkUnitTracker, ParObject object, boolean callerThreadAllowed) {
+ this(exception, finalWorkUnitTracker, object, callerThreadAllowed, false);
+ }
- public ParWorkCallable(AtomicReference<Throwable> exception, TimeTracker finalWorkUnitTracker, ParObject object) {
+ public ParWorkCallable(AtomicReference<Throwable> exception, TimeTracker finalWorkUnitTracker, ParObject object, boolean callerThreadAllowed, boolean callerThreadUsesAvailableLimit) {
+ this.callerThreadAllowed = callerThreadAllowed;
+ this.callerThreadUsesAvailableLimit = callerThreadUsesAvailableLimit;
this.exception = exception;
this.finalWorkUnitTracker = finalWorkUnitTracker;
this.object = object;
}
+ public String getLabel() {
+ return object.label;
+ }
+
+ public boolean isCallerThreadAllowed() {
+ return callerThreadAllowed;
+ }
+
+ public boolean callerThreadUsesAvailableLimit() {
+ return callerThreadUsesAvailableLimit;
+ }
+
@Override
public Object call() {
try {
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
index 48581bc..caa7a77 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
@@ -23,19 +23,20 @@ import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
public class ParWorkExecutor extends ThreadPoolExecutor {
private static final Logger log = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());
- public static final int KEEP_ALIVE_TIME = 3000;
+ public static final int KEEP_ALIVE_TIME = 1000;
- private static AtomicInteger threadNumber = new AtomicInteger(0);
+ private static LongAdder threadNumber = new LongAdder();
private CloseTracker closeTracker;
@@ -43,10 +44,6 @@ public class ParWorkExecutor extends ThreadPoolExecutor {
this(name, 4, maxPoolsSize, KEEP_ALIVE_TIME, new LinkedBlockingDeque<>());
}
- public ParWorkExecutor(String name, int corePoolsSize, int maxPoolsSize) {
- this(name, corePoolsSize, maxPoolsSize, KEEP_ALIVE_TIME, new LinkedBlockingDeque<>());
- }
-
public ParWorkExecutor(String name, int corePoolsSize, int maxPoolsSize,
int keepalive, BlockingQueue<Runnable> workQueue) {
super(corePoolsSize, Math.max(corePoolsSize, maxPoolsSize), keepalive, TimeUnit.MILLISECONDS, workQueue
@@ -91,6 +88,14 @@ public class ParWorkExecutor extends ThreadPoolExecutor {
}
}
+ @Override
+ protected void beforeExecute(Thread t, Runnable r) {
+ if (r instanceof ParWork.SolrFutureTask) {
+ String label = ((ParWork.SolrFutureTask) r).getLabel();
+ t.setName(label);
+ }
+ }
+
private static class ParWorkThreadFactory implements ThreadFactory {
private final String name;
@@ -106,9 +111,9 @@ public class ParWorkExecutor extends ThreadPoolExecutor {
SecurityManager s = System.getSecurityManager();
group = (s != null)? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
-
+ threadNumber.increment();
SolrThread t = new SolrThread(group, null,
- name + threadNumber.getAndIncrement()) {
+ name + "-" + threadNumber.longValue()) {
public void run() {
r.run();
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java b/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java
index 36a3270..83eaa8c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java
@@ -29,8 +29,8 @@ public class PerThreadExecService extends AbstractExecutorService {
private final ExecutorService service;
private final int maxSize;
- private final boolean noCallerRunsAllowed;
- private final boolean noCallerRunsAvailableLimit;
+ private final boolean callerThreadAllowed;
+ private final boolean callerThreadUsesAvailableLimit;
private volatile boolean terminated;
private volatile boolean shutdown;
@@ -49,11 +49,11 @@ public class PerThreadExecService extends AbstractExecutorService {
this(service, maxSize, false, false);
}
- public PerThreadExecService(ExecutorService service, int maxSize, boolean noCallerRunsAllowed, boolean noCallerRunsAvailableLimit) {
+ public PerThreadExecService(ExecutorService service, int maxSize, boolean callerThreadAllowed, boolean callerThreadUsesAvailableLimit) {
assert service != null;
assert (closeTracker = new CloseTracker()) != null;
- this.noCallerRunsAllowed = noCallerRunsAllowed;
- this.noCallerRunsAvailableLimit = noCallerRunsAvailableLimit;
+ this.callerThreadAllowed = callerThreadAllowed;
+ this.callerThreadUsesAvailableLimit = callerThreadUsesAvailableLimit;
if (maxSize == -1) {
this.maxSize = MAX_AVAILABLE;
} else {
@@ -65,18 +65,16 @@ public class PerThreadExecService extends AbstractExecutorService {
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
- if (noCallerRunsAllowed) {
- return (RunnableFuture) new ParWork.SolrFutureTask(runnable, value, false);
- }
- return (RunnableFuture) new ParWork.SolrFutureTask(runnable, value);
+ return (RunnableFuture) new ParWork.SolrFutureTask(runnable, value, callerThreadAllowed);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
- if (noCallerRunsAllowed || callable instanceof ParWork.NoLimitsCallable) {
- return (RunnableFuture) new ParWork.SolrFutureTask(callable, false);
+ if (callable instanceof ParWork.ParWorkCallableBase) {
+ return (RunnableFuture) new ParWork.SolrFutureTask(callable, ((ParWork.ParWorkCallableBase) callable).isCallerThreadAllowed());
+ } else {
+ return (RunnableFuture) new ParWork.SolrFutureTask(callable, true);
}
- return (RunnableFuture) new ParWork.SolrFutureTask(callable, true);
}
@Override
@@ -126,7 +124,7 @@ public class PerThreadExecService extends AbstractExecutorService {
}
// System.out.println("WAIT : " + workQueue.size() + " " + available.getQueueLength() + " " + workQueue.toString());
- running.wait(1000);
+ running.wait(500);
}
}
if (isShutdown()) {
@@ -139,14 +137,9 @@ public class PerThreadExecService extends AbstractExecutorService {
@Override
public void execute(Runnable runnable) {
-
-// if (shutdown) {
-// throw new RejectedExecutionException();
-// }
-
running.incrementAndGet();
if (runnable instanceof ParWork.SolrFutureTask && !((ParWork.SolrFutureTask) runnable).isCallerThreadAllowed()) {
- if (noCallerRunsAvailableLimit) {
+ if (callerThreadUsesAvailableLimit) {
try {
available.acquire();
} catch (InterruptedException e) {
@@ -159,10 +152,10 @@ public class PerThreadExecService extends AbstractExecutorService {
}
}
try {
- service.submit(new MyCallable(runnable, noCallerRunsAvailableLimit));
+ service.submit(new MyThreadCallable(runnable, available, running, callerThreadUsesAvailableLimit));
} catch (Exception e) {
log.error("", e);
- if (noCallerRunsAvailableLimit) {
+ if (callerThreadUsesAvailableLimit) {
available.release();
}
running.decrementAndGet();
@@ -175,14 +168,14 @@ public class PerThreadExecService extends AbstractExecutorService {
}
boolean acquired = available.tryAcquire();
- if (!acquired && !noCallerRunsAllowed) {
- runIt(runnable, false);
+ if (!acquired && callerThreadAllowed) {
+ runIt(runnable, available, running, false);
return;
}
Runnable finalRunnable = runnable;
try {
- service.submit(new MyCallable(finalRunnable, true));
+ service.submit(new MyThreadCallable(finalRunnable, available, running,true));
} catch (Exception e) {
log.error("Exception submitting", e);
try {
@@ -197,7 +190,7 @@ public class PerThreadExecService extends AbstractExecutorService {
}
}
- private void runIt(Runnable runnable, boolean acquired) {
+ private static void runIt(Runnable runnable, Semaphore available, AtomicInteger running, boolean acquired) {
try {
runnable.run();
} finally {
@@ -240,17 +233,25 @@ public class PerThreadExecService extends AbstractExecutorService {
}
}
- private class MyCallable implements Callable<Object> {
+ public static class MyThreadCallable implements Callable<Object> {
private final Runnable runnable;
private final boolean acquired;
+ private final Semaphore available;
+ private final AtomicInteger running;
- public MyCallable(Runnable runnable, boolean acquired) {
+ public MyThreadCallable(Runnable runnable, Semaphore available, AtomicInteger running, boolean acquired) {
this.runnable = runnable;
this.acquired = acquired;
+ this.available = available;
+ this.running = running;
+ }
+
+ public Runnable getRunnable() {
+ return runnable;
}
public Object call() {
- runIt(runnable, acquired);
+ runIt(runnable, available, running, acquired);
return null;
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index cc91632..c761e3d 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -1263,7 +1263,10 @@ public class SolrZkClient implements Closeable {
solrZkClient.zkConnManagerCallbackExecutor.submit(() -> watcher.process(event));
} else {
if (event.getType() != Event.EventType.None) {
- solrZkClient.zkCallbackExecutor.submit(() -> watcher.process(event));
+ solrZkClient.zkCallbackExecutor.submit(new ParWork.SolrFutureTask("ZkSolrEventThread", () -> {
+ watcher.process(event);
+ return null;
+ }));
}
}
} catch (RejectedExecutionException e) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 7b63f14..4ba3a57 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -1612,7 +1612,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
public void close() throws IOException {
SolrZooKeeper zk = zkClient.getSolrZooKeeper();
if (zk != null && zkClient.isAlive()) {
- try (ParWork work = new ParWork(this, false, true)) {
+ try (ParWork work = new ParWork(this, false, false)) {
work.collect("", () -> {
try {
zk.removeWatches(getCollectionSCNPath(coll), this, WatcherType.Any, true);
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/OrderedExecutor.java b/solr/solrj/src/java/org/apache/solr/common/util/OrderedExecutor.java
index e52580a..9a34db7 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/OrderedExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/OrderedExecutor.java
@@ -61,16 +61,15 @@ public class OrderedExecutor extends ExecutorCompletionService {
}
try {
- return delegate.submit(new ParWork.NoLimitsCallable(){
- public Object call() {
- try {
- command.run();
- } finally {
- sparseStripedLock.remove(lockId);
- }
- return null;
+ return delegate.submit(new ParWork.SolrFutureTask("TLogOrderedExec", () -> {
+ try {
+ command.run();
+ } finally {
+ sparseStripedLock.remove(lockId);
}
- });
+
+ return null;
+ }));
} catch (Exception e) {
sparseStripedLock.remove(lockId);
throw e;
diff --git a/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java b/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java
index 0e1b165..bbf7a04 100644
--- a/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java
@@ -156,7 +156,7 @@ public abstract class BaseDistributedSearchTestCase extends SolrTestCaseJ4 {
@After
public void cleanup() throws Exception {
- try (ParWork closer = new ParWork(this, true, true)) {
+ try (ParWork closer = new ParWork(this, true, false)) {
closer.collect(controlClient, clients, jettys, controlJetty);
}
@@ -355,7 +355,7 @@ public abstract class BaseDistributedSearchTestCase extends SolrTestCaseJ4 {
}
- try (ParWork worker = new ParWork(this, false, true)) {
+ try (ParWork worker = new ParWork(this, false, false)) {
worker.collect("createControlJetty", () -> {
try {
controlJetty = createControlJetty();
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
index 7a91e27..98408ce 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
@@ -450,7 +450,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
}
AtomicInteger addReplicas = new AtomicInteger();
- try (ParWork create = new ParWork(this, false, true)) {
+ try (ParWork create = new ParWork(this, false, false)) {
for (int i = 1; i <= numJettys; i++) {
if (sb.length() > 0) sb.append(',');
int cnt = this.jettyIntCntr.incrementAndGet();
@@ -1698,7 +1698,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
@Override
protected void destroyServers() throws Exception {
- try (ParWork closer = new ParWork(this, false, true)) {
+ try (ParWork closer = new ParWork(this, false, false)) {
closer.collect(commonCloudSolrClient, coreClients, controlClientCloud, cloudClient);
}
coreClients.clear();
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
index a2cfcd1..69fec43 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
@@ -320,7 +320,7 @@ public class MiniSolrCloudCluster {
}
try {
- try (ParWork worker = new ParWork(this, false, true)) {
+ try (ParWork worker = new ParWork(this, false, false)) {
worker.collect("start-jettys", startups);
}
} catch (Exception e) {
@@ -665,7 +665,7 @@ public class MiniSolrCloudCluster {
}
jettys.clear();
- try (ParWork parWork = new ParWork(this, false, true)) {
+ try (ParWork parWork = new ParWork(this, false, false)) {
parWork.collect(shutdowns);
}
diff --git a/solr/test-framework/src/resources/logconf/log4j2-std-debug.xml b/solr/test-framework/src/resources/logconf/log4j2-std-debug.xml
index a2691dd..3e237f5 100644
--- a/solr/test-framework/src/resources/logconf/log4j2-std-debug.xml
+++ b/solr/test-framework/src/resources/logconf/log4j2-std-debug.xml
@@ -20,11 +20,11 @@
<Appenders>
<Console name="STDERR_COLOR" target="SYSTEM_ERR">
- <PatternLayout pattern="%style{%-4r}{yellow} %highlight{%maxLen{%-5p}{6}} %style{(%maxLen{%t}{8})}{yellow,bold} [%style{%X{node_name} %X{core}}{cyan}] %style{%c{1.}}{cyan} %highlight{%m %notEmpty{%ex}}\n"/>
+ <PatternLayout pattern="%style{%-4r}{yellow} %highlight{%maxLen{%-1p}{20}} %style{(%maxLen{%t}{20})}{yellow,bold} [%style{%X{node_name} %X{core}}{cyan}] %style{%c{1.}}{cyan} %highlight{%m %notEmpty{%ex}}\n"/>
</Console>
<File name="FILE" fileName="${sys:user.home}/solr-test.log" immediateFlush="false" append="false">
- <PatternLayout pattern="%style{%-4r}{yellow} %highlight{%maxLen{%-5p}{6}} %style{(%t)}{yellow,bold} [%style{%X{node_name} %X{core}}{cyan}] %style{%c{1.}}{cyan} %highlight{%m %notEmpty{%ex}}\n"/>
+ <PatternLayout pattern="%style{%-4r}{yellow} %highlight{%maxLen{%-1p}{20}} %style{(%t)}{yellow,bold} [%style{%X{node_name} %X{core}}{cyan}] %style{%c{1.}}{cyan} %highlight{%m %notEmpty{%ex}}\n"/>
</File>
</Appenders>
diff --git a/versions.props b/versions.props
index 72c540b..2a013cb 100644
--- a/versions.props
+++ b/versions.props
@@ -5,10 +5,10 @@ com.cybozu.labs:langdetect=1.1-20120112
com.drewnoakes:metadata-extractor=2.11.0
com.epam:parso=2.0.11
com.fasterxml:aalto-xml=1.2.2
-com.fasterxml.jackson*:*=2.10.1
-com.fasterxml.staxmate:staxmate=2.3.1
-com.fasterxml.woodstox:woodstox-core:6.0.3
-com.github.ben-manes.caffeine:caffeine=2.8.4
+com.fasterxml.jackson*:*=2.12.1
+com.fasterxml.staxmate:staxmate=2.4.0
+com.fasterxml.woodstox:woodstox-core:6.2.3
+com.github.ben-manes.caffeine:caffeine=2.8.8
com.github.virtuald:curvesapi=1.06
com.github.zafarkhaja:java-semver=0.9.0
com.google.errorprone:*=2.4.0
@@ -70,7 +70,7 @@ org.apache.httpcomponents:httpcore=4.4.13
org.apache.httpcomponents:httpmime=4.5.12
org.apache.james:apache-mime4j*=0.8.3
org.apache.kerby:*=1.0.1
-org.apache.logging.log4j:*=2.13.2
+org.apache.logging.log4j:*=2.14.0
org.apache.opennlp:opennlp-tools=1.9.1
org.apache.pdfbox:*=2.0.17
org.apache.pdfbox:jempbox=1.8.16
@@ -88,8 +88,8 @@ org.carrot2:carrot2-mini=3.16.2
org.carrot2:morfologik-*=2.1.5
org.ccil.cowan.tagsoup:tagsoup=1.2.1
org.codehaus.janino:*=3.0.9
-org.eclipse.jetty.http2:*=9.4.32.v20200930
-org.eclipse.jetty:*=9.4.32.v20200930
+org.eclipse.jetty.http2:*=9.4.36.v20210114
+org.eclipse.jetty:*=9.4.36.v20210114
org.gagravarr:*=0.8
org.hamcrest:*=2.2
org.hsqldb:hsqldb=2.4.0
@@ -101,12 +101,12 @@ org.mockito:mockito-core=2.23.4
org.objenesis:objenesis=2.6
org.ow2.asm:*=7.2
org.rrd4j:rrd4j=3.5
-org.slf4j:*=1.7.24
+org.slf4j:*=1.7.30
org.tallison:jmatio=1.5
org.tukaani:xz=1.8
ua.net.nlp:morfologik-ukrainian-search=4.9.1
-xerces:xercesImpl=2.12.0
-net.sf.saxon:Saxon-HE=10.2
+xerces:xercesImpl=2.12.1
+net.sf.saxon:Saxon-HE=10.3
org.awaitility:awaitility:4.0.3
org.xerial.snappy:snappy-java=1.1.8.1
org.lz4:lz4-java:1.7.1