You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by dw...@apache.org on 2021/03/10 09:49:45 UTC

[lucene] 01/02: SOLR-13350: Multi-threaded search using collectors manager

This is an automated email from the ASF dual-hosted git repository.

dweiss pushed a commit to branch jira/solr-13350-8x
in repository https://gitbox.apache.org/repos/asf/lucene.git

commit d5b845454011d73cc7ef3f3e0688cfde0acf4698
Author: Ishan Chattopadhyaya <is...@apache.org>
AuthorDate: Mon Dec 28 12:38:07 2020 +0530

    SOLR-13350: Multi-threaded search using collectors manager
---
 .../org/apache/lucene/search/MultiCollector.java   |   6 +
 .../java/org/apache/solr/core/CoreContainer.java   |  12 +-
 .../src/java/org/apache/solr/core/NodeConfig.java  |  18 ++-
 .../solr/handler/component/QueryComponent.java     |   2 +-
 .../org/apache/solr/request/SolrRequestInfo.java   |  23 +++
 .../org/apache/solr/search/SolrIndexSearcher.java  | 180 ++++++++++++++++-----
 .../org/apache/solr/search/TopLevelJoinQuery.java  |   2 +-
 .../test/org/apache/solr/search/TestFiltering.java |  23 ++-
 8 files changed, 222 insertions(+), 44 deletions(-)

diff --git a/lucene/core/src/java/org/apache/lucene/search/MultiCollector.java b/lucene/core/src/java/org/apache/lucene/search/MultiCollector.java
index 5cb6db8..ab426de 100644
--- a/lucene/core/src/java/org/apache/lucene/search/MultiCollector.java
+++ b/lucene/core/src/java/org/apache/lucene/search/MultiCollector.java
@@ -20,6 +20,7 @@ package org.apache.lucene.search;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.lucene.index.LeafReaderContext;
@@ -115,6 +116,11 @@ public class MultiCollector implements Collector {
     return scoreMode;
   }
 
+  // nocommit: need to raise a LUCENE jira for this?
+  public List<Collector> getCollectors() {
+    return Collections.unmodifiableList(Arrays.asList(collectors));
+  }
+
   @Override
   public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
     final List<LeafCollector> leafCollectors = new ArrayList<>(collectors.length);
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 6022f83..29217c4 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -204,6 +204,8 @@ public class CoreContainer {
   private volatile ExecutorService coreContainerWorkExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(
       new SolrNamedThreadFactory("coreContainerWorkExecutor"));
 
+  private final ExecutorService collectorExecutor;
+
   private final OrderedExecutor replayUpdatesExecutor;
 
   @SuppressWarnings({"rawtypes"})
@@ -351,7 +353,9 @@ public class CoreContainer {
         ExecutorUtil.newMDCAwareCachedThreadPool(
             cfg.getReplayUpdatesThreads(),
             new SolrNamedThreadFactory("replayUpdatesExecutor")));
-
+    this.collectorExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(cfg.getCollectorsPoolSize(),
+            new SolrNamedThreadFactory("searcherCollector"));
+        
     this.allowPaths = new java.util.HashSet<>();
     this.allowPaths.add(cfg.getSolrHome());
     this.allowPaths.add(cfg.getCoreRootDirectory());
@@ -566,6 +570,7 @@ public class CoreContainer {
     cfg = null;
     containerProperties = null;
     replayUpdatesExecutor = null;
+    this.collectorExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("searcherCollectorExecutor"));
   }
 
   public static CoreContainer createAndLoad(Path solrHome) {
@@ -1016,6 +1021,7 @@ public class CoreContainer {
       log.info("Shutting down CoreContainer instance={}", System.identityHashCode(this));
     }
 
+    ExecutorUtil.shutdownAndAwaitTermination(collectorExecutor);
     ExecutorUtil.shutdownAndAwaitTermination(coreContainerAsyncTaskExecutor);
     ExecutorService customThreadPool = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("closeThreadPool"));
 
@@ -2255,6 +2261,10 @@ public class CoreContainer {
   public void runAsync(Runnable r) {
     coreContainerAsyncTaskExecutor.submit(r);
   }
+  
+  public ExecutorService getCollectorExecutor() {
+    return collectorExecutor;
+  }
 }
 
 class CloserThread extends Thread {
diff --git a/solr/core/src/java/org/apache/solr/core/NodeConfig.java b/solr/core/src/java/org/apache/solr/core/NodeConfig.java
index 3b8e40e..6afc14c 100644
--- a/solr/core/src/java/org/apache/solr/core/NodeConfig.java
+++ b/solr/core/src/java/org/apache/solr/core/NodeConfig.java
@@ -67,6 +67,8 @@ public class NodeConfig {
 
   private final int replayUpdatesThreads;
 
+  private final int collectorsPoolSize;
+
   @Deprecated
   // This should be part of the transientCacheConfig, remove in 7.0
   private final int transientCacheSize;
@@ -91,7 +93,7 @@ public class NodeConfig {
                      String coreAdminHandlerClass, String collectionsAdminHandlerClass,
                      String healthCheckHandlerClass, String infoHandlerClass, String configSetsHandlerClass,
                      LogWatcherConfig logWatcherConfig, CloudConfig cloudConfig, Integer coreLoadThreads, int replayUpdatesThreads,
-                     int transientCacheSize, boolean useSchemaCache, String managementPath,
+                     int collectorsPoolSize, int transientCacheSize, boolean useSchemaCache, String managementPath,
                      Path solrHome, SolrResourceLoader loader,
                      Properties solrProperties, PluginInfo[] backupRepositoryPlugins,
                      MetricsConfig metricsConfig, PluginInfo transientCacheConfig, PluginInfo tracerConfig,
@@ -114,6 +116,7 @@ public class NodeConfig {
     this.cloudConfig = cloudConfig;
     this.coreLoadThreads = coreLoadThreads;
     this.replayUpdatesThreads = replayUpdatesThreads;
+    this.collectorsPoolSize = collectorsPoolSize;
     this.transientCacheSize = transientCacheSize;
     this.useSchemaCache = useSchemaCache;
     this.managementPath = managementPath;
@@ -176,6 +179,10 @@ public class NodeConfig {
     return replayUpdatesThreads;
   }
 
+  public int getCollectorsPoolSize() {
+    return collectorsPoolSize;
+  }
+
   /**
    * Returns a directory, optionally a comma separated list of directories
    * that will be added to Solr's class path for searching for classes and plugins.
@@ -306,6 +313,7 @@ public class NodeConfig {
     private CloudConfig cloudConfig;
     private int coreLoadThreads = DEFAULT_CORE_LOAD_THREADS;
     private int replayUpdatesThreads = Runtime.getRuntime().availableProcessors();
+    private int collectorsPoolSize = DEFAULT_COLLECTORS_POOL_SIZE;
     @Deprecated
     //Remove in 7.0 and put it all in the transientCache element in solrconfig.xml
     private int transientCacheSize = DEFAULT_TRANSIENT_CACHE_SIZE;
@@ -323,6 +331,7 @@ public class NodeConfig {
     private final String nodeName;
 
     public static final int DEFAULT_CORE_LOAD_THREADS = 3;
+    public static final int DEFAULT_COLLECTORS_POOL_SIZE = 32768;
     //No:of core load threads in cloud mode is set to a default of 8
     public static final int DEFAULT_CORE_LOAD_THREADS_IN_CLOUD = 8;
 
@@ -435,6 +444,11 @@ public class NodeConfig {
       return this;
     }
 
+    public NodeConfigBuilder setCollectorsPoolSize(int collectorsPoolSize) {
+      this.collectorsPoolSize = collectorsPoolSize;
+      return this;
+    }
+
     // Remove in Solr 7.0
     @Deprecated
     public NodeConfigBuilder setTransientCacheSize(int transientCacheSize) {
@@ -495,7 +509,7 @@ public class NodeConfig {
       return new NodeConfig(nodeName, coreRootDirectory, solrDataHome, booleanQueryMaxClauseCount,
                             configSetBaseDirectory, sharedLibDirectory, shardHandlerFactoryConfig,
                             updateShardHandlerConfig, coreAdminHandlerClass, collectionsAdminHandlerClass, healthCheckHandlerClass, infoHandlerClass, configSetsHandlerClass,
-                            logWatcherConfig, cloudConfig, coreLoadThreads, replayUpdatesThreads, transientCacheSize, useSchemaCache, managementPath,
+                            logWatcherConfig, cloudConfig, coreLoadThreads, replayUpdatesThreads, collectorsPoolSize, transientCacheSize, useSchemaCache, managementPath,
                             solrHome, loader, solrProperties,
                             backupRepositoryPlugins, metricsConfig, transientCacheConfig, tracerConfig, defaultZkHost, allowPaths);
     }
diff --git a/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java b/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java
index 853da1c..ce3fde8 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java
@@ -932,7 +932,7 @@ public class QueryComponent extends SearchComponent
 
         @SuppressWarnings({"rawtypes"})
         NamedList sortFieldValues = (NamedList)(srsp.getSolrResponse().getResponse().get("sort_values"));
-        if (sortFieldValues.size()==0 && // we bypass merging this response only if it's partial itself
+        if (sortFieldValues == null || sortFieldValues.size()==0 && // we bypass merging this response only if it's partial itself
                             thisResponseIsPartial) { // but not the previous one!!
           continue; //fsv timeout yields empty sort_vlaues
         }
diff --git a/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java b/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
index 1de36c8..3a252f3 100644
--- a/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
+++ b/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.TimeZone;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.solr.common.Callable;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.util.ExecutorUtil;
@@ -51,6 +52,8 @@ public class SolrRequestInfo {
   protected TimeZone tz;
   protected ResponseBuilder rb;
   protected List<Closeable> closeHooks;
+  protected List<Callable> initHooks;
+  protected Object initData; // Any additional auxiliary data that needs to be stored
   protected SolrDispatchFilter.Action action;
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -182,6 +185,16 @@ public class SolrRequestInfo {
     this.rb = rb;
   }
 
+  public void addInitHook(Callable hook) {
+    // is this better here, or on SolrQueryRequest?
+    synchronized (this) {
+      if (initHooks == null) {
+        initHooks = new LinkedList<>();
+      }
+      initHooks.add(hook);
+    }
+  }
+
   public void addCloseHook(Closeable hook) {
     // is this better here, or on SolrQueryRequest?
     synchronized (this) {
@@ -200,6 +213,16 @@ public class SolrRequestInfo {
     this.action = action;
   }
 
+  public Object getInitData() {
+    return initData;
+  }
+
+  public void setInitData(Object initData) {
+    synchronized (this) {
+      this.initData = initData;
+    }
+  }
+
   public static ExecutorUtil.InheritableThreadLocalProvider getInheritableThreadLocalProvider() {
     return new ExecutorUtil.InheritableThreadLocalProvider() {
       @Override
diff --git a/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java b/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
index 7000f5d..83be2cf 100644
--- a/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
+++ b/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -79,6 +80,7 @@ import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.schema.IndexSchema;
 import org.apache.solr.schema.SchemaField;
 import org.apache.solr.search.facet.UnInvertedField;
+import org.apache.solr.search.join.GraphQuery;
 import org.apache.solr.search.stats.StatsCache;
 import org.apache.solr.search.stats.StatsSource;
 import org.apache.solr.uninverting.UninvertingReader;
@@ -241,7 +243,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable, SolrI
   public SolrIndexSearcher(SolrCore core, String path, IndexSchema schema, String name, DirectoryReader r,
       boolean closeReader, boolean enableCache, boolean reserveDirectory, DirectoryFactory directoryFactory)
           throws IOException {
-    super(wrapReader(core, r));
+    super(wrapReader(core, r), core.getCoreContainer().getCollectorExecutor());
 
     this.path = path;
     this.directoryFactory = directoryFactory;
@@ -1586,20 +1588,39 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable, SolrI
       qr.setNextCursorMark(cmd.getCursorMark());
       hitsRelation = Relation.EQUAL_TO;
     } else {
-      final TopDocsCollector<?> topCollector = buildTopDocsCollector(len, cmd);
-      MaxScoreCollector maxScoreCollector = null;
-      Collector collector = topCollector;
-      if ((cmd.getFlags() & GET_SCORES) != 0) {
-        maxScoreCollector = new MaxScoreCollector();
-        collector = MultiCollector.wrap(topCollector, maxScoreCollector);
-      }
-      ScoreMode scoreModeUsed = buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter).scoreMode();
-
-      totalHits = topCollector.getTotalHits();
-      TopDocs topDocs = topCollector.topDocs(0, len);
-      if (scoreModeUsed == ScoreMode.COMPLETE || scoreModeUsed == ScoreMode.COMPLETE_NO_SCORES) {
-        hitsRelation = TotalHits.Relation.EQUAL_TO;
+      TopDocs topDocs;
+      log.info("calling from 2, query: "+query.getClass()); // nocommit
+      if (pf.postFilter != null || cmd.getSegmentTerminateEarly() || cmd.getTimeAllowed() > 0 
+          || query instanceof RankQuery || query instanceof GraphQuery) {
+        log.debug("skipping collector manager");
+        final TopDocsCollector<?> topCollector = buildTopDocsCollector(len, cmd);
+        MaxScoreCollector maxScoreCollector = null;
+        Collector collector = topCollector;
+        if ((cmd.getFlags() & GET_SCORES) != 0) {
+          maxScoreCollector = new MaxScoreCollector();
+          collector = MultiCollector.wrap(topCollector, maxScoreCollector);
+        }
+        ScoreMode scoreModeUsed = buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter).scoreMode();
+
+        totalHits = topCollector.getTotalHits();
+        topDocs = topCollector.topDocs(0, len);
+        if (scoreModeUsed == ScoreMode.COMPLETE || scoreModeUsed == ScoreMode.COMPLETE_NO_SCORES) {
+          hitsRelation = TotalHits.Relation.EQUAL_TO;
+        } else {
+          hitsRelation = topDocs.totalHits.relation;
+        }
+        nDocsReturned = topDocs.scoreDocs.length;
+        maxScore = totalHits > 0 ? (maxScoreCollector == null ? Float.NaN : maxScoreCollector.getMaxScore()) : 0.0f;
       } else {
+        log.debug("using collectormanager");
+        CollectorManagerResult result = searchCollectorManagers(len, cmd, query, true, true, false); // nocommit: need docset should be false
+        totalHits = result.totalHits;
+
+        maxScore = result.maxScore;
+        nDocsReturned = result.topDocs.scoreDocs.length;
+        topDocs = result.topDocs;
+
+        //TODO: Is this correct?
         hitsRelation = topDocs.totalHits.relation;
       }
       if (cmd.getSort() != null && cmd.getQuery() instanceof RankQuery == false && (cmd.getFlags() & GET_SCORES) != 0) {
@@ -1607,8 +1628,6 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable, SolrI
       }
       populateNextCursorMarkFromTopDocs(qr, cmd, topDocs);
 
-      maxScore = totalHits > 0 ? (maxScoreCollector == null ? Float.NaN : maxScoreCollector.getMaxScore()) : 0.0f;
-      nDocsReturned = topDocs.scoreDocs.length;
       ids = new int[nDocsReturned];
       scores = (cmd.getFlags() & GET_SCORES) != 0 ? new float[nDocsReturned] : null;
       for (int i = 0; i < nDocsReturned; i++) {
@@ -1623,6 +1642,85 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable, SolrI
     qr.setDocList(new DocSlice(0, sliceLen, ids, scores, totalHits, maxScore, hitsRelation));
   }
 
+  /**
+   * Searches {@code query} via {@code super.search(query, manager)} with a CollectorManager whose
+   * collectors bundle the requested sub-collectors, then merges their results in {@code reduce}.
+   *
+   * @param needTopDocs  collect top docs (up to {@code len}); if false, topDocs is null and totalHits -1
+   * @param needMaxScore track the max score (reported as 0.0f unless totalHits is positive)
+   * @param needDocSet   union all matching docs into the result's docSet (left empty if false)
+   */
+  CollectorManagerResult searchCollectorManagers(int len, QueryCommand cmd, Query query,
+      boolean needTopDocs, boolean needMaxScore, boolean needDocSet) throws IOException {
+    CollectorManager<MultiCollector, CollectorManagerResult> manager = new CollectorManager<MultiCollector, CollectorManagerResult>() {
+      @Override
+      public MultiCollector newCollector() throws IOException {
+        // TODO: DocCollector is not thread safe.
+        Collection<Collector> collectors = new ArrayList<Collector>();
+        if (needTopDocs) collectors.add(buildTopDocsCollector(len, cmd));
+        if (needMaxScore) collectors.add(new MaxScoreCollector());
+        if (needDocSet) collectors.add(new DocSetCollector(maxDoc()));
+        // NOTE(review): wrap() may return a lone collector unwrapped; cast assumes >= 2 are requested - confirm.
+        return (MultiCollector) MultiCollector.wrap(collectors);
+      }
+
+      @Override
+      public CollectorManagerResult reduce(Collection<MultiCollector> multiCollectors) throws IOException {
+        final TopDocs[] topDocs = new TopDocs[multiCollectors.size()];
+        float maxScore = 0.0f;
+        DocSet docSet = new BitDocSet(new FixedBitSet(maxDoc())); // TODO: if docset is not needed, avoid this initialization
+        int i = 0;
+        for (MultiCollector multiCollector: multiCollectors) {
+          int c = 0; // sub-collectors are read back in the order newCollector() added them
+          List<Collector> subCollectors = multiCollector.getCollectors();
+          if (needTopDocs) topDocs[i++] = ((TopDocsCollector<?>) subCollectors.get(c++)).topDocs(0, len);
+          if (needMaxScore) {
+            float score = ((MaxScoreCollector) subCollectors.get(c++)).getMaxScore();
+            if (!Float.isNaN(score)) maxScore = Math.max(maxScore, score);
+          }
+          // Union every collector's docs into the result: docSet starts as an empty, non-null set,
+          // so a "docSet == null" guard here would never fire and the result would stay empty.
+          if (needDocSet) docSet = docSet.union(((DocSetCollector) subCollectors.get(c++)).getDocSet());
+        }
+        TopDocs mergedTopDocs;
+        if (topDocs.length > 0 && topDocs[0] instanceof TopFieldDocs) {
+          TopFieldDocs[] topFieldDocs = Arrays.copyOf(topDocs, topDocs.length, TopFieldDocs[].class);
+          mergedTopDocs = TopFieldDocs.merge(weightSort(cmd.getSort()), len, topFieldDocs);
+        } else {
+          mergedTopDocs = needTopDocs? TopDocs.merge(0, len, topDocs): null;
+        }
+        int totalHits = needTopDocs? (int)mergedTopDocs.totalHits.value: -1;
+        maxScore = totalHits > 0 ? maxScore : 0.0f;
+        return new CollectorManagerResult(mergedTopDocs, docSet, maxScore, totalHits);
+      }
+    };
+
+    try {
+      return super.search(query, manager);
+    } catch (RuntimeException ex) {
+      // Unwrap RuntimeException -> ExecutionException -> RuntimeException so callers see the inner one.
+      Throwable cause = ex.getCause();
+      if (cause instanceof ExecutionException && cause.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) cause.getCause();
+      }
+      throw ex;
+    }
+  }
+
+  class CollectorManagerResult { // value holder built by the reduce step of searchCollectorManagers
+    final TopDocs topDocs;   // merged top docs; null when top docs were not requested
+    final DocSet docSet;     // doc set handed over by reduce
+    final float maxScore;    // 0.0f unless totalHits is positive
+    final int totalHits;     // -1 when top docs were not requested
+    
+    public CollectorManagerResult(TopDocs topDocs, DocSet docSet, float maxScore, int totalHits) {
+      this.topDocs = topDocs;
+      this.docSet = docSet;
+      this.maxScore = maxScore;
+      this.totalHits = totalHits;
+    }
+  }
+  
   // any DocSet returned is for the query only, without any filtering... that way it may
   // be cached if desired.
   private DocSet getDocListAndSetNC(QueryResult qr, QueryCommand cmd) throws IOException {
@@ -1677,7 +1775,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable, SolrI
 
         collector = MultiCollector.wrap(setCollector, topScoreCollector);
       }
-
+      log.info("calling from 3");  // nocommit
       buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter);
 
       set = DocSetUtil.getDocSet(setCollector, this);
@@ -1690,32 +1788,42 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable, SolrI
       // no docs on this page, so cursor doesn't change
       qr.setNextCursorMark(cmd.getCursorMark());
     } else {
-      @SuppressWarnings({"rawtypes"})
-      final TopDocsCollector topCollector = buildTopDocsCollector(len, cmd);
-      DocSetCollector setCollector = new DocSetCollector(maxDoc);
-      MaxScoreCollector maxScoreCollector = null;
-      List<Collector> collectors = new ArrayList<>(Arrays.asList(topCollector, setCollector));
-
-      if ((cmd.getFlags() & GET_SCORES) != 0) {
-        maxScoreCollector = new MaxScoreCollector();
-        collectors.add(maxScoreCollector);
-      }
-
-      Collector collector = MultiCollector.wrap(collectors);
-
-      buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter);
+      TopDocs topDocs;
+      if (pf.postFilter != null || cmd.getSegmentTerminateEarly() || cmd.getTimeAllowed() > 0
+          || query instanceof RankQuery || query instanceof GraphQuery) {
+        final TopDocsCollector topCollector = buildTopDocsCollector(len, cmd);
+        DocSetCollector setCollector = new DocSetCollector(maxDoc);
+        MaxScoreCollector maxScoreCollector = null;
+        List<Collector> collectors = new ArrayList<>(Arrays.asList(topCollector, setCollector));
+        if ((cmd.getFlags() & GET_SCORES) != 0) {
+          maxScoreCollector = new MaxScoreCollector();
+          collectors.add(maxScoreCollector);
+        }
+        // Run the search; the collectors gather nothing until the chain is executed.
+        buildAndRunCollectorChain(qr, query, MultiCollector.wrap(collectors), cmd, pf.postFilter);
 
-      set = DocSetUtil.getDocSet(setCollector, this);
+        totalHits = topCollector.getTotalHits();
+        set = DocSetUtil.getDocSet(setCollector, this);
 
-      totalHits = topCollector.getTotalHits();
-      assert (totalHits == set.size()) || qr.isPartialResults();
+        assert (totalHits == set.size()) || qr.isPartialResults();
 
-      TopDocs topDocs = topCollector.topDocs(0, len);
+        topDocs = topCollector.topDocs(0, len);
+        maxScore = totalHits > 0 ? (maxScoreCollector == null ? Float.NaN : maxScoreCollector.getMaxScore()) : 0.0f;
+      } else {
+        log.debug("using collectormanager");
+        CollectorManagerResult result = searchCollectorManagers(len, cmd, query, true, true, true);
+        set = result.docSet;
+        totalHits = result.totalHits;
+        //assert (totalHits == set.size()) || qr.isPartialResults();
+        topDocs = result.topDocs;
+        maxScore = result.maxScore;
+      }
+      
       if (cmd.getSort() != null && cmd.getQuery() instanceof RankQuery == false && (cmd.getFlags() & GET_SCORES) != 0) {
         TopFieldCollector.populateScores(topDocs.scoreDocs, this, query);
       }
       populateNextCursorMarkFromTopDocs(qr, cmd, topDocs);
-      maxScore = totalHits > 0 ? (maxScoreCollector == null ? Float.NaN : maxScoreCollector.getMaxScore()) : 0.0f;
+
       nDocsReturned = topDocs.scoreDocs.length;
 
       ids = new int[nDocsReturned];
diff --git a/solr/core/src/java/org/apache/solr/search/TopLevelJoinQuery.java b/solr/core/src/java/org/apache/solr/search/TopLevelJoinQuery.java
index 428c229..92dbfa5 100644
--- a/solr/core/src/java/org/apache/solr/search/TopLevelJoinQuery.java
+++ b/solr/core/src/java/org/apache/solr/search/TopLevelJoinQuery.java
@@ -62,7 +62,7 @@ public class TopLevelJoinQuery extends JoinQuery {
 
     final SolrIndexSearcher solrSearcher = (SolrIndexSearcher) searcher;
     final JoinQueryWeight weight = new JoinQueryWeight(solrSearcher, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
-    final SolrIndexSearcher fromSearcher = weight.fromSearcher;
+    final SolrIndexSearcher fromSearcher = weight.toSearcher; // fromIndex isn't specified, so this has to be toSearcher
     final SolrIndexSearcher toSearcher = weight.toSearcher;
 
     try {
diff --git a/solr/core/src/test/org/apache/solr/search/TestFiltering.java b/solr/core/src/test/org/apache/solr/search/TestFiltering.java
index b2ad9cf..392e8bc 100644
--- a/solr/core/src/test/org/apache/solr/search/TestFiltering.java
+++ b/solr/core/src/test/org/apache/solr/search/TestFiltering.java
@@ -77,7 +77,7 @@ public class TestFiltering extends SolrTestCaseJ4 {
         if (live == null) {
           live = searcher.getLiveDocSet();
         }
-        assertTrue( set == live);
+        assertTrue( set.equals(live) );
 
         QueryCommand cmd = new QueryCommand();
         cmd.setQuery( QParser.getParser(qstr, null, req).getQuery() );
@@ -86,14 +86,14 @@ public class TestFiltering extends SolrTestCaseJ4 {
         QueryResult res = new QueryResult();
         searcher.search(res, cmd);
         set = res.getDocSet();
-        assertTrue( set == live );
+        assertTrue( equals(live, set) );
 
         cmd.setQuery( QParser.getParser(qstr + " OR id:0", null, req).getQuery() );
         cmd.setFilterList( QParser.getParser(qstr + " OR id:1", null, req).getQuery() );
         res = new QueryResult();
         searcher.search(res, cmd);
         set = res.getDocSet();
-        assertTrue( set == live );
+        assertTrue( equals(live, set) );
       }
 
     } finally {
@@ -101,6 +101,23 @@ public class TestFiltering extends SolrTestCaseJ4 {
     }
   }
 
+  /** Bitwise DocSet comparison; bits beyond the shorter bit set's length are treated as unset. */
+  boolean equals(DocSet ds1, DocSet ds2) {
+    // Choose both operands from one <= test so equal-length inputs still yield two distinct
+    // operands (a separate < and > test would assign ds2 to both and compare it with itself).
+    boolean firstIsShorter = ds1.getFixedBitSet().length() <= ds2.getFixedBitSet().length();
+    DocSet smaller = firstIsShorter ? ds1 : ds2;
+    DocSet larger = firstIsShorter ? ds2 : ds1;
+    int common = smaller.getFixedBitSet().length();
+    for (int i = 0; i < larger.getFixedBitSet().length(); i++) {
+      boolean expected = i < common && smaller.getFixedBitSet().get(i);
+      if (larger.getFixedBitSet().get(i) != expected) {
+        return false;
+      }
+    }
+    return true;
+  }
+
     public void testCaching() throws Exception {
     clearIndex();
     assertU(adoc("id","4", "val_i","1"));