You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ja...@apache.org on 2023/05/26 07:24:08 UTC

[lucene] branch branch_9x updated: Parallelize knn query rewrite across slices rather than segments (#12325)

This is an automated email from the ASF dual-hosted git repository.

javanna pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/lucene.git


The following commit(s) were added to refs/heads/branch_9x by this push:
     new e0ac15a6221 Parallelize knn query rewrite across slices rather than segments (#12325)
e0ac15a6221 is described below

commit e0ac15a622162504456ebbb28b2e3f2ab13c6670
Author: Luca Cavanna <ja...@apache.org>
AuthorDate: Fri May 26 09:17:25 2023 +0200

    Parallelize knn query rewrite across slices rather than segments (#12325)
    
    The concurrent query rewrite for knn vector queries introduced with #12160
    requests one thread per segment from the executor. To align this with the
    IndexSearcher parallel behaviour, we should rather parallelize across
    slices. Also, we can reuse the same slice executor instance that the
    index searcher already holds; that way we are using a
    QueueSizeBasedExecutor when a thread pool executor is provided.
---
 lucene/CHANGES.txt                                 |  2 +
 .../lucene/search/AbstractKnnVectorQuery.java      | 59 +++++++++++++---------
 .../org/apache/lucene/search/IndexSearcher.java    |  4 ++
 3 files changed, 42 insertions(+), 23 deletions(-)

diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 89d645fa9bf..77cc97287bc 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -36,6 +36,8 @@ Improvements
 
 * GITHUB#12305: Minor cleanup and improvements to DaciukMihovAutomatonBuilder. (Greg Miller)
 
+* GITHUB#12325: Parallelize AbstractKnnVectorQuery rewrite across slices rather than segments. (Luca Cavanna)
+
 Optimizations
 ---------------------
 
diff --git a/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java b/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java
index d6b9e04b542..01c12e349e5 100644
--- a/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java
+++ b/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java
@@ -19,14 +19,13 @@ package org.apache.lucene.search;
 import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.FutureTask;
-import java.util.stream.Collectors;
 import org.apache.lucene.codecs.KnnVectorsReader;
 import org.apache.lucene.index.FieldInfo;
 import org.apache.lucene.index.IndexReader;
@@ -82,11 +81,12 @@ abstract class AbstractKnnVectorQuery extends Query {
       filterWeight = null;
     }
 
-    Executor executor = indexSearcher.getExecutor();
+    SliceExecutor sliceExecutor = indexSearcher.getSliceExecutor();
+    // in case of parallel execution, the leaf results are not ordered by leaf context's ordinal
     TopDocs[] perLeafResults =
-        (executor == null)
+        (sliceExecutor == null)
             ? sequentialSearch(reader.leaves(), filterWeight)
-            : parallelSearch(reader.leaves(), filterWeight, executor);
+            : parallelSearch(indexSearcher.getSlices(), filterWeight, sliceExecutor);
 
     // Merge sort the results
     TopDocs topK = TopDocs.merge(k, perLeafResults);
@@ -110,27 +110,40 @@ abstract class AbstractKnnVectorQuery extends Query {
   }
 
   private TopDocs[] parallelSearch(
-      List<LeafReaderContext> leafReaderContexts, Weight filterWeight, Executor executor) {
-    List<FutureTask<TopDocs>> tasks =
-        leafReaderContexts.stream()
-            .map(ctx -> new FutureTask<>(() -> searchLeaf(ctx, filterWeight)))
-            .collect(Collectors.toList());
+      IndexSearcher.LeafSlice[] slices, Weight filterWeight, SliceExecutor sliceExecutor) {
+
+    List<FutureTask<TopDocs[]>> tasks = new ArrayList<>(slices.length);
+    int segmentsCount = 0;
+    for (IndexSearcher.LeafSlice slice : slices) {
+      segmentsCount += slice.leaves.length;
+      tasks.add(
+          new FutureTask<>(
+              () -> {
+                TopDocs[] results = new TopDocs[slice.leaves.length];
+                int i = 0;
+                for (LeafReaderContext context : slice.leaves) {
+                  results[i++] = searchLeaf(context, filterWeight);
+                }
+                return results;
+              }));
+    }
 
-    SliceExecutor sliceExecutor = new SliceExecutor(executor);
     sliceExecutor.invokeAll(tasks);
 
-    return tasks.stream()
-        .map(
-            task -> {
-              try {
-                return task.get();
-              } catch (ExecutionException e) {
-                throw new RuntimeException(e.getCause());
-              } catch (InterruptedException e) {
-                throw new ThreadInterruptedException(e);
-              }
-            })
-        .toArray(TopDocs[]::new);
+    TopDocs[] topDocs = new TopDocs[segmentsCount];
+    int i = 0;
+    for (FutureTask<TopDocs[]> task : tasks) {
+      try {
+        for (TopDocs docs : task.get()) {
+          topDocs[i++] = docs;
+        }
+      } catch (ExecutionException e) {
+        throw new RuntimeException(e.getCause());
+      } catch (InterruptedException e) {
+        throw new ThreadInterruptedException(e);
+      }
+    }
+    return topDocs;
   }
 
   private TopDocs searchLeaf(LeafReaderContext ctx, Weight filterWeight) throws IOException {
diff --git a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java
index ce93bd9cef8..f167b716112 100644
--- a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java
+++ b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java
@@ -998,6 +998,10 @@ public class IndexSearcher {
     return executor;
   }
 
+  SliceExecutor getSliceExecutor() {
+    return sliceExecutor;
+  }
+
   /**
    * Thrown when an attempt is made to add more than {@link #getMaxClauseCount()} clauses. This
    * typically happens if a PrefixQuery, FuzzyQuery, WildcardQuery, or TermRangeQuery is expanded to