You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ja...@apache.org on 2023/05/26 07:17:33 UTC

[lucene] branch main updated: Parallelize knn query rewrite across slices rather than segments (#12325)

This is an automated email from the ASF dual-hosted git repository.

javanna pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/lucene.git


The following commit(s) were added to refs/heads/main by this push:
     new 10bebde2693 Parallelize knn query rewrite across slices rather than segments (#12325)
10bebde2693 is described below

commit 10bebde26936298c1909dd2ea0b5706b08d2face
Author: Luca Cavanna <ja...@apache.org>
AuthorDate: Fri May 26 09:17:25 2023 +0200

    Parallelize knn query rewrite across slices rather than segments (#12325)
    
    The concurrent query rewrite for knn vectory query introduced with #12160
    requests one thread per segment to the executor. To align this with the
    IndexSearcher parallel behaviour, we should rather parallelize across
    slices. Also, we can reuse the same slice executor instance that the
    index searcher already holds, in that way we are using a
    QueueSizeBasedExecutor when a thread pool executor is provided.
---
 .../lucene/search/AbstractKnnVectorQuery.java      | 58 ++++++++++++++--------
 .../org/apache/lucene/search/IndexSearcher.java    |  4 ++
 2 files changed, 40 insertions(+), 22 deletions(-)

diff --git a/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java b/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java
index 04309561cbd..cd8d73b8c26 100644
--- a/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java
+++ b/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java
@@ -19,12 +19,12 @@ package org.apache.lucene.search;
 import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.FutureTask;
 import org.apache.lucene.codecs.KnnVectorsReader;
 import org.apache.lucene.index.FieldInfo;
@@ -81,11 +81,12 @@ abstract class AbstractKnnVectorQuery extends Query {
       filterWeight = null;
     }
 
-    Executor executor = indexSearcher.getExecutor();
+    SliceExecutor sliceExecutor = indexSearcher.getSliceExecutor();
+    // in case of parallel execution, the leaf results are not ordered by leaf context's ordinal
     TopDocs[] perLeafResults =
-        (executor == null)
+        (sliceExecutor == null)
             ? sequentialSearch(reader.leaves(), filterWeight)
-            : parallelSearch(reader.leaves(), filterWeight, executor);
+            : parallelSearch(indexSearcher.getSlices(), filterWeight, sliceExecutor);
 
     // Merge sort the results
     TopDocs topK = TopDocs.merge(k, perLeafResults);
@@ -109,27 +110,40 @@ abstract class AbstractKnnVectorQuery extends Query {
   }
 
   private TopDocs[] parallelSearch(
-      List<LeafReaderContext> leafReaderContexts, Weight filterWeight, Executor executor) {
-    List<FutureTask<TopDocs>> tasks =
-        leafReaderContexts.stream()
-            .map(ctx -> new FutureTask<>(() -> searchLeaf(ctx, filterWeight)))
-            .toList();
+      IndexSearcher.LeafSlice[] slices, Weight filterWeight, SliceExecutor sliceExecutor) {
+
+    List<FutureTask<TopDocs[]>> tasks = new ArrayList<>(slices.length);
+    int segmentsCount = 0;
+    for (IndexSearcher.LeafSlice slice : slices) {
+      segmentsCount += slice.leaves.length;
+      tasks.add(
+          new FutureTask<>(
+              () -> {
+                TopDocs[] results = new TopDocs[slice.leaves.length];
+                int i = 0;
+                for (LeafReaderContext context : slice.leaves) {
+                  results[i++] = searchLeaf(context, filterWeight);
+                }
+                return results;
+              }));
+    }
 
-    SliceExecutor sliceExecutor = new SliceExecutor(executor);
     sliceExecutor.invokeAll(tasks);
 
-    return tasks.stream()
-        .map(
-            task -> {
-              try {
-                return task.get();
-              } catch (ExecutionException e) {
-                throw new RuntimeException(e.getCause());
-              } catch (InterruptedException e) {
-                throw new ThreadInterruptedException(e);
-              }
-            })
-        .toArray(TopDocs[]::new);
+    TopDocs[] topDocs = new TopDocs[segmentsCount];
+    int i = 0;
+    for (FutureTask<TopDocs[]> task : tasks) {
+      try {
+        for (TopDocs docs : task.get()) {
+          topDocs[i++] = docs;
+        }
+      } catch (ExecutionException e) {
+        throw new RuntimeException(e.getCause());
+      } catch (InterruptedException e) {
+        throw new ThreadInterruptedException(e);
+      }
+    }
+    return topDocs;
   }
 
   private TopDocs searchLeaf(LeafReaderContext ctx, Weight filterWeight) throws IOException {
diff --git a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java
index 227d16dc721..fa7bb0e63c6 100644
--- a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java
+++ b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java
@@ -962,6 +962,10 @@ public class IndexSearcher {
     return executor;
   }
 
+  SliceExecutor getSliceExecutor() {
+    return sliceExecutor;
+  }
+
   /**
    * Thrown when an attempt is made to add more than {@link #getMaxClauseCount()} clauses. This
    * typically happens if a PrefixQuery, FuzzyQuery, WildcardQuery, or TermRangeQuery is expanded to