You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/07/09 21:01:59 UTC
[lucene-solr] 17/23: start using per thread executor for
httpshardhandler, cleanup some shutdown, parallel metrics reporter load
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 578e1b4d63352288a4ae45316af0f90f1cfec88f
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Jul 8 14:06:09 2020 -0500
start using per thread executor for httpshardhandler, cleanup some shutdown, parallel metrics reporter load
---
.../java/org/apache/solr/core/CoreContainer.java | 19 +++--
.../handler/component/HttpShardHandlerFactory.java | 82 +++++++++++-----------
.../org/apache/solr/metrics/SolrMetricManager.java | 24 +++++--
3 files changed, 72 insertions(+), 53 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index f7ac939..16e8f78 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -1016,10 +1016,14 @@ public class CoreContainer implements Closeable {
zkController.disconnect();
}
- solrCores.closing();
+ if (solrCores != null) {
+ solrCores.closing();
+ }
- // stop accepting new tasks
- replayUpdatesExecutor.shutdown();
+ if (replayUpdatesExecutor != null) {
+ // stop accepting new tasks
+ replayUpdatesExecutor.shutdown();
+ }
closer.add("workExecutor & replayUpdateExec", () -> {
replayUpdatesExecutor.shutdownAndAwaitTermination();
@@ -1090,10 +1094,13 @@ public class CoreContainer implements Closeable {
auditPlugin = auditloggerPlugin.plugin;
}
- closer.add("Final Items", authPlugin, authenPlugin, auditPlugin,
- loader, callables, shardHandlerFactory, updateShardHandler, solrClientCache);
+ closer.add("Final Items", authPlugin, authenPlugin, auditPlugin, callables, solrClientCache);
+
+ closer.add("zkSys", zkSys);
+
+ closer.add("shardHandlers", shardHandlerFactory, updateShardHandler);
+ closer.add("loader", loader);
- closer.add(zkSys);
} finally {
assert ObjectReleaseTracker.release(this);
diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
index e9cf3fc..206632f 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
@@ -52,6 +52,7 @@ import org.apache.solr.client.solrj.routing.ReplicaListTransformerFactory;
import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
@@ -61,6 +62,7 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.URLUtil;
import org.apache.solr.core.PluginInfo;
@@ -92,29 +94,29 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
// requests at some point (or should we simply return failure?)
//
// This executor is initialized in the init method
- private ExecutorService commExecutor;
+// private ExecutorService commExecutor;
protected volatile Http2SolrClient defaultClient;
- protected InstrumentedHttpListenerFactory httpListenerFactory;
- private LBHttp2SolrClient loadbalancer;
+ protected volatile InstrumentedHttpListenerFactory httpListenerFactory;
+ private volatile LBHttp2SolrClient loadbalancer;
int corePoolSize = 0;
int maximumPoolSize = Integer.MAX_VALUE;
int keepAliveTime = 5;
int queueSize = -1;
- int permittedLoadBalancerRequestsMinimumAbsolute = 0;
- float permittedLoadBalancerRequestsMaximumFraction = 1.0f;
- boolean accessPolicy = false;
- private WhitelistHostChecker whitelistHostChecker = null;
- private SolrMetricsContext solrMetricsContext;
+ volatile int permittedLoadBalancerRequestsMinimumAbsolute = 0;
+ volatile float permittedLoadBalancerRequestsMaximumFraction = 1.0f;
+ volatile boolean accessPolicy = false;
+ private volatile WhitelistHostChecker whitelistHostChecker = null;
+ private volatile SolrMetricsContext solrMetricsContext;
private String scheme = null;
- private InstrumentedHttpListenerFactory.NameStrategy metricNameStrategy;
+ private volatile InstrumentedHttpListenerFactory.NameStrategy metricNameStrategy;
protected final Random r = new Random();
- private RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator();
+ private volatile RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator;
// URL scheme to be used in distributed search.
static final String INIT_URL_SCHEME = "urlScheme";
@@ -146,6 +148,10 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
static final String SET_SOLR_DISABLE_SHARDS_WHITELIST_CLUE = " set -D"+INIT_SOLR_DISABLE_SHARDS_WHITELIST+"=true to disable shards whitelist checks";
+ public HttpShardHandlerFactory() {
+ ObjectReleaseTracker.track(this);
+ }
+
/**
* Get {@link ShardHandler} that uses the default http client.
*/
@@ -296,16 +302,16 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
new SynchronousQueue<Runnable>(this.accessPolicy) :
new ArrayBlockingQueue<Runnable>(this.queueSize, this.accessPolicy);
- this.commExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(
- this.corePoolSize,
- this.maximumPoolSize,
- this.keepAliveTime, TimeUnit.SECONDS,
- blockingQueue,
- new SolrNamedThreadFactory("httpShardExecutor"),
- // the Runnable added to this executor handles all exceptions so we disable stack trace collection as an optimization
- // see SOLR-11880 for more details
- false
- );
+// this.commExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(
+// this.corePoolSize,
+// this.maximumPoolSize,
+// this.keepAliveTime, TimeUnit.SECONDS,
+// blockingQueue,
+// new SolrNamedThreadFactory("httpShardExecutor"),
+// // the Runnable added to this executor handles all exceptions so we disable stack trace collection as an optimization
+// // see SOLR-11880 for more details
+// false
+// );
this.httpListenerFactory = new InstrumentedHttpListenerFactory(this.metricNameStrategy);
int connectionTimeout = getParameter(args, HttpClientUtil.PROP_CONNECTION_TIMEOUT,
@@ -347,24 +353,18 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
@Override
public void close() {
- try {
- ExecutorUtil.shutdownAndAwaitTermination(commExecutor);
- } finally {
- try {
- if (loadbalancer != null) {
- loadbalancer.close();
- }
- } finally {
- if (defaultClient != null) {
- IOUtils.closeQuietly(defaultClient);
+ try (ParWork closer = new ParWork(this)) {
+ closer.add("", loadbalancer, defaultClient, () -> {
+ try {
+ SolrMetricProducer.super.close();
+ } catch (Exception e) {
+ log.warn("Exception closing.", e);
}
- }
- }
- try {
- SolrMetricProducer.super.close();
- } catch (Exception e) {
- log.warn("Exception closing.", e);
+ return HttpShardHandlerFactory.this;
+ });
}
+
+ ObjectReleaseTracker.release(this);
}
@Override
@@ -433,8 +433,8 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
* Creates a new completion service for use by a single set of distributed requests.
*/
public CompletionService<ShardResponse> newCompletionService() {
- return new ExecutorCompletionService<>(commExecutor);
- }
+ return new ExecutorCompletionService<>(ParWork.getExecutor());
+ } // ##Super expert usage
/**
* Rebuilds the URL replacing the URL scheme of the passed URL with the
@@ -456,9 +456,9 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
solrMetricsContext = parentContext.getChildContext(this);
String expandedScope = SolrMetricManager.mkName(scope, SolrInfoBean.Category.QUERY.name());
httpListenerFactory.initializeMetrics(solrMetricsContext, expandedScope);
- commExecutor = MetricUtils.instrumentedExecutorService(commExecutor, null,
- solrMetricsContext.getMetricRegistry(),
- SolrMetricManager.mkName("httpShardExecutor", expandedScope, "threadPool"));
+// commExecutor = MetricUtils.instrumentedExecutorService(commExecutor, null,
+// solrMetricsContext.getMetricRegistry(),
+// SolrMetricManager.mkName("httpShardExecutor", expandedScope, "threadPool"));
}
/**
diff --git a/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java b/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java
index bb79009..59a591f 100644
--- a/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java
+++ b/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java
@@ -48,11 +48,13 @@ import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
+import org.apache.solr.common.Callable;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.MetricsConfig;
import org.apache.solr.core.PluginInfo;
+import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrInfoBean;
import org.apache.solr.core.SolrResourceLoader;
@@ -872,6 +874,7 @@ public class SolrMetricManager {
if (pluginInfos == null || pluginInfos.length == 0) {
return;
}
+ List<Callable<SolrConfig.SolrPluginInfo>> calls = new ArrayList<>();
String registryName = getRegistryName(group, registryNames);
for (PluginInfo info : pluginInfos) {
boolean enabled = true;
@@ -915,11 +918,20 @@ public class SolrMetricManager {
continue;
}
}
- try {
- loadReporter(registryName, loader, coreContainer, solrCore, info, tag);
- } catch (Exception e) {
- log.warn("Error loading metrics reporter, plugin info: {}", info, e);
- }
+
+ calls.add((p)->{
+ try {
+ loadReporter(registryName, loader, coreContainer, solrCore, info, tag);
+ } catch (Exception e) {
+ log.warn("Error loading metrics reporter, plugin info: {}", info, e);
+ }
+
+ });
+
+ }
+
+ try (ParWork worker = new ParWork(this)) {
+ worker.add("loadMetricsReporters", calls);
}
}
@@ -1122,7 +1134,7 @@ public class SolrMetricManager {
} finally {
reportersLock.unlock();
}
- try (ParWork closer = new ParWork(this)) {
+ try (ParWork closer = new ParWork(this, true)) {
closer.add("MetricReporters", closeReporters);
}
return removed;