You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2010/06/10 19:27:54 UTC
svn commit: r953407 - in /lucene/dev/trunk/lucene: CHANGES.txt
src/java/org/apache/lucene/search/ParallelMultiSearcher.java
Author: simonw
Date: Thu Jun 10 17:27:53 2010
New Revision: 953407
URL: http://svn.apache.org/viewvc?rev=953407&view=rev
Log:
LUCENE-2494: use CompletionService in ParallelMultiSearcher instead of simple polling
Modified:
lucene/dev/trunk/lucene/CHANGES.txt
lucene/dev/trunk/lucene/src/java/org/apache/lucene/search/ParallelMultiSearcher.java
Modified: lucene/dev/trunk/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/CHANGES.txt?rev=953407&r1=953406&r2=953407&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/CHANGES.txt (original)
+++ lucene/dev/trunk/lucene/CHANGES.txt Thu Jun 10 17:27:53 2010
@@ -763,6 +763,8 @@ API Changes
(Robert Muir)
Optimizations
+ * LUCENE-2494: Use CompletionService in ParallelMultiSearcher instead of
+ simple polling for results. (Edward Drapkin, Simon Willnauer)
* LUCENE-2086: When resolving deleted terms, do so in term sort order
for better performance (Bogdan Ghidireac via Mike McCandless)
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/search/ParallelMultiSearcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/search/ParallelMultiSearcher.java?rev=953407&r1=953406&r2=953407&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/search/ParallelMultiSearcher.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/search/ParallelMultiSearcher.java Thu Jun 10 17:27:53 2010
@@ -18,16 +18,17 @@ package org.apache.lucene.search;
*/
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -67,18 +68,20 @@ public class ParallelMultiSearcher exten
*/
@Override
public int docFreq(final Term term) throws IOException {
- @SuppressWarnings("unchecked") final Future<Integer>[] searchThreads = new Future[searchables.length];
- for (int i = 0; i < searchables.length; i++) { // search each searchable
+ final ExecutionHelper<Integer> runner = new ExecutionHelper<Integer>(executor);
+ for(int i = 0; i < searchables.length; i++) {
final Searchable searchable = searchables[i];
- searchThreads[i] = executor.submit(new Callable<Integer>() {
+ runner.submit(new Callable<Integer>() {
public Integer call() throws IOException {
return Integer.valueOf(searchable.docFreq(term));
}
});
}
- final CountDocFreq func = new CountDocFreq();
- foreach(func, Arrays.asList(searchThreads));
- return func.docFreq;
+ int docFreq = 0;
+ for (Integer num : runner) {
+ docFreq += num.intValue();
+ }
+ return docFreq;
}
/**
@@ -90,20 +93,25 @@ public class ParallelMultiSearcher exten
public TopDocs search(Weight weight, Filter filter, int nDocs) throws IOException {
final HitQueue hq = new HitQueue(nDocs, false);
final Lock lock = new ReentrantLock();
- @SuppressWarnings("unchecked") final Future<TopDocs>[] searchThreads = new Future[searchables.length];
+ final ExecutionHelper<TopDocs> runner = new ExecutionHelper<TopDocs>(executor);
+
for (int i = 0; i < searchables.length; i++) { // search each searchable
- searchThreads[i] = executor.submit(
+ runner.submit(
new MultiSearcherCallableNoSort(lock, searchables[i], weight, filter, nDocs, hq, i, starts));
}
- final CountTotalHits<TopDocs> func = new CountTotalHits<TopDocs>();
- foreach(func, Arrays.asList(searchThreads));
+ int totalHits = 0;
+ float maxScore = Float.NEGATIVE_INFINITY;
+ for (final TopDocs topDocs : runner) {
+ totalHits += topDocs.totalHits;
+ maxScore = Math.max(maxScore, topDocs.getMaxScore());
+ }
final ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
for (int i = hq.size() - 1; i >= 0; i--) // put docs in array
scoreDocs[i] = hq.pop();
- return new TopDocs(func.totalHits, scoreDocs, func.maxScore);
+ return new TopDocs(totalHits, scoreDocs, maxScore);
}
/**
@@ -117,20 +125,22 @@ public class ParallelMultiSearcher exten
final FieldDocSortedHitQueue hq = new FieldDocSortedHitQueue(nDocs);
final Lock lock = new ReentrantLock();
- @SuppressWarnings("unchecked") final Future<TopFieldDocs>[] searchThreads = new Future[searchables.length];
+ final ExecutionHelper<TopFieldDocs> runner = new ExecutionHelper<TopFieldDocs>(executor);
for (int i = 0; i < searchables.length; i++) { // search each searchable
- searchThreads[i] = executor.submit(
+ runner.submit(
new MultiSearcherCallableWithSort(lock, searchables[i], weight, filter, nDocs, hq, sort, i, starts));
}
-
- final CountTotalHits<TopFieldDocs> func = new CountTotalHits<TopFieldDocs>();
- foreach(func, Arrays.asList(searchThreads));
-
+ int totalHits = 0;
+ float maxScore = Float.NEGATIVE_INFINITY;
+ for (final TopFieldDocs topFieldDocs : runner) {
+ totalHits += topFieldDocs.totalHits;
+ maxScore = Math.max(maxScore, topFieldDocs.getMaxScore());
+ }
final ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
for (int i = hq.size() - 1; i >= 0; i--) // put docs in array
scoreDocs[i] = hq.pop();
- return new TopFieldDocs(func.totalHits, scoreDocs, hq.getFields(), func.maxScore);
+ return new TopFieldDocs(totalHits, scoreDocs, hq.getFields(), maxScore);
}
/** Lower-level search API.
@@ -192,13 +202,17 @@ public class ParallelMultiSearcher exten
HashMap<Term, Integer> createDocFrequencyMap(Set<Term> terms) throws IOException {
final Term[] allTermsArray = terms.toArray(new Term[terms.size()]);
final int[] aggregatedDocFreqs = new int[terms.size()];
- final ArrayList<Future<int[]>> searchThreads = new ArrayList<Future<int[]>>(searchables.length);
+ final ExecutionHelper<int[]> runner = new ExecutionHelper<int[]>(executor);
for (Searchable searchable : searchables) {
- final Future<int[]> future = executor.submit(
+ runner.submit(
new DocumentFrequencyCallable(searchable, allTermsArray));
- searchThreads.add(future);
}
- foreach(new AggregateDocFrequency(aggregatedDocFreqs), searchThreads);
+ final int docFreqLen = aggregatedDocFreqs.length;
+ for (final int[] docFreqs : runner) {
+ for(int i=0; i < docFreqLen; i++){
+ aggregatedDocFreqs[i] += docFreqs[i];
+ }
+ }
final HashMap<Term,Integer> dfMap = new HashMap<Term,Integer>();
for(int i=0; i<allTermsArray.length; i++) {
@@ -206,77 +220,7 @@ public class ParallelMultiSearcher exten
}
return dfMap;
}
-
- /*
- * apply the function to each element of the list. This method encapsulates the logic how
- * to wait for concurrently executed searchables.
- */
- private <T> void foreach(Function<T> func, List<Future<T>> list) throws IOException{
- for (Future<T> future : list) {
- try{
- func.apply(future.get());
- } catch (ExecutionException e) {
- final Throwable throwable = e.getCause();
- if (throwable instanceof IOException)
- throw (IOException) e.getCause();
- throw new RuntimeException(throwable);
- } catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
- }
- }
- }
- // Both functions could be reduced to Int as other values of TopDocs
- // are not needed. Using sep. functions is more self documenting.
- /**
- * A function with one argument
- * @param <T> the argument type
- */
- private static interface Function<T> {
- abstract void apply(T t);
- }
-
- /**
- * Counts the total number of hits for all {@link TopDocs} instances
- * provided.
- */
- private static final class CountTotalHits<T extends TopDocs> implements Function<T> {
- int totalHits = 0;
- float maxScore = Float.NEGATIVE_INFINITY;
-
- public void apply(T t) {
- totalHits += t.totalHits;
- maxScore = Math.max(maxScore, t.getMaxScore());
- }
- }
-
- /**
- * Accumulates the document frequency for a term.
- */
- private static final class CountDocFreq implements Function<Integer>{
- int docFreq = 0;
-
- public void apply(Integer t) {
- docFreq += t.intValue();
- }
- }
-
- /**
- * Aggregates the document frequencies from multiple searchers
- */
- private static final class AggregateDocFrequency implements Function<int[]>{
- final int[] aggregatedDocFreqs;
-
- public AggregateDocFrequency(int[] aggregatedDocFreqs){
- this.aggregatedDocFreqs = aggregatedDocFreqs;
- }
-
- public void apply(final int[] docFreqs) {
- for(int i=0; i<aggregatedDocFreqs.length; i++){
- aggregatedDocFreqs[i] += docFreqs[i];
- }
- }
- }
/**
* A {@link Callable} to retrieve the document frequencies for a Term array
@@ -294,4 +238,53 @@ public class ParallelMultiSearcher exten
return searchable.docFreqs(terms);
}
}
+
+ /**
+ * A helper class that wraps a {@link CompletionService} and provides an
+ * iterable interface to the completed {@link Callable} instances.
+ *
+ * @param <T>
+ * the type of the {@link Callable} return value
+ */
+ private static final class ExecutionHelper<T> implements Iterator<T>, Iterable<T> {
+ private final CompletionService<T> service;
+ private int numTasks;
+
+ ExecutionHelper(final Executor executor) {
+ this.service = new ExecutorCompletionService<T>(executor);
+ }
+
+ public boolean hasNext() {
+ return numTasks > 0;
+ }
+
+ public void submit(Callable<T> task) {
+ this.service.submit(task);
+ ++numTasks;
+ }
+
+ public T next() {
+ if(!this.hasNext())
+ throw new NoSuchElementException();
+ try {
+ return service.take().get();
+ } catch (InterruptedException e) {
+ throw new ThreadInterruptedException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ } finally {
+ --numTasks;
+ }
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ public Iterator<T> iterator() {
+ // use the shortcut here - this is only used in a private context
+ return this;
+ }
+
+ }
}