You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/07/09 21:01:59 UTC

[lucene-solr] 17/23: start using per thread executor for httpshardhandler, cleanup some shutdown, parallel metrics reporter load

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 578e1b4d63352288a4ae45316af0f90f1cfec88f
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Jul 8 14:06:09 2020 -0500

    start using the per-thread executor for HttpShardHandler, clean up some shutdown handling, load metrics reporters in parallel
---
 .../java/org/apache/solr/core/CoreContainer.java   | 19 +++--
 .../handler/component/HttpShardHandlerFactory.java | 82 +++++++++++-----------
 .../org/apache/solr/metrics/SolrMetricManager.java | 24 +++++--
 3 files changed, 72 insertions(+), 53 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index f7ac939..16e8f78 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -1016,10 +1016,14 @@ public class CoreContainer implements Closeable {
         zkController.disconnect();
       }
 
-      solrCores.closing();
+      if (solrCores != null) {
+        solrCores.closing();
+      }
 
-      // stop accepting new tasks
-      replayUpdatesExecutor.shutdown();
+      if (replayUpdatesExecutor != null) {
+        // stop accepting new tasks
+        replayUpdatesExecutor.shutdown();
+      }
 
       closer.add("workExecutor & replayUpdateExec", () -> {
         replayUpdatesExecutor.shutdownAndAwaitTermination();
@@ -1090,10 +1094,13 @@ public class CoreContainer implements Closeable {
         auditPlugin = auditloggerPlugin.plugin;
       }
 
-      closer.add("Final Items",  authPlugin, authenPlugin, auditPlugin,
-              loader, callables, shardHandlerFactory, updateShardHandler, solrClientCache);
+      closer.add("Final Items",  authPlugin, authenPlugin, auditPlugin, callables, solrClientCache);
+
+      closer.add("zkSys", zkSys);
+
+      closer.add("shardHandlers", shardHandlerFactory, updateShardHandler);
+      closer.add("loader", loader);
 
-      closer.add(zkSys);
 
     } finally {
       assert ObjectReleaseTracker.release(this);
diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
index e9cf3fc..206632f 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
@@ -52,6 +52,7 @@ import org.apache.solr.client.solrj.routing.ReplicaListTransformerFactory;
 import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.ParWork;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClusterState;
@@ -61,6 +62,7 @@ import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.common.util.URLUtil;
 import org.apache.solr.core.PluginInfo;
@@ -92,29 +94,29 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
   // requests at some point (or should we simply return failure?)
   //
   // This executor is initialized in the init method
-  private ExecutorService commExecutor;
+//  private ExecutorService commExecutor;
 
   protected volatile Http2SolrClient defaultClient;
-  protected InstrumentedHttpListenerFactory httpListenerFactory;
-  private LBHttp2SolrClient loadbalancer;
+  protected volatile InstrumentedHttpListenerFactory httpListenerFactory;
+  private volatile LBHttp2SolrClient loadbalancer;
 
   int corePoolSize = 0;
   int maximumPoolSize = Integer.MAX_VALUE;
   int keepAliveTime = 5;
   int queueSize = -1;
-  int   permittedLoadBalancerRequestsMinimumAbsolute = 0;
-  float permittedLoadBalancerRequestsMaximumFraction = 1.0f;
-  boolean accessPolicy = false;
-  private WhitelistHostChecker whitelistHostChecker = null;
-  private SolrMetricsContext solrMetricsContext;
+  volatile int   permittedLoadBalancerRequestsMinimumAbsolute = 0;
+  volatile float permittedLoadBalancerRequestsMaximumFraction = 1.0f;
+  volatile boolean accessPolicy = false;
+  private volatile WhitelistHostChecker whitelistHostChecker = null;
+  private volatile SolrMetricsContext solrMetricsContext;
 
   private String scheme = null;
 
-  private InstrumentedHttpListenerFactory.NameStrategy metricNameStrategy;
+  private volatile InstrumentedHttpListenerFactory.NameStrategy metricNameStrategy;
 
   protected final Random r = new Random();
 
-  private RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator();
+  private volatile RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator;
 
   // URL scheme to be used in distributed search.
   static final String INIT_URL_SCHEME = "urlScheme";
@@ -146,6 +148,10 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
 
   static final String SET_SOLR_DISABLE_SHARDS_WHITELIST_CLUE = " set -D"+INIT_SOLR_DISABLE_SHARDS_WHITELIST+"=true to disable shards whitelist checks";
 
+  public HttpShardHandlerFactory() {
+    ObjectReleaseTracker.track(this);
+  }
+
   /**
    * Get {@link ShardHandler} that uses the default http client.
    */
@@ -296,16 +302,16 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
         new SynchronousQueue<Runnable>(this.accessPolicy) :
         new ArrayBlockingQueue<Runnable>(this.queueSize, this.accessPolicy);
 
-    this.commExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(
-        this.corePoolSize,
-        this.maximumPoolSize,
-        this.keepAliveTime, TimeUnit.SECONDS,
-        blockingQueue,
-        new SolrNamedThreadFactory("httpShardExecutor"),
-        // the Runnable added to this executor handles all exceptions so we disable stack trace collection as an optimization
-        // see SOLR-11880 for more details
-        false
-    );
+//    this.commExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(
+//        this.corePoolSize,
+//        this.maximumPoolSize,
+//        this.keepAliveTime, TimeUnit.SECONDS,
+//        blockingQueue,
+//        new SolrNamedThreadFactory("httpShardExecutor"),
+//        // the Runnable added to this executor handles all exceptions so we disable stack trace collection as an optimization
+//        // see SOLR-11880 for more details
+//        false
+//    );
 
     this.httpListenerFactory = new InstrumentedHttpListenerFactory(this.metricNameStrategy);
     int connectionTimeout = getParameter(args, HttpClientUtil.PROP_CONNECTION_TIMEOUT,
@@ -347,24 +353,18 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
 
   @Override
   public void close() {
-    try {
-      ExecutorUtil.shutdownAndAwaitTermination(commExecutor);
-    } finally {
-      try {
-        if (loadbalancer != null) {
-          loadbalancer.close();
-        }
-      } finally {
-        if (defaultClient != null) {
-          IOUtils.closeQuietly(defaultClient);
+    try (ParWork closer = new ParWork(this)) {
+      closer.add("", loadbalancer, defaultClient, () -> {
+        try {
+          SolrMetricProducer.super.close();
+        } catch (Exception e) {
+          log.warn("Exception closing.", e);
         }
-      }
-    }
-    try {
-      SolrMetricProducer.super.close();
-    } catch (Exception e) {
-      log.warn("Exception closing.", e);
+        return HttpShardHandlerFactory.this;
+      });
     }
+
+    ObjectReleaseTracker.release(this);
   }
 
   @Override
@@ -433,8 +433,8 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
    * Creates a new completion service for use by a single set of distributed requests.
    */
   public CompletionService<ShardResponse> newCompletionService() {
-    return new ExecutorCompletionService<>(commExecutor);
-  }
+    return new ExecutorCompletionService<>(ParWork.getExecutor());
+  } // ##Super expert usage
 
   /**
    * Rebuilds the URL replacing the URL scheme of the passed URL with the
@@ -456,9 +456,9 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
     solrMetricsContext = parentContext.getChildContext(this);
     String expandedScope = SolrMetricManager.mkName(scope, SolrInfoBean.Category.QUERY.name());
     httpListenerFactory.initializeMetrics(solrMetricsContext, expandedScope);
-    commExecutor = MetricUtils.instrumentedExecutorService(commExecutor, null,
-        solrMetricsContext.getMetricRegistry(),
-        SolrMetricManager.mkName("httpShardExecutor", expandedScope, "threadPool"));
+//    commExecutor = MetricUtils.instrumentedExecutorService(commExecutor, null,
+//        solrMetricsContext.getMetricRegistry(),
+//        SolrMetricManager.mkName("httpShardExecutor", expandedScope, "threadPool"));
   }
 
   /**
diff --git a/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java b/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java
index bb79009..59a591f 100644
--- a/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java
+++ b/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java
@@ -48,11 +48,13 @@ import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.MetricSet;
 import com.codahale.metrics.SharedMetricRegistries;
 import com.codahale.metrics.Timer;
+import org.apache.solr.common.Callable;
 import org.apache.solr.common.ParWork;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.MetricsConfig;
 import org.apache.solr.core.PluginInfo;
+import org.apache.solr.core.SolrConfig;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.core.SolrInfoBean;
 import org.apache.solr.core.SolrResourceLoader;
@@ -872,6 +874,7 @@ public class SolrMetricManager {
     if (pluginInfos == null || pluginInfos.length == 0) {
       return;
     }
+    List<Callable<SolrConfig.SolrPluginInfo>> calls = new ArrayList<>();
     String registryName = getRegistryName(group, registryNames);
     for (PluginInfo info : pluginInfos) {
       boolean enabled = true;
@@ -915,11 +918,20 @@ public class SolrMetricManager {
           continue;
         }
       }
-      try {
-        loadReporter(registryName, loader, coreContainer, solrCore, info, tag);
-      } catch (Exception e) {
-        log.warn("Error loading metrics reporter, plugin info: {}", info, e);
-      }
+
+      calls.add((p)->{
+        try {
+          loadReporter(registryName, loader, coreContainer, solrCore, info, tag);
+        } catch (Exception e) {
+          log.warn("Error loading metrics reporter, plugin info: {}", info, e);
+        }
+
+      });
+
+    }
+
+    try (ParWork worker = new ParWork(this)) {
+      worker.add("loadMetricsReporters", calls);
     }
   }
 
@@ -1122,7 +1134,7 @@ public class SolrMetricManager {
     } finally {
       reportersLock.unlock();
     }
-    try (ParWork closer = new ParWork(this)) {
+    try (ParWork closer = new ParWork(this, true)) {
       closer.add("MetricReporters", closeReporters);
     }
     return removed;