You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2011/01/11 00:16:41 UTC

svn commit: r1057413 - in /lucene/dev/branches/branch_3x/lucene/src: java/org/apache/lucene/search/IndexSearcher.java java/org/apache/lucene/search/ParallelMultiSearcher.java test/org/apache/lucene/index/TestNRTThreads.java

Author: mikemccand
Date: Mon Jan 10 23:16:40 2011
New Revision: 1057413

URL: http://svn.apache.org/viewvc?rev=1057413&view=rev
Log:
LUCENE-2837: port back to 3x absorbing of PMS into IS

Modified:
    lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/search/IndexSearcher.java
    lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/search/ParallelMultiSearcher.java
    lucene/dev/branches/branch_3x/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java

Modified: lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/search/IndexSearcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/search/IndexSearcher.java?rev=1057413&r1=1057412&r2=1057413&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/search/IndexSearcher.java (original)
+++ lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/search/IndexSearcher.java Mon Jan 10 23:16:40 2011
@@ -19,7 +19,17 @@ package org.apache.lucene.search;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.FieldSelector;
@@ -28,6 +38,7 @@ import org.apache.lucene.index.IndexRead
 import org.apache.lucene.index.Term;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.ReaderUtil;
+import org.apache.lucene.util.ThreadInterruptedException;
 
 /** Implements search over a single IndexReader.
  *
@@ -50,8 +61,15 @@ public class IndexSearcher extends Searc
   
   // NOTE: these members might change in incompatible ways
   // in the next release
-  protected IndexReader[] subReaders;
-  protected int[] docStarts;
+  protected final IndexReader[] subReaders;
+  protected final int[] docStarts;
+  
+  // These are only used for multi-threaded search
+  private final ExecutorService executor;
+  protected final IndexSearcher[] subSearchers;
+
+  /** The Similarity implementation used by this searcher. */
+  private Similarity similarity = Similarity.getDefault();
 
   /** Creates a searcher searching the index in the named
    *  directory, with readOnly=true
@@ -60,7 +78,7 @@ public class IndexSearcher extends Searc
    * @throws IOException if there is a low-level IO error
    */
   public IndexSearcher(Directory path) throws CorruptIndexException, IOException {
-    this(IndexReader.open(path, true), true);
+    this(IndexReader.open(path, true), true, null);
   }
 
   /** Creates a searcher searching the index in the named
@@ -75,12 +93,27 @@ public class IndexSearcher extends Searc
    * @throws IOException if there is a low-level IO error
    */
   public IndexSearcher(Directory path, boolean readOnly) throws CorruptIndexException, IOException {
-    this(IndexReader.open(path, readOnly), true);
+    this(IndexReader.open(path, readOnly), true, null);
   }
 
   /** Creates a searcher searching the provided index. */
   public IndexSearcher(IndexReader r) {
-    this(r, false);
+    this(r, false, null);
+  }
+
+  /** Runs searches for each segment separately, using the
+   *  provided ExecutorService.  IndexSearcher will not
+   *  shutdown/awaitTermination this ExecutorService on
+   *  close; you must do so, eventually, on your own.  NOTE:
+   *  if you are using {@link NIOFSDirectory}, do not use
+   *  the shutdownNow method of ExecutorService as this uses
+   *  Thread.interrupt under-the-hood which can silently
+   *  close file descriptors (see <a
+   *  href="https://issues.apache.org/jira/browse/LUCENE-2239">LUCENE-2239</a>).
+   * 
+   * @lucene.experimental */
+  public IndexSearcher(IndexReader r, ExecutorService executor) {
+    this(r, false, executor);
   }
 
   /** Expert: directly specify the reader, subReaders and
@@ -92,10 +125,42 @@ public class IndexSearcher extends Searc
     this.subReaders = subReaders;
     this.docStarts = docStarts;
     closeReader = false;
+    executor = null;
+    subSearchers = null;
   }
   
-  private IndexSearcher(IndexReader r, boolean closeReader) {
+  /** Expert: directly specify the reader, subReaders and
+   *  their docID starts, and an ExecutorService.  In this
+   *  case, each segment will be separately searched using the
+   *  ExecutorService.  IndexSearcher will not
+   *  shutdown/awaitTermination this ExecutorService on
+   *  close; you must do so, eventually, on your own.  NOTE:
+   *  if you are using {@link NIOFSDirectory}, do not use
+   *  the shutdownNow method of ExecutorService as this uses
+   *  Thread.interrupt under-the-hood which can silently
+   *  close file descriptors (see <a
+   *  href="https://issues.apache.org/jira/browse/LUCENE-2239">LUCENE-2239</a>).
+   * 
+   * @lucene.experimental */
+  public IndexSearcher(IndexReader reader, IndexReader[] subReaders, int[] docStarts, ExecutorService executor) {
+    this.reader = reader;
+    this.subReaders = subReaders;
+    this.docStarts = docStarts;
+    if (executor == null) {
+      subSearchers = null;
+    } else {
+      subSearchers = new IndexSearcher[subReaders.length];
+      for(int i=0;i<subReaders.length;i++) {
+        subSearchers[i] = new IndexSearcher(subReaders[i]);
+      }
+    }
+    closeReader = false;
+    this.executor = executor;
+  }
+
+  private IndexSearcher(IndexReader r, boolean closeReader, ExecutorService executor) {
     reader = r;
+    this.executor = executor;
     this.closeReader = closeReader;
 
     List<IndexReader> subReadersList = new ArrayList<IndexReader>();
@@ -107,6 +172,18 @@ public class IndexSearcher extends Searc
       docStarts[i] = maxDoc;
       maxDoc += subReaders[i].maxDoc();
     }
+    if (executor == null) {
+      subSearchers = null;
+    } else {
+      subSearchers = new IndexSearcher[subReaders.length];
+      for (int i = 0; i < subReaders.length; i++) {
+        if (subReaders[i] == r) {
+          subSearchers[i] = this;
+        } else {
+          subSearchers[i] = new IndexSearcher(subReaders[i]);
+        }
+      }
+    }
   }
 
   protected void gatherSubReaders(List<IndexReader> allSubReaders, IndexReader r) {
@@ -118,58 +195,218 @@ public class IndexSearcher extends Searc
     return reader;
   }
 
+  /** Returns the atomic subReaders used by this searcher. */
+  public IndexReader[] getSubReaders() {
+    return subReaders;
+  }
+
+  /** Expert: Returns one greater than the largest possible document number.
+   * 
+   * @see org.apache.lucene.index.IndexReader#maxDoc()
+   */
+  public int maxDoc() {
+    return reader.maxDoc();
+  }
+
+  /** Returns total docFreq for this term. */
+  public int docFreq(final Term term) throws IOException {
+    if (executor == null) {
+      return reader.docFreq(term);
+    } else {
+      final ExecutionHelper<Integer> runner = new ExecutionHelper<Integer>(executor);
+      for(int i = 0; i < subReaders.length; i++) {
+        final IndexSearcher searchable = subSearchers[i];
+        runner.submit(new Callable<Integer>() {
+            public Integer call() throws IOException {
+              return Integer.valueOf(searchable.docFreq(term));
+            }
+          });
+      }
+      int docFreq = 0;
+      for (Integer num : runner) {
+        docFreq += num.intValue();
+      }
+      return docFreq;
+    }
+  }
+
+  /* Sugar for .getIndexReader().document(docID) */
+  public Document doc(int docID) throws CorruptIndexException, IOException {
+    return reader.document(docID);
+  }
+  
+  /* Sugar for .getIndexReader().document(docID, fieldSelector) */
+  public Document doc(int docID, FieldSelector fieldSelector) throws CorruptIndexException, IOException {
+    return reader.document(docID, fieldSelector);
+  }
+  
+  /** Expert: Set the Similarity implementation used by this Searcher.
+   *
+   * @see Similarity#setDefault(Similarity)
+   */
+  public void setSimilarity(Similarity similarity) {
+    this.similarity = similarity;
+  }
+
+  public Similarity getSimilarity() {
+    return similarity;
+  }
+
   /**
    * Note that the underlying IndexReader is not closed, if
    * IndexSearcher was constructed with IndexSearcher(IndexReader r).
    * If the IndexReader was supplied implicitly by specifying a directory, then
-   * the IndexReader gets closed.
+   * the IndexReader is closed.
    */
-  @Override
   public void close() throws IOException {
-    if(closeReader)
+    if (closeReader) {
       reader.close();
+    }
   }
 
-  // inherit javadoc
-  @Override
-  public int docFreq(Term term) throws IOException {
-    return reader.docFreq(term);
+  /** Finds the top <code>n</code>
+   * hits for <code>query</code>.
+   *
+   * @throws BooleanQuery.TooManyClauses
+   */
+  public TopDocs search(Query query, int n)
+    throws IOException {
+    return search(query, null, n);
   }
 
-  // inherit javadoc
-  @Override
-  public Document doc(int i) throws CorruptIndexException, IOException {
-    return reader.document(i);
+
+  /** Finds the top <code>n</code>
+   * hits for <code>query</code>, applying <code>filter</code> if non-null.
+   *
+   * @throws BooleanQuery.TooManyClauses
+   */
+  public TopDocs search(Query query, Filter filter, int n)
+    throws IOException {
+    return search(createWeight(query), filter, n);
   }
-  
-  // inherit javadoc
-  @Override
-  public Document doc(int i, FieldSelector fieldSelector) throws CorruptIndexException, IOException {
-	    return reader.document(i, fieldSelector);
+
+  /** Lower-level search API.
+   *
+   * <p>{@link Collector#collect(int)} is called for every matching
+   * document.
+   * <br>Collector-based access to remote indexes is discouraged.
+   *
+   * <p>Applications should only use this if they need <i>all</i> of the
+   * matching documents.  The high-level search API ({@link
+   * Searcher#search(Query, Filter, int)}) is usually more efficient, as it skips
+   * non-high-scoring hits.
+   *
+   * @param query to match documents
+   * @param filter if non-null, used to permit documents to be collected.
+   * @param results to receive hits
+   * @throws BooleanQuery.TooManyClauses
+   */
+  public void search(Query query, Filter filter, Collector results)
+    throws IOException {
+    search(createWeight(query), filter, results);
+  }
+
+  /** Lower-level search API.
+  *
+  * <p>{@link Collector#collect(int)} is called for every matching document.
+  *
+  * <p>Applications should only use this if they need <i>all</i> of the
+  * matching documents.  The high-level search API ({@link
+  * Searcher#search(Query, int)}) is usually more efficient, as it skips
+  * non-high-scoring hits.
+  * <p>Note: The <code>score</code> passed to this method is a raw score.
+  * In other words, the score will not necessarily be a float whose value is
+  * between 0 and 1.
+  * @throws BooleanQuery.TooManyClauses
+  */
+  public void search(Query query, Collector results)
+    throws IOException {
+    search(createWeight(query), null, results);
   }
   
-  // inherit javadoc
-  @Override
-  public int maxDoc() throws IOException {
-    return reader.maxDoc();
+  /** Search implementation with arbitrary sorting.  Finds
+   * the top <code>n</code> hits for <code>query</code>, applying
+   * <code>filter</code> if non-null, and sorting the hits by the criteria in
+   * <code>sort</code>.
+   * 
+   * <p>NOTE: this does not compute scores by default; use
+   * {@link IndexSearcher#setDefaultFieldSortScoring} to
+   * enable scoring.
+   *
+   * @throws BooleanQuery.TooManyClauses
+   */
+  public TopFieldDocs search(Query query, Filter filter, int n,
+                             Sort sort) throws IOException {
+    return search(createWeight(query), filter, n, sort);
+  }
+
+  /**
+   * Search implementation with arbitrary sorting and no filter.
+   * @param query The query to search for
+   * @param n Return only the top n results
+   * @param sort The {@link org.apache.lucene.search.Sort} object
+   * @return The top docs, sorted according to the supplied {@link org.apache.lucene.search.Sort} instance
+   * @throws IOException
+   */
+  public TopFieldDocs search(Query query, int n,
+                             Sort sort) throws IOException {
+    return search(createWeight(query), null, n, sort);
   }
 
-  // inherit javadoc
-  @Override
+  /** Expert: Low-level search implementation.  Finds the top <code>n</code>
+   * hits for <code>query</code>, applying <code>filter</code> if non-null.
+   *
+   * <p>Applications should usually call {@link Searcher#search(Query,int)} or
+   * {@link Searcher#search(Query,Filter,int)} instead.
+   * @throws BooleanQuery.TooManyClauses
+   */
   public TopDocs search(Weight weight, Filter filter, int nDocs) throws IOException {
 
-    int limit = reader.maxDoc();
-    if (limit == 0) {
-      limit = 1;
-    }
-    nDocs = Math.min(nDocs, limit);
+    if (executor == null) {
+      // single thread
+      int limit = reader.maxDoc();
+      if (limit == 0) {
+        limit = 1;
+      }
+      nDocs = Math.min(nDocs, limit);
+      TopScoreDocCollector collector = TopScoreDocCollector.create(nDocs, !weight.scoresDocsOutOfOrder());
+      search(weight, filter, collector);
+      return collector.topDocs();
+    } else {
+      final HitQueue hq = new HitQueue(nDocs, false);
+      final Lock lock = new ReentrantLock();
+      final ExecutionHelper<TopDocs> runner = new ExecutionHelper<TopDocs>(executor);
+    
+      for (int i = 0; i < subReaders.length; i++) { // search each sub
+        runner.submit(
+                      new MultiSearcherCallableNoSort(lock, subSearchers[i], weight, filter, nDocs, hq, i, docStarts));
+      }
 
-    TopScoreDocCollector collector = TopScoreDocCollector.create(nDocs, !weight.scoresDocsOutOfOrder());
-    search(weight, filter, collector);
-    return collector.topDocs();
+      int totalHits = 0;
+      float maxScore = Float.NEGATIVE_INFINITY;
+      for (final TopDocs topDocs : runner) {
+        totalHits += topDocs.totalHits;
+        maxScore = Math.max(maxScore, topDocs.getMaxScore());
+      }
+
+      final ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
+      for (int i = hq.size() - 1; i >= 0; i--) // put docs in array
+        scoreDocs[i] = hq.pop();
+
+      return new TopDocs(totalHits, scoreDocs, maxScore);
+    }
   }
 
-  @Override
+  /** Expert: Low-level search implementation with arbitrary sorting.  Finds
+   * the top <code>n</code> hits for <code>query</code>, applying
+   * <code>filter</code> if non-null, and sorting the hits by the criteria in
+   * <code>sort</code>.
+   *
+   * <p>Applications should usually call {@link
+   * Searcher#search(Query,Filter,int,Sort)} instead.
+   * 
+   * @throws BooleanQuery.TooManyClauses
+   */
   public TopFieldDocs search(Weight weight, Filter filter,
       final int nDocs, Sort sort) throws IOException {
     return search(weight, filter, nDocs, sort, true);
@@ -186,26 +423,74 @@ public class IndexSearcher extends Searc
    * then pass that to {@link #search(Weight, Filter,
    * Collector)}.</p>
    */
-  public TopFieldDocs search(Weight weight, Filter filter, int nDocs,
+  protected TopFieldDocs search(Weight weight, Filter filter, int nDocs,
                              Sort sort, boolean fillFields)
       throws IOException {
 
-    int limit = reader.maxDoc();
-    if (limit == 0) {
-      limit = 1;
-    }
-    nDocs = Math.min(nDocs, limit);
+    if (sort == null) throw new NullPointerException();
+
+    if (executor == null) {
+      // single thread
+      int limit = reader.maxDoc();
+      if (limit == 0) {
+        limit = 1;
+      }
+      nDocs = Math.min(nDocs, limit);
+
+      TopFieldCollector collector = TopFieldCollector.create(sort, nDocs,
+                                                             fillFields, fieldSortDoTrackScores, fieldSortDoMaxScore, !weight.scoresDocsOutOfOrder());
+      search(weight, filter, collector);
+      return (TopFieldDocs) collector.topDocs();
+    } else {
+      // TODO: make this respect fillFields
+      final FieldDocSortedHitQueue hq = new FieldDocSortedHitQueue(nDocs);
+      final Lock lock = new ReentrantLock();
+      final ExecutionHelper<TopFieldDocs> runner = new ExecutionHelper<TopFieldDocs>(executor);
+      for (int i = 0; i < subReaders.length; i++) { // search each sub
+        runner.submit(
+                      new MultiSearcherCallableWithSort(lock, subSearchers[i], weight, filter, nDocs, hq, sort, i, docStarts));
+      }
+      int totalHits = 0;
+      float maxScore = Float.NEGATIVE_INFINITY;
+      for (final TopFieldDocs topFieldDocs : runner) {
+        totalHits += topFieldDocs.totalHits;
+        maxScore = Math.max(maxScore, topFieldDocs.getMaxScore());
+      }
+      final ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
+      for (int i = hq.size() - 1; i >= 0; i--) // put docs in array
+        scoreDocs[i] = hq.pop();
 
-    TopFieldCollector collector = TopFieldCollector.create(sort, nDocs,
-        fillFields, fieldSortDoTrackScores, fieldSortDoMaxScore, !weight.scoresDocsOutOfOrder());
-    search(weight, filter, collector);
-    return (TopFieldDocs) collector.topDocs();
+      return new TopFieldDocs(totalHits, scoreDocs, hq.getFields(), maxScore);
+    }
   }
 
-  @Override
+  /**
+   * Lower-level search API.
+   * 
+   * <p>
+   * {@link Collector#collect(int)} is called for every document. <br>
+   * Collector-based access to remote indexes is discouraged.
+   * 
+   * <p>
+   * Applications should only use this if they need <i>all</i> of the matching
+   * documents. The high-level search API ({@link Searcher#search(Query,int)}) is
+   * usually more efficient, as it skips non-high-scoring hits.
+   * 
+   * @param weight
+   *          to match documents
+   * @param filter
+   *          if non-null, used to permit documents to be collected.
+   * @param collector
+   *          to receive hits
+   * @throws BooleanQuery.TooManyClauses
+   */
   public void search(Weight weight, Filter filter, Collector collector)
       throws IOException {
-    
+
+    // TODO: should we make this
+    // threaded...?  the Collector could be sync'd?
+
+    // always use single thread:
     if (filter == null) {
       for (int i = 0; i < subReaders.length; i++) { // search each subreader
         collector.setNextReader(subReaders[i], docStarts[i]);
@@ -268,7 +553,9 @@ public class IndexSearcher extends Searc
     }
   }
 
-  @Override
+  /** Expert: called to re-write queries into primitive queries.
+   * @throws BooleanQuery.TooManyClauses
+   */
   public Query rewrite(Query original) throws IOException {
     Query query = original;
     for (Query rewrittenQuery = query.rewrite(reader); rewrittenQuery != query;
@@ -278,7 +565,29 @@ public class IndexSearcher extends Searc
     return query;
   }
 
-  @Override
+  /** Returns an Explanation that describes how <code>doc</code> scored against
+   * <code>query</code>.
+   *
+   * <p>This is intended to be used in developing Similarity implementations,
+   * and, for good performance, should not be displayed with every hit.
+   * Computing an explanation is as expensive as executing the query over the
+   * entire index.
+   */
+  public Explanation explain(Query query, int doc) throws IOException {
+    return explain(createWeight(query), doc);
+  }
+
+  /** Expert: low-level implementation method
+   * Returns an Explanation that describes how <code>doc</code> scored against
+   * <code>weight</code>.
+   *
+   * <p>This is intended to be used in developing Similarity implementations,
+   * and, for good performance, should not be displayed with every hit.
+   * Computing an explanation is as expensive as executing the query over the
+   * entire index.
+   * <p>Applications should call {@link Searcher#explain(Query, int)}.
+   * @throws BooleanQuery.TooManyClauses
+   */
   public Explanation explain(Weight weight, int doc) throws IOException {
     int n = ReaderUtil.subIndex(doc, docStarts);
     int deBasedDoc = doc - docStarts[n];
@@ -305,4 +614,175 @@ public class IndexSearcher extends Searc
     fieldSortDoTrackScores = doTrackScores;
     fieldSortDoMaxScore = doMaxScore;
   }
+
+  /**
+   * creates a weight for <code>query</code>
+   * @return new weight
+   */
+  protected Weight createWeight(Query query) throws IOException {
+    return query.weight(this);
+  }
+
+
+  /**
+   * A {@link Callable} for searching a single searchable
+   */
+  private static final class MultiSearcherCallableNoSort implements Callable<TopDocs> {
+
+    private final Lock lock;
+    private final IndexSearcher searchable;
+    private final Weight weight;
+    private final Filter filter;
+    private final int nDocs;
+    private final int i;
+    private final HitQueue hq;
+    private final int[] starts;
+
+    public MultiSearcherCallableNoSort(Lock lock, IndexSearcher searchable, Weight weight,
+        Filter filter, int nDocs, HitQueue hq, int i, int[] starts) {
+      this.lock = lock;
+      this.searchable = searchable;
+      this.weight = weight;
+      this.filter = filter;
+      this.nDocs = nDocs;
+      this.hq = hq;
+      this.i = i;
+      this.starts = starts;
+    }
+
+    public TopDocs call() throws IOException {
+      final TopDocs docs = searchable.search (weight, filter, nDocs);
+      final ScoreDoc[] scoreDocs = docs.scoreDocs;
+      for (int j = 0; j < scoreDocs.length; j++) { // merge scoreDocs into hq
+        final ScoreDoc scoreDoc = scoreDocs[j];
+        scoreDoc.doc += starts[i]; // convert doc 
+        //it would be so nice if we had a thread-safe insert 
+        lock.lock();
+        try {
+          if (scoreDoc == hq.insertWithOverflow(scoreDoc))
+            break;
+        } finally {
+          lock.unlock();
+        }
+      }
+      return docs;
+    }
+  }
+
+
+  /**
+   * A {@link Callable} for searching a single searchable
+   */
+  private static final class MultiSearcherCallableWithSort implements Callable<TopFieldDocs> {
+
+    private final Lock lock;
+    private final IndexSearcher searchable;
+    private final Weight weight;
+    private final Filter filter;
+    private final int nDocs;
+    private final int i;
+    private final FieldDocSortedHitQueue hq;
+    private final int[] starts;
+    private final Sort sort;
+
+    public MultiSearcherCallableWithSort(Lock lock, IndexSearcher searchable, Weight weight,
+        Filter filter, int nDocs, FieldDocSortedHitQueue hq, Sort sort, int i, int[] starts) {
+      this.lock = lock;
+      this.searchable = searchable;
+      this.weight = weight;
+      this.filter = filter;
+      this.nDocs = nDocs;
+      this.hq = hq;
+      this.i = i;
+      this.starts = starts;
+      this.sort = sort;
+    }
+
+    public TopFieldDocs call() throws IOException {
+      final TopFieldDocs docs = searchable.search (weight, filter, nDocs, sort);
+      // If one of the Sort fields is FIELD_DOC, need to fix its values, so that
+      // it will break ties by doc Id properly. Otherwise, it will compare to
+      // 'relative' doc Ids, that belong to two different searchables.
+      for (int j = 0; j < docs.fields.length; j++) {
+        if (docs.fields[j].getType() == SortField.DOC) {
+          // iterate over the score docs and change their fields value
+          for (int j2 = 0; j2 < docs.scoreDocs.length; j2++) {
+            FieldDoc fd = (FieldDoc) docs.scoreDocs[j2];
+            fd.fields[j] = Integer.valueOf(((Integer) fd.fields[j]).intValue() + starts[i]);
+          }
+          break;
+        }
+      }
+
+      lock.lock();
+      try {
+        hq.setFields(docs.fields);
+      } finally {
+        lock.unlock();
+      }
+
+      final ScoreDoc[] scoreDocs = docs.scoreDocs;
+      for (int j = 0; j < scoreDocs.length; j++) { // merge scoreDocs into hq
+        final FieldDoc fieldDoc = (FieldDoc) scoreDocs[j];
+        fieldDoc.doc += starts[i]; // convert doc 
+        //it would be so nice if we had a thread-safe insert 
+        lock.lock();
+        try {
+          if (fieldDoc == hq.insertWithOverflow(fieldDoc))
+            break;
+        } finally {
+          lock.unlock();
+        }
+      }
+      return docs;
+    }
+  }
+
+  /**
+   * A helper class that wraps a {@link CompletionService} and provides an
+   * iterable interface to the completed {@link Callable} instances.
+   * 
+   * @param <T>
+   *          the type of the {@link Callable} return value
+   */
+  private static final class ExecutionHelper<T> implements Iterator<T>, Iterable<T> {
+    private final CompletionService<T> service;
+    private int numTasks;
+
+    ExecutionHelper(final Executor executor) {
+      this.service = new ExecutorCompletionService<T>(executor);
+    }
+
+    public boolean hasNext() {
+      return numTasks > 0;
+    }
+
+    public void submit(Callable<T> task) {
+      this.service.submit(task);
+      ++numTasks;
+    }
+
+    public T next() {
+      if(!this.hasNext())
+        throw new NoSuchElementException();
+      try {
+        return service.take().get();
+      } catch (InterruptedException e) {
+        throw new ThreadInterruptedException(e);
+      } catch (ExecutionException e) {
+        throw new RuntimeException(e);
+      } finally {
+        --numTasks;
+      }
+    }
+
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+
+    public Iterator<T> iterator() {
+      // use the shortcut here - this is only used in a private context
+      return this;
+    }
+  }
 }

Modified: lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/search/ParallelMultiSearcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/search/ParallelMultiSearcher.java?rev=1057413&r1=1057412&r2=1057413&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/search/ParallelMultiSearcher.java (original)
+++ lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/search/ParallelMultiSearcher.java Mon Jan 10 23:16:40 2011
@@ -41,7 +41,11 @@ import org.apache.lucene.util.ThreadInte
  *
  * <p>Applications usually need only call the inherited {@link #search(Query,int)}
  * or {@link #search(Query,Filter,int)} methods.
+ * 
+ * @deprecated Please pass an ExecutorService to {@link
+ * IndexSearcher}, instead.
  */
+@Deprecated
 public class ParallelMultiSearcher extends MultiSearcher {
   private final ExecutorService executor;
   private final Searchable[] searchables;

Modified: lucene/dev/branches/branch_3x/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java?rev=1057413&r1=1057412&r2=1057413&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java (original)
+++ lucene/dev/branches/branch_3x/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java Mon Jan 10 23:16:40 2011
@@ -25,6 +25,9 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.document.Document;
@@ -182,6 +185,8 @@ public class TestNRTThreads extends Luce
     // silly starting guess:
     final AtomicInteger totTermCount = new AtomicInteger(100);
 
+    final ExecutorService es = Executors.newCachedThreadPool();
+
     while(System.currentTimeMillis() < stopTime && !failed.get()) {
       if (random.nextBoolean()) {
         if (VERBOSE) {
@@ -219,7 +224,7 @@ public class TestNRTThreads extends Luce
 
       if (r.numDocs() > 0) {
 
-        final IndexSearcher s = new IndexSearcher(r);
+        final IndexSearcher s = new IndexSearcher(r, es);
 
         // run search threads
         final long searchStopTime = System.currentTimeMillis() + 500;
@@ -293,6 +298,9 @@ public class TestNRTThreads extends Luce
       }
     }
 
+    es.shutdown();
+    es.awaitTermination(1, TimeUnit.SECONDS);
+
     if (VERBOSE) {
       System.out.println("TEST: all searching done [" + (System.currentTimeMillis()-t0) + " ms]");
     }
@@ -331,6 +339,7 @@ public class TestNRTThreads extends Luce
     assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), writer.numDocs());
       
     writer.close(false);
+    _TestUtil.checkIndex(dir);
     dir.close();
     _TestUtil.rmDir(tempDir);
     docs.close();