You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by dw...@apache.org on 2021/03/10 10:03:13 UTC

[lucene] 02/10: SOLR-13350: Adding collectors executors to CoreContainer

This is an automated email from the ASF dual-hosted git repository.

dweiss pushed a commit to branch jira/solr-13350-new
in repository https://gitbox.apache.org/repos/asf/lucene.git

commit 3477c1fef477d2aa283f7789983c565add60a29f
Author: Ishan Chattopadhyaya <is...@apache.org>
AuthorDate: Tue Mar 3 11:39:41 2020 +0530

    SOLR-13350: Adding collectors executors to CoreContainer
---
 .../java/org/apache/solr/core/CoreContainer.java   | 10 +++++++
 .../src/java/org/apache/solr/core/NodeConfig.java  | 18 +++++++++++--
 .../org/apache/solr/search/SolrIndexSearcher.java  |  2 +-
 .../test/org/apache/solr/search/TestFiltering.java | 31 +++++++++++++---------
 4 files changed, 45 insertions(+), 16 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 901ff78..fc59fc7 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -184,6 +184,8 @@ public class CoreContainer {
   private volatile ExecutorService coreContainerWorkExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(
       new DefaultSolrThreadFactory("coreContainerWorkExecutor"));
 
+  private final ExecutorService collectorExecutor; // exposed via getCollectorExecutor(); SolrIndexSearcher passes it to IndexSearcher
+
   private final OrderedExecutor replayUpdatesExecutor;
 
   protected volatile LogWatcher logging = null;
@@ -353,6 +355,8 @@ public class CoreContainer {
         ExecutorUtil.newMDCAwareCachedThreadPool(
             cfg.getReplayUpdatesThreads(),
             new DefaultSolrThreadFactory("replayUpdatesExecutor")));
+    this.collectorExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(cfg.getCollectorsPoolSize(),
+        new DefaultSolrThreadFactory("searcherCollectorExecutor"));
   }
 
   private synchronized void initializeAuthorizationPlugin(Map<String, Object> authorizationConf) {
@@ -543,6 +547,7 @@ public class CoreContainer {
     cfg = null;
     containerProperties = null;
     replayUpdatesExecutor = null;
+    this.collectorExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(new DefaultSolrThreadFactory("searcherCollectorExecutor"));
   }
 
   public static CoreContainer createAndLoad(Path solrHome) {
@@ -960,6 +965,7 @@ public class CoreContainer {
     }
     log.info("Shutting down CoreContainer instance=" + System.identityHashCode(this));
 
+    ExecutorUtil.shutdownAndAwaitTermination(collectorExecutor);
     ExecutorUtil.shutdownAndAwaitTermination(coreContainerAsyncTaskExecutor);
     ExecutorService customThreadPool = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("closeThreadPool"));
 
@@ -2055,6 +2061,10 @@ public class CoreContainer {
   public void runAsync(Runnable r) {
     coreContainerAsyncTaskExecutor.submit(r);
   }
+  
+  public ExecutorService getCollectorExecutor() { // accessor for this container's collectorExecutor field
+    return collectorExecutor;
+  }
 }
 
 class CloserThread extends Thread {
diff --git a/solr/core/src/java/org/apache/solr/core/NodeConfig.java b/solr/core/src/java/org/apache/solr/core/NodeConfig.java
index e03cdb1..a701beb 100644
--- a/solr/core/src/java/org/apache/solr/core/NodeConfig.java
+++ b/solr/core/src/java/org/apache/solr/core/NodeConfig.java
@@ -64,6 +64,8 @@ public class NodeConfig {
 
   private final int replayUpdatesThreads;
 
+  private final int collectorsPoolSize;
+
   @Deprecated
   // This should be part of the transientCacheConfig, remove in 7.0
   private final int transientCacheSize;
@@ -86,7 +88,7 @@ public class NodeConfig {
                      String coreAdminHandlerClass, String collectionsAdminHandlerClass,
                      String healthCheckHandlerClass, String infoHandlerClass, String configSetsHandlerClass,
                      LogWatcherConfig logWatcherConfig, CloudConfig cloudConfig, Integer coreLoadThreads, int replayUpdatesThreads,
-                     int transientCacheSize, boolean useSchemaCache, String managementPath, SolrResourceLoader loader,
+                     int collectorsPoolSize, int transientCacheSize, boolean useSchemaCache, String managementPath, SolrResourceLoader loader,
                      Properties solrProperties, PluginInfo[] backupRepositoryPlugins,
                      MetricsConfig metricsConfig, PluginInfo transientCacheConfig, PluginInfo tracerConfig) {
     this.nodeName = nodeName;
@@ -106,6 +108,7 @@ public class NodeConfig {
     this.cloudConfig = cloudConfig;
     this.coreLoadThreads = coreLoadThreads;
     this.replayUpdatesThreads = replayUpdatesThreads;
+    this.collectorsPoolSize = collectorsPoolSize;
     this.transientCacheSize = transientCacheSize;
     this.useSchemaCache = useSchemaCache;
     this.managementPath = managementPath;
@@ -161,6 +164,10 @@ public class NodeConfig {
     return replayUpdatesThreads;
   }
 
+  public int getCollectorsPoolSize() { // value supplied to the NodeConfig constructor as collectorsPoolSize
+    return collectorsPoolSize;
+  }
+
   /**
    * Returns a directory, optionally a comma separated list of directories
    * that will be added to Solr's class path for searching for classes and plugins.
@@ -259,6 +266,7 @@ public class NodeConfig {
     private CloudConfig cloudConfig;
     private int coreLoadThreads = DEFAULT_CORE_LOAD_THREADS;
     private int replayUpdatesThreads = Runtime.getRuntime().availableProcessors();
+    private int collectorsPoolSize = DEFAULT_COLLECTORS_POOL_SIZE;
     @Deprecated
     //Remove in 7.0 and put it all in the transientCache element in solrconfig.xml
     private int transientCacheSize = DEFAULT_TRANSIENT_CACHE_SIZE;
@@ -274,6 +282,7 @@ public class NodeConfig {
     private final String nodeName;
 
     public static final int DEFAULT_CORE_LOAD_THREADS = 3;
+    public static final int DEFAULT_COLLECTORS_POOL_SIZE = 32768;
     //No:of core load threads in cloud mode is set to a default of 8
     public static final int DEFAULT_CORE_LOAD_THREADS_IN_CLOUD = 8;
 
@@ -389,6 +398,11 @@ public class NodeConfig {
       return this;
     }
 
+    public NodeConfigBuilder setCollectorsPoolSize(int collectorsPoolSize) { // replaces the DEFAULT_COLLECTORS_POOL_SIZE initial value
+      this.collectorsPoolSize = collectorsPoolSize;
+      return this; // the builder itself is returned so calls can be chained
+    }
+
     // Remove in Solr 7.0
     @Deprecated
     public NodeConfigBuilder setTransientCacheSize(int transientCacheSize) {
@@ -435,7 +449,7 @@ public class NodeConfig {
       return new NodeConfig(nodeName, coreRootDirectory, solrDataHome, booleanQueryMaxClauseCount,
                             configSetBaseDirectory, sharedLibDirectory, shardHandlerFactoryConfig,
                             updateShardHandlerConfig, coreAdminHandlerClass, collectionsAdminHandlerClass, healthCheckHandlerClass, infoHandlerClass, configSetsHandlerClass,
-                            logWatcherConfig, cloudConfig, coreLoadThreads, replayUpdatesThreads, transientCacheSize, useSchemaCache, managementPath, loader, solrProperties,
+                            logWatcherConfig, cloudConfig, coreLoadThreads, replayUpdatesThreads, collectorsPoolSize, transientCacheSize, useSchemaCache, managementPath, loader, solrProperties,
                             backupRepositoryPlugins, metricsConfig, transientCacheConfig, tracerConfig);
     }
   }
diff --git a/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java b/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
index e671b89..2f7ad6c 100644
--- a/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
+++ b/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
@@ -231,7 +231,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable, SolrI
   public SolrIndexSearcher(SolrCore core, String path, IndexSchema schema, String name, DirectoryReader r,
       boolean closeReader, boolean enableCache, boolean reserveDirectory, DirectoryFactory directoryFactory)
           throws IOException {
-    super(wrapReader(core, r));
+    super(wrapReader(core, r), core.getCoreContainer().getCollectorExecutor());
 
     this.path = path;
     this.directoryFactory = directoryFactory;
diff --git a/solr/core/src/test/org/apache/solr/search/TestFiltering.java b/solr/core/src/test/org/apache/solr/search/TestFiltering.java
index 6d5e081..392e8bc 100644
--- a/solr/core/src/test/org/apache/solr/search/TestFiltering.java
+++ b/solr/core/src/test/org/apache/solr/search/TestFiltering.java
@@ -24,7 +24,6 @@ import java.util.Locale;
 
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.Query;
-import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.FixedBitSet;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.SolrException;
@@ -87,19 +86,14 @@ public class TestFiltering extends SolrTestCaseJ4 {
         QueryResult res = new QueryResult();
         searcher.search(res, cmd);
         set = res.getDocSet();
-        System.out.println("Live: "+bitsString(live.getFixedBitSet()));
-        System.out.println("Set: "+bitsString(set.getFixedBitSet()));
-        FixedBitSet xor = live.getFixedBitSet().clone();
-        xor.xor(set.getFixedBitSet());
-        System.out.println("xor: "+bitsString(xor));
-        assertTrue( set.equals(live) );
+        assertTrue( equals(live, set) );
 
         cmd.setQuery( QParser.getParser(qstr + " OR id:0", null, req).getQuery() );
         cmd.setFilterList( QParser.getParser(qstr + " OR id:1", null, req).getQuery() );
         res = new QueryResult();
         searcher.search(res, cmd);
         set = res.getDocSet();
-        assertTrue( set.equals(live) );
+        assertTrue( equals(live, set) );
       }
 
     } finally {
@@ -107,12 +101,23 @@ public class TestFiltering extends SolrTestCaseJ4 {
     }
   }
 
-  private String bitsString(Bits bits) {
-    String s = "";
-    for (int i=0; i<bits.length(); i++)
-      s+=bits.get(i)? 1: 0;
-    return s;
+  /** Bitwise DocSet comparison that tolerates different bitset lengths (bits past the shorter one must be clear). */
+  boolean equals(DocSet ds1, DocSet ds2) {
+    FixedBitSet bits1 = ds1.getFixedBitSet();
+    FixedBitSet bits2 = ds2.getFixedBitSet();
+    // Select the operands so that equal-length bitsets are compared against each other, never one against itself.
+    FixedBitSet smaller = bits1.length() <= bits2.length() ? bits1 : bits2;
+    FixedBitSet larger = bits1.length() <= bits2.length() ? bits2 : bits1;
+    for (int i = 0; i < larger.length(); i++) {
+      // Past the end of the shorter bitset a bit is treated as clear.
+      boolean expected = i < smaller.length() && smaller.get(i);
+      if (larger.get(i) != expected) {
+        return false;
+      }
+    }
+    return true;
+  }
+
     public void testCaching() throws Exception {
     clearIndex();
     assertU(adoc("id","4", "val_i","1"));