Posted to issues@lucene.apache.org by "quux00 (via GitHub)" <gi...@apache.org> on 2023/08/28 20:56:45 UTC

[GitHub] [lucene] quux00 opened a new pull request, #12523: TaskExecutor waits for all tasks to complete before returning

quux00 opened a new pull request, #12523:
URL: https://github.com/apache/lucene/pull/12523

   The TaskExecutor used by IndexSearcher and AbstractKnnVectorQuery
   waits until all tasks have finished before returning, even when
   one or more of the tasks throw an Exception.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@lucene.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@lucene.apache.org
For additional commands, e-mail: issues-help@lucene.apache.org


Re: [PR] TaskExecutor waits for all tasks to complete before returning [lucene]

Posted by "javanna (via GitHub)" <gi...@apache.org>.
javanna commented on code in PR #12523:
URL: https://github.com/apache/lucene/pull/12523#discussion_r1343749320


##########
lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java:
##########
@@ -267,7 +266,133 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
             return slices.toArray(new LeafSlice[0]);
           }
         };
-    searcher.search(new MatchAllDocsQuery(), 10);
-    assertEquals(leaves.size(), numExecutions.get());
+    TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
+    assertTrue(topDocs.totalHits.value > 0);
+    if (leaves.size() <= 1) {
+      assertEquals(0, numExecutions.get());
+    } else {
+      assertEquals(leaves.size(), numExecutions.get());
+    }

Review Comment:
   I still don't understand what changed to make the additional `if` necessary in this test. I thought that we offload to the executor even with a single segment, so `assertEquals(leaves.size(), numExecutions.get());` should cover every case, including when there are no segments. Have you run multiple iterations with this change? Did the test fail for you, and is that why you changed it?





Re: [PR] TaskExecutor waits for all tasks to complete before returning [lucene]

Posted by "quux00 (via GitHub)" <gi...@apache.org>.
quux00 commented on code in PR #12523:
URL: https://github.com/apache/lucene/pull/12523#discussion_r1343180733


##########
lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java:
##########
@@ -267,7 +266,133 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
             return slices.toArray(new LeafSlice[0]);
           }
         };
-    searcher.search(new MatchAllDocsQuery(), 10);
-    assertEquals(leaves.size(), numExecutions.get());
+    TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
+    assertTrue(topDocs.totalHits.value > 0);
+    if (leaves.size() <= 1) {
+      assertEquals(0, numExecutions.get());
+    } else {
+      assertEquals(leaves.size(), numExecutions.get());
+    }
+  }
+
+  /**
+   * Tests that when IndexerSearcher runs concurrent searches on multiple slices if any Exception is
+   * thrown by one of the slice tasks, it will not return until all tasks have completed.
+   */
+  public void testMultipleSegmentsOnTheExecutorWithException() {
+    List<LeafReaderContext> leaves = reader.leaves();
+    int fixedThreads = leaves.size() == 1 ? 1 : leaves.size() / 2;
+
+    ExecutorService fixedThreadPoolExecutor =
+        Executors.newFixedThreadPool(fixedThreads, new NamedThreadFactory("concurrent-slices"));
+
+    IndexSearcher searcher =
+        new IndexSearcher(reader, fixedThreadPoolExecutor) {
+          @Override
+          protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
+            ArrayList<LeafSlice> slices = new ArrayList<>();
+            for (LeafReaderContext ctx : leaves) {
+              slices.add(new LeafSlice(Arrays.asList(ctx)));
+            }
+            return slices.toArray(new LeafSlice[0]);

Review Comment:
   Got it. Fixed.





Re: [PR] TaskExecutor waits for all tasks to complete before returning [lucene]

Posted by "javanna (via GitHub)" <gi...@apache.org>.
javanna commented on PR #12523:
URL: https://github.com/apache/lucene/pull/12523#issuecomment-1748362692

   Thanks @quux00 !




Re: [PR] TaskExecutor waits for all tasks to complete before returning [lucene]

Posted by "quux00 (via GitHub)" <gi...@apache.org>.
quux00 commented on code in PR #12523:
URL: https://github.com/apache/lucene/pull/12523#discussion_r1343185358


##########
lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java:
##########
@@ -267,7 +266,133 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
             return slices.toArray(new LeafSlice[0]);
           }
         };
-    searcher.search(new MatchAllDocsQuery(), 10);
-    assertEquals(leaves.size(), numExecutions.get());
+    TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
+    assertTrue(topDocs.totalHits.value > 0);
+    if (leaves.size() <= 1) {
+      assertEquals(0, numExecutions.get());
+    } else {
+      assertEquals(leaves.size(), numExecutions.get());
+    }
+  }
+
+  /**
+   * Tests that when IndexerSearcher runs concurrent searches on multiple slices if any Exception is
+   * thrown by one of the slice tasks, it will not return until all tasks have completed.
+   */
+  public void testMultipleSegmentsOnTheExecutorWithException() {
+    List<LeafReaderContext> leaves = reader.leaves();
+    int fixedThreads = leaves.size() == 1 ? 1 : leaves.size() / 2;
+
+    ExecutorService fixedThreadPoolExecutor =
+        Executors.newFixedThreadPool(fixedThreads, new NamedThreadFactory("concurrent-slices"));
+
+    IndexSearcher searcher =
+        new IndexSearcher(reader, fixedThreadPoolExecutor) {
+          @Override
+          protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
+            ArrayList<LeafSlice> slices = new ArrayList<>();
+            for (LeafReaderContext ctx : leaves) {
+              slices.add(new LeafSlice(Arrays.asList(ctx)));
+            }
+            return slices.toArray(new LeafSlice[0]);
+          }
+        };
+
+    try {
+      AtomicInteger callsToScorer = new AtomicInteger(0);
+      int numExceptions = leaves.size() == 1 ? 1 : RandomizedTest.randomIntBetween(1, 2);
+      CountDownLatch latch = new CountDownLatch(numExceptions);
+      MatchAllOrThrowExceptionQuery query =
+          new MatchAllOrThrowExceptionQuery(numExceptions, callsToScorer, latch);
+      RuntimeException exc = expectThrows(RuntimeException.class, () -> searcher.search(query, 10));
+      // if the TaskExecutor didn't wait for all tasks to finish, this assert would frequently fail
+      assertEquals(leaves.size(), callsToScorer.get());
+      assertThat(
+          exc.getMessage(), Matchers.containsString("MatchAllOrThrowExceptionQuery Exception"));
+    } finally {
+      TestUtil.shutdownExecutorService(fixedThreadPoolExecutor);
+    }
+  }
+
+  private static class MatchAllOrThrowExceptionQuery extends Query {
+
+    private final AtomicInteger numExceptionsToThrow;
+    private final Query delegate;
+    private final AtomicInteger callsToScorer;
+    private final CountDownLatch latch;
+
+    /**
+     * Throws an Exception out of the {@code scorer} method the first {@code numExceptions} times it
+     * is called. Otherwise, it delegates all calls to the MatchAllDocsQuery.
+     *
+     * @param numExceptions number of exceptions to throw from scorer method
+     * @param callsToScorer where to record the number of times the {@code scorer} method has been
+     *     called
+     */
+    public MatchAllOrThrowExceptionQuery(
+        int numExceptions, AtomicInteger callsToScorer, CountDownLatch latch) {
+      this.numExceptionsToThrow = new AtomicInteger(numExceptions);
+      this.callsToScorer = callsToScorer;
+      this.delegate = new MatchAllDocsQuery();
+      this.latch = latch;
+    }
+
+    @Override
+    public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost)
+        throws IOException {
+      Weight matchAllWeight = delegate.createWeight(searcher, scoreMode, boost);
+
+      return new Weight(delegate) {
+        @Override
+        public boolean isCacheable(LeafReaderContext ctx) {
+          return matchAllWeight.isCacheable(ctx);
+        }
+
+        @Override
+        public Explanation explain(LeafReaderContext context, int doc) throws IOException {
+          return matchAllWeight.explain(context, doc);
+        }
+
+        @Override
+        public Scorer scorer(LeafReaderContext context) throws IOException {
+          if (numExceptionsToThrow.getAndDecrement() > 0) {
+            callsToScorer.getAndIncrement();
+            try {
+              throw new RuntimeException("MatchAllOrThrowExceptionQuery Exception");
+            } finally {
+              latch.countDown();
+            }
+          } else {
+            try {
+              latch.await(5000, TimeUnit.MILLISECONDS);

Review Comment:
   Added.





Re: [PR] TaskExecutor waits for all tasks to complete before returning [lucene]

Posted by "javanna (via GitHub)" <gi...@apache.org>.
javanna merged PR #12523:
URL: https://github.com/apache/lucene/pull/12523




[GitHub] [lucene] quux00 commented on a diff in pull request #12523: TaskExecutor waits for all tasks to complete before returning

Posted by "quux00 (via GitHub)" <gi...@apache.org>.
quux00 commented on code in PR #12523:
URL: https://github.com/apache/lucene/pull/12523#discussion_r1340643119


##########
lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java:
##########
@@ -267,11 +266,130 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
             return slices.toArray(new LeafSlice[0]);
           }
         };
-    searcher.search(new MatchAllDocsQuery(), 10);
+    TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
+    assertTrue(topDocs.totalHits.value > 0);
     if (leaves.size() <= 1) {
       assertEquals(0, numExecutions.get());
     } else {
       assertEquals(leaves.size(), numExecutions.get());
     }
   }
+
+  /**
+   * Tests that when IndexerSearcher runs concurrent searches on multiple slices if any Exception is
+   * thrown by one of the slice tasks, it will not return until all tasks have completed.
+   *
+   * <p>Without a larger refactoring of the Lucene IndexSearcher and/or TaskExecutor there isn't a
+   * clean deterministic way to test this. This test is probabilistic using short timeouts in the
+   * tasks that do not throw an Exception.
+   */
+  public void testMultipleSegmentsOnTheExecutorWithException() {
+    List<LeafReaderContext> leaves = reader.leaves();
+    int fixedThreads = leaves.size() == 1 ? 1 : leaves.size() / 2;
+
+    ExecutorService fixedThreadPoolExecutor =
+        Executors.newFixedThreadPool(fixedThreads, new NamedThreadFactory("concurrent-slices"));
+
+    IndexSearcher searcher =
+        new IndexSearcher(reader, fixedThreadPoolExecutor) {
+          @Override
+          protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
+            ArrayList<LeafSlice> slices = new ArrayList<>();
+            for (LeafReaderContext ctx : leaves) {
+              slices.add(new LeafSlice(Arrays.asList(ctx)));
+            }
+            return slices.toArray(new LeafSlice[0]);
+          }
+        };
+
+    try {
+      AtomicInteger callsToScorer = new AtomicInteger(0);
+      int numExceptions = leaves.size() == 1 ? 1 : RandomizedTest.randomIntBetween(1, 2);
+      MatchAllOrThrowExceptionQuery query =
+          new MatchAllOrThrowExceptionQuery(numExceptions, callsToScorer);
+      RuntimeException exc = expectThrows(RuntimeException.class, () -> searcher.search(query, 10));
+      // if the TaskExecutor didn't wait for all tasks to finish, this assert would frequently fail
+      assertEquals(leaves.size(), callsToScorer.get());
+      assertThat(
+          exc.getMessage(), Matchers.containsString("MatchAllOrThrowExceptionQuery Exception"));
+    } finally {
+      TestUtil.shutdownExecutorService(fixedThreadPoolExecutor);
+    }
+  }
+
+  private static class MatchAllOrThrowExceptionQuery extends Query {
+
+    private final AtomicInteger numExceptionsToThrow;
+    private final Query delegate;
+    private final AtomicInteger callsToScorer;
+
+    /**
+     * Throws an Exception out of the {@code scorer} method the first {@code numExceptions} times it
+     * is called. Otherwise, it delegates all calls to the MatchAllDocsQuery.
+     *
+     * @param numExceptions number of exceptions to throw from scorer method
+     * @param callsToScorer where to record the number of times the {@code scorer} method has been
+     *     called
+     */
+    public MatchAllOrThrowExceptionQuery(int numExceptions, AtomicInteger callsToScorer) {
+      this.numExceptionsToThrow = new AtomicInteger(numExceptions);
+      this.callsToScorer = callsToScorer;
+      this.delegate = new MatchAllDocsQuery();
+    }
+
+    @Override
+    public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost)
+        throws IOException {
+      Weight matchAllWeight = delegate.createWeight(searcher, scoreMode, boost);
+
+      return new Weight(delegate) {
+        @Override
+        public boolean isCacheable(LeafReaderContext ctx) {
+          return matchAllWeight.isCacheable(ctx);
+        }
+
+        @Override
+        public Explanation explain(LeafReaderContext context, int doc) throws IOException {
+          return matchAllWeight.explain(context, doc);
+        }
+
+        @Override
+        public Scorer scorer(LeafReaderContext context) throws IOException {
+          if (numExceptionsToThrow.getAndDecrement() > 0) {
+            callsToScorer.getAndIncrement();
+            throw new RuntimeException("MatchAllOrThrowExceptionQuery Exception");
+          } else {
+            // A small sleep before incrementing the callsToScorer counter allows
+            // the task with the Exception to be thrown and if TaskExecutor.invokeAll
+            // does not wait until all tasks have finished, then the callsToScorer
+            // counter will not match the total number of tasks (or rather usually will
+            // not match, since there is a race condition that makes it probabilistic).
+            RandomizedTest.sleep(25);

Review Comment:
   I removed the sleep and added a CountDownLatch. The `scorer` calls that do NOT throw an Exception now wait on the latch until all Exceptions have been thrown, and only then increment the `callsToScorer` counter.
   
   With the original implementation of `TaskExecutor#invokeAll` this test fails about 80% of the time on my machine (I ran it about 100 times), whereas it always passes with the new impl of `invokeAll`. I don't see how to make it fully deterministically fail with the old `invokeAll` since we need the Exception to always get thrown by the "Exception case" scorers _before_ the non-Exception-case scorers finish.  You could put in a latch that the test waits upon that gates the non-Exception-case scorers (don't finish until the test has caught the Exception from invokeAll) - that would deterministically fail with the old version of `invokeAll`, but would never pass with the new code (it would hang indefinitely).
   
   Let me know if you see a clean way to make it deterministic with both impls of `invokeAll`.
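The latch gating described in this comment can be sketched in isolation. This is a minimal illustration and not the PR's test code: `LatchGateSketch`, `run`, and the variable names are hypothetical, and a plain `ExecutorService` stands in for Lucene's `TaskExecutor`. The throwing tasks count the latch down in a `finally` block, and the non-throwing tasks wait on the latch (with a timeout) before they bump the counter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of the latch gate; not the Lucene test itself. */
public class LatchGateSketch {

  /** Runs numTasks tasks, the first numExceptions of which throw; returns the call count. */
  static int run(int numTasks, int numExceptions) {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, numTasks / 2));
    AtomicInteger exceptionsLeft = new AtomicInteger(numExceptions);
    AtomicInteger calls = new AtomicInteger();
    CountDownLatch allThrown = new CountDownLatch(numExceptions);
    try {
      List<Future<Void>> futures = new ArrayList<>();
      for (int i = 0; i < numTasks; i++) {
        Callable<Void> task =
            () -> {
              if (exceptionsLeft.getAndDecrement() > 0) {
                calls.incrementAndGet();
                try {
                  throw new IllegalStateException("simulated failure");
                } finally {
                  // Signal the waiting tasks that this exception has been thrown.
                  allThrown.countDown();
                }
              }
              // Non-throwing tasks only proceed once every exception has been thrown.
              allThrown.await(5, TimeUnit.SECONDS);
              calls.incrementAndGet();
              return null;
            };
        futures.add(pool.submit(task));
      }
      // Wait on every future, even after a failure, before reading the counter.
      for (Future<Void> future : futures) {
        try {
          future.get();
        } catch (ExecutionException expected) {
          // A failed task; keep waiting for the remaining ones.
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
      return calls.get();
    } finally {
      pool.shutdownNow();
    }
  }
}
```

Because the caller waits on every future here, `LatchGateSketch.run(8, 2)` returns 8. A caller that stopped at the first failed future would usually read a smaller count, which is the race the test in this PR relies on.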



##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -50,16 +51,21 @@ final <T> List<T> invokeAll(Collection<RunnableFuture<T>> tasks) {
     for (Runnable task : tasks) {
       executor.execute(task);
     }
+
+    RuntimeException exc = null;
     final List<T> results = new ArrayList<>();
     for (Future<T> future : tasks) {
       try {
         results.add(future.get());
       } catch (InterruptedException e) {
-        throw new ThreadInterruptedException(e);
+        exc = new ThreadInterruptedException(e);
       } catch (ExecutionException e) {
-        throw new RuntimeException(e.getCause());
+        exc = new RuntimeException(e.getCause());

Review Comment:
   I added this change and the tests are passing.
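For readers following the diff above, here is a self-contained sketch of the wait-for-all pattern, under stated assumptions. `WaitAllSketch` is a hypothetical stand-in and not Lucene's `TaskExecutor`. It also attaches later failures to the first one as suppressed exceptions, which a later revision of the test Javadoc in this thread describes, whereas the diff shown here simply overwrites `exc`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for illustration; not Lucene's TaskExecutor. */
public class WaitAllSketch {

  static <T> List<T> invokeAll(Executor executor, Collection<Callable<T>> callables) {
    List<FutureTask<T>> tasks = new ArrayList<>();
    for (Callable<T> callable : callables) {
      tasks.add(new FutureTask<>(callable));
    }
    for (Runnable task : tasks) {
      executor.execute(task);
    }
    RuntimeException exc = null;
    boolean interrupted = false;
    List<T> results = new ArrayList<>();
    for (FutureTask<T> task : tasks) {
      try {
        results.add(task.get());
      } catch (InterruptedException e) {
        interrupted = true;
        exc = merge(exc, new RuntimeException(e));
      } catch (ExecutionException e) {
        // Remember the failure but keep waiting on the remaining futures.
        exc = merge(exc, new RuntimeException(e.getCause()));
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt(); // restore the flag once all waits are done
    }
    if (exc != null) {
      throw exc;
    }
    return results;
  }

  /** Keeps the first failure and records later ones as suppressed. */
  private static RuntimeException merge(RuntimeException first, RuntimeException next) {
    if (first == null) {
      return next;
    }
    first.addSuppressed(next);
    return first;
  }

  /** Two failing tasks around twenty slow ones; reports what the caller observes. */
  static String runDemo() {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    AtomicInteger completed = new AtomicInteger();
    List<Callable<Void>> callables = new ArrayList<>();
    callables.add(() -> { throw new IllegalStateException("first"); });
    for (int i = 0; i < 20; i++) {
      callables.add(() -> {
        Thread.sleep(5);
        completed.incrementAndGet();
        return null;
      });
    }
    callables.add(() -> { throw new IllegalStateException("last"); });
    try {
      invokeAll(pool, callables);
      return "no exception";
    } catch (RuntimeException e) {
      return "completed=" + completed.get() + " suppressed=" + e.getSuppressed().length;
    } finally {
      pool.shutdown();
    }
  }
}
```

In this sketch `runDemo()` returns `completed=20 suppressed=1`: every slow task has finished by the time the exception reaches the caller, and the second failure travels along as a suppressed exception.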





Re: [PR] TaskExecutor waits for all tasks to complete before returning [lucene]

Posted by "javanna (via GitHub)" <gi...@apache.org>.
javanna commented on code in PR #12523:
URL: https://github.com/apache/lucene/pull/12523#discussion_r1343870579


##########
lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java:
##########
@@ -267,7 +265,132 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
             return slices.toArray(new LeafSlice[0]);
           }
         };
-    searcher.search(new MatchAllDocsQuery(), 10);
-    assertEquals(leaves.size(), numExecutions.get());
+    TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
+    assertTrue(topDocs.totalHits.value > 0);
+    if (leaves.size() <= 1) {
+      assertEquals(0, numExecutions.get());
+    } else {
+      assertEquals(leaves.size(), numExecutions.get());
+    }
+  }
+
+  /**
+   * The goal of this test is to ensure that TaskExecutor.invokeAll waits for all tasks/callables to
+   * finish even if one or more of them throw an Exception.
+   *
+   * <p>To make the test deterministic, a custom single threaded executor is used. And to ensure
+   * that TaskExecutor.invokeAll does not return early upon getting an Exception, two Exceptions are
+   * thrown in the underlying Query class (in the Weight#scorer method). The first Exception is
+   * thrown by the first call to Weight#scorer and the last Exception is thrown by the last call to
+   * Weight#scorer. Since TaskExecutor.invokeAll adds subsequent Exceptions to the first one caught
+   * as a suppressed Exception, we can check that both exceptions were thrown, ensuring that all
+   * TaskExecutor#invokeAll check all tasks (using future.get()) before it returned.
+   */
+  public void testMultipleSegmentsOnTheExecutorWithException() {
+    List<LeafReaderContext> leaves = reader.leaves();
+    IndexSearcher searcher =
+        new IndexSearcher(
+            reader,
+            task -> {
+              task.run();

Review Comment:
   I think I misled you with this suggestion, sorry. I would rather use a single-threaded executor instead. It is rather slow because it executes one task at a time, but it runs tasks on a separate thread, so the `execute` call does not block, which is more realistic. Otherwise you implicitly wait for all tasks to complete, because invokeAll first submits them all (which, with an executor that runs tasks inline, means running them) and only then waits for them.
   
   We have control over the future (because we create a FutureTask in TaskExecutor), but it feels artificial to e.g. override `get` to ensure it is called. I would rather have a higher-level test that verifies that no running tasks are left behind once the overall top-level operation returns.
   
   I have been thinking that it probably makes sense to move this test to TestTaskExecutor and make it less generic. `invokeAll` now takes a collection of callables, so perhaps you can test it independently of the docs and segments created. Increasing the number of tasks will possibly make the test more repeatable. I think we can live with the probabilistic nature of this test though.
   
   The following:
   
   ```
     public void testInvokeAllDoesNotLeaveTasksBehind() {
         TaskExecutor taskExecutor = new TaskExecutor(executorService);
         AtomicInteger tasks = new AtomicInteger(0);
         List<Callable<Void>> callables = new ArrayList<>();
         callables.add(() -> {
             throw new RuntimeException();
         });
         for (int i = 0; i < 99; i++) {
             callables.add(() -> {
                 tasks.incrementAndGet();
                 return null;
             });
         }
         expectThrows(RuntimeException.class, () -> taskExecutor.invokeAll(callables));
         assertEquals(99, tasks.get());
     }
   ```
   
   fails without the wait 903 times out of 1000 on my machine. It is also much simpler and does not require a custom query either.
   
   I would still consider testing suppressed exceptions and wait in two different methods.





Re: [PR] TaskExecutor waits for all tasks to complete before returning [lucene]

Posted by "javanna (via GitHub)" <gi...@apache.org>.
javanna commented on code in PR #12523:
URL: https://github.com/apache/lucene/pull/12523#discussion_r1343048682


##########
lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java:
##########
@@ -267,7 +266,133 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
             return slices.toArray(new LeafSlice[0]);
           }
         };
-    searcher.search(new MatchAllDocsQuery(), 10);
-    assertEquals(leaves.size(), numExecutions.get());
+    TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
+    assertTrue(topDocs.totalHits.value > 0);
+    if (leaves.size() <= 1) {
+      assertEquals(0, numExecutions.get());
+    } else {
+      assertEquals(leaves.size(), numExecutions.get());
+    }

Review Comment:
   oh well, I guess I am wrong because tests succeed :)





Re: [PR] TaskExecutor waits for all tasks to complete before returning [lucene]

Posted by "quux00 (via GitHub)" <gi...@apache.org>.
quux00 commented on code in PR #12523:
URL: https://github.com/apache/lucene/pull/12523#discussion_r1343142475


##########
lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java:
##########
@@ -267,11 +266,130 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
             return slices.toArray(new LeafSlice[0]);
           }
         };
-    searcher.search(new MatchAllDocsQuery(), 10);
+    TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
+    assertTrue(topDocs.totalHits.value > 0);
     if (leaves.size() <= 1) {
       assertEquals(0, numExecutions.get());
     } else {
       assertEquals(leaves.size(), numExecutions.get());
     }
   }
+
+  /**
+   * Tests that when IndexerSearcher runs concurrent searches on multiple slices if any Exception is
+   * thrown by one of the slice tasks, it will not return until all tasks have completed.
+   *
+   * <p>Without a larger refactoring of the Lucene IndexSearcher and/or TaskExecutor there isn't a
+   * clean deterministic way to test this. This test is probabilistic using short timeouts in the
+   * tasks that do not throw an Exception.
+   */
+  public void testMultipleSegmentsOnTheExecutorWithException() {
+    List<LeafReaderContext> leaves = reader.leaves();
+    int fixedThreads = leaves.size() == 1 ? 1 : leaves.size() / 2;
+
+    ExecutorService fixedThreadPoolExecutor =
+        Executors.newFixedThreadPool(fixedThreads, new NamedThreadFactory("concurrent-slices"));
+
+    IndexSearcher searcher =
+        new IndexSearcher(reader, fixedThreadPoolExecutor) {
+          @Override
+          protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
+            ArrayList<LeafSlice> slices = new ArrayList<>();
+            for (LeafReaderContext ctx : leaves) {
+              slices.add(new LeafSlice(Arrays.asList(ctx)));
+            }
+            return slices.toArray(new LeafSlice[0]);
+          }
+        };
+
+    try {
+      AtomicInteger callsToScorer = new AtomicInteger(0);
+      int numExceptions = leaves.size() == 1 ? 1 : RandomizedTest.randomIntBetween(1, 2);
+      MatchAllOrThrowExceptionQuery query =
+          new MatchAllOrThrowExceptionQuery(numExceptions, callsToScorer);
+      RuntimeException exc = expectThrows(RuntimeException.class, () -> searcher.search(query, 10));
+      // if the TaskExecutor didn't wait for all tasks to finish, this assert would frequently fail
+      assertEquals(leaves.size(), callsToScorer.get());
+      assertThat(
+          exc.getMessage(), Matchers.containsString("MatchAllOrThrowExceptionQuery Exception"));
+    } finally {
+      TestUtil.shutdownExecutorService(fixedThreadPoolExecutor);
+    }
+  }
+
+  private static class MatchAllOrThrowExceptionQuery extends Query {
+
+    private final AtomicInteger numExceptionsToThrow;
+    private final Query delegate;
+    private final AtomicInteger callsToScorer;
+
+    /**
+     * Throws an Exception out of the {@code scorer} method the first {@code numExceptions} times it
+     * is called. Otherwise, it delegates all calls to the MatchAllDocsQuery.
+     *
+     * @param numExceptions number of exceptions to throw from scorer method
+     * @param callsToScorer where to record the number of times the {@code scorer} method has been
+     *     called
+     */
+    public MatchAllOrThrowExceptionQuery(int numExceptions, AtomicInteger callsToScorer) {
+      this.numExceptionsToThrow = new AtomicInteger(numExceptions);
+      this.callsToScorer = callsToScorer;
+      this.delegate = new MatchAllDocsQuery();
+    }
+
+    @Override
+    public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost)
+        throws IOException {
+      Weight matchAllWeight = delegate.createWeight(searcher, scoreMode, boost);
+
+      return new Weight(delegate) {
+        @Override
+        public boolean isCacheable(LeafReaderContext ctx) {
+          return matchAllWeight.isCacheable(ctx);
+        }
+
+        @Override
+        public Explanation explain(LeafReaderContext context, int doc) throws IOException {
+          return matchAllWeight.explain(context, doc);
+        }
+
+        @Override
+        public Scorer scorer(LeafReaderContext context) throws IOException {
+          if (numExceptionsToThrow.getAndDecrement() > 0) {
+            callsToScorer.getAndIncrement();
+            throw new RuntimeException("MatchAllOrThrowExceptionQuery Exception");
+          } else {
+            // A small sleep before incrementing the callsToScorer counter allows
+            // the task with the Exception to be thrown and if TaskExecutor.invokeAll
+            // does not wait until all tasks have finished, then the callsToScorer
+            // counter will not match the total number of tasks (or rather usually will
+            // not match, since there is a race condition that makes it probabilistic).
+            RandomizedTest.sleep(25);

Review Comment:
   Using a single threaded executor would make the test more repeatable, but it also passes with the original implementation of `TaskExecutor#invokeAll`, so it doesn't really test the core change of the ticket.
   
   It passes with the existing `invokeAll` functionality because that method calls the `task.run()` on all tasks (Callables now) _before_ moving to the second part of the method where it cycles through the Futures and calls `get`. The key aspect of the test is that it needs to ensure that all threads have finished before `invokeAll` returns in a truly concurrent multi-threaded scenario.
   
   What we really want is a way to check that all `future.get` methods have been called in invokeAll before it returns, but I couldn't find a way to do that.
   
   So you either need a probabilistic test like I currently have or we would need some trick to sort of test what we want with a single threaded executor, such as:
   
   always throw at least two exceptions (except for tests where only one task is created by random chance) - and make sure that one of those exceptions is thrown by the last task to be processed. Then you would prove that the invokeAll method is waiting for all tasks to finish even when exceptions are thrown by "intermediate" tasks. I can try implementing that model, but it would require some documentation for maintainers to see what needs to be done to ensure the test is actually testing the key feature of the ticket.
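The point about a same-thread executor can be shown with a small, deterministic sketch. The names here are hypothetical (`DirectExecutorSketch`, `invokeAllFailFast`), and the fail-fast method only imitates the shape of the original `invokeAll` as shown in the diff; it is not Lucene's code. With an executor that runs each task inline (`Runnable::run`), every task has already finished before the first `get()` is called, so even the fail-fast version appears to wait for everything.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration; imitates the pre-change behavior, not Lucene's code. */
public class DirectExecutorSketch {

  /** Rethrows on the first failed future without checking the remaining ones. */
  static <T> List<T> invokeAllFailFast(Executor executor, List<Callable<T>> callables) {
    List<FutureTask<T>> tasks = new ArrayList<>();
    for (Callable<T> callable : callables) {
      tasks.add(new FutureTask<>(callable));
    }
    for (FutureTask<T> task : tasks) {
      executor.execute(task); // with Runnable::run this runs the task right here
    }
    List<T> results = new ArrayList<>();
    for (FutureTask<T> task : tasks) {
      try {
        results.add(task.get());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      } catch (ExecutionException e) {
        throw new RuntimeException(e.getCause());
      }
    }
    return results;
  }

  /** Returns how many non-throwing tasks had run when the exception reached the caller. */
  static int completedWithDirectExecutor() {
    AtomicInteger completed = new AtomicInteger();
    List<Callable<Void>> callables = new ArrayList<>();
    callables.add(() -> { throw new IllegalStateException("boom"); });
    for (int i = 0; i < 9; i++) {
      callables.add(() -> {
        completed.incrementAndGet();
        return null;
      });
    }
    try {
      invokeAllFailFast(Runnable::run, callables);
    } catch (RuntimeException expected) {
      // The failure surfaces only after the execute loop already ran every task.
    }
    return completed.get();
  }
}
```

`completedWithDirectExecutor()` returns 9 even though the method rethrows at the first failed future, which is why a test built on an inline executor cannot tell the old and new `invokeAll` apart.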





Re: [PR] TaskExecutor waits for all tasks to complete before returning [lucene]

Posted by "quux00 (via GitHub)" <gi...@apache.org>.
quux00 commented on code in PR #12523:
URL: https://github.com/apache/lucene/pull/12523#discussion_r1344264569


##########
lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java:
##########
@@ -267,7 +266,133 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
             return slices.toArray(new LeafSlice[0]);
           }
         };
-    searcher.search(new MatchAllDocsQuery(), 10);
-    assertEquals(leaves.size(), numExecutions.get());
+    TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
+    assertTrue(topDocs.totalHits.value > 0);
+    if (leaves.size() <= 1) {
+      assertEquals(0, numExecutions.get());
+    } else {
+      assertEquals(leaves.size(), numExecutions.get());
+    }

Review Comment:
   Fixed.





Re: [PR] TaskExecutor waits for all tasks to complete before returning [lucene]

Posted by "quux00 (via GitHub)" <gi...@apache.org>.
quux00 commented on code in PR #12523:
URL: https://github.com/apache/lucene/pull/12523#discussion_r1343266881


##########
lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java:
##########
@@ -267,11 +266,130 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
             return slices.toArray(new LeafSlice[0]);
           }
         };
-    searcher.search(new MatchAllDocsQuery(), 10);
+    TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
+    assertTrue(topDocs.totalHits.value > 0);
     if (leaves.size() <= 1) {
       assertEquals(0, numExecutions.get());
     } else {
       assertEquals(leaves.size(), numExecutions.get());
     }
   }
+
+  /**
+   * Tests that when IndexerSearcher runs concurrent searches on multiple slices if any Exception is
+   * thrown by one of the slice tasks, it will not return until all tasks have completed.
+   *
+   * <p>Without a larger refactoring of the Lucene IndexSearcher and/or TaskExecutor there isn't a
+   * clean deterministic way to test this. This test is probabilistic using short timeouts in the
+   * tasks that do not throw an Exception.
+   */
+  public void testMultipleSegmentsOnTheExecutorWithException() {
+    List<LeafReaderContext> leaves = reader.leaves();
+    int fixedThreads = leaves.size() == 1 ? 1 : leaves.size() / 2;
+
+    ExecutorService fixedThreadPoolExecutor =
+        Executors.newFixedThreadPool(fixedThreads, new NamedThreadFactory("concurrent-slices"));
+
+    IndexSearcher searcher =
+        new IndexSearcher(reader, fixedThreadPoolExecutor) {
+          @Override
+          protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
+            ArrayList<LeafSlice> slices = new ArrayList<>();
+            for (LeafReaderContext ctx : leaves) {
+              slices.add(new LeafSlice(Arrays.asList(ctx)));
+            }
+            return slices.toArray(new LeafSlice[0]);
+          }
+        };
+
+    try {
+      AtomicInteger callsToScorer = new AtomicInteger(0);
+      int numExceptions = leaves.size() == 1 ? 1 : RandomizedTest.randomIntBetween(1, 2);
+      MatchAllOrThrowExceptionQuery query =
+          new MatchAllOrThrowExceptionQuery(numExceptions, callsToScorer);
+      RuntimeException exc = expectThrows(RuntimeException.class, () -> searcher.search(query, 10));
+      // if the TaskExecutor didn't wait for all tasks to finish, this assert would frequently fail
+      assertEquals(leaves.size(), callsToScorer.get());
+      assertThat(
+          exc.getMessage(), Matchers.containsString("MatchAllOrThrowExceptionQuery Exception"));
+    } finally {
+      TestUtil.shutdownExecutorService(fixedThreadPoolExecutor);
+    }
+  }
+
+  private static class MatchAllOrThrowExceptionQuery extends Query {
+
+    private final AtomicInteger numExceptionsToThrow;
+    private final Query delegate;
+    private final AtomicInteger callsToScorer;
+
+    /**
+     * Throws an Exception out of the {@code scorer} method the first {@code numExceptions} times it
+     * is called. Otherwise, it delegates all calls to the MatchAllDocsQuery.
+     *
+     * @param numExceptions number of exceptions to throw from scorer method
+     * @param callsToScorer where to record the number of times the {@code scorer} method has been
+     *     called
+     */
+    public MatchAllOrThrowExceptionQuery(int numExceptions, AtomicInteger callsToScorer) {
+      this.numExceptionsToThrow = new AtomicInteger(numExceptions);
+      this.callsToScorer = callsToScorer;
+      this.delegate = new MatchAllDocsQuery();
+    }
+
+    @Override
+    public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost)
+        throws IOException {
+      Weight matchAllWeight = delegate.createWeight(searcher, scoreMode, boost);
+
+      return new Weight(delegate) {
+        @Override
+        public boolean isCacheable(LeafReaderContext ctx) {
+          return matchAllWeight.isCacheable(ctx);
+        }
+
+        @Override
+        public Explanation explain(LeafReaderContext context, int doc) throws IOException {
+          return matchAllWeight.explain(context, doc);
+        }
+
+        @Override
+        public Scorer scorer(LeafReaderContext context) throws IOException {
+          if (numExceptionsToThrow.getAndDecrement() > 0) {
+            callsToScorer.getAndIncrement();
+            throw new RuntimeException("MatchAllOrThrowExceptionQuery Exception");
+          } else {
+            // A small sleep before incrementing the callsToScorer counter allows
+            // the task with the Exception to be thrown and if TaskExecutor.invokeAll
+            // does not wait until all tasks have finished, then the callsToScorer
+            // counter will not match the total number of tasks (or rather usually will
+            // not match, since there is a race condition that makes it probabilistic).
+            RandomizedTest.sleep(25);

Review Comment:
   I pushed two new commits. The first addresses all feedback except this one. The second changes the test to use a single-threaded executor and to throw two exceptions, one at the beginning and one at the end of the sequential flow of tasks. We can decide which test we like better and revert the last commit if necessary.





Re: [PR] TaskExecutor waits for all tasks to complete before returning [lucene]

Posted by "javanna (via GitHub)" <gi...@apache.org>.
javanna commented on PR #12523:
URL: https://github.com/apache/lucene/pull/12523#issuecomment-1745484064

   This looks great, thanks @quux00! Could you add an entry to the lucene/CHANGES.txt file under Lucene 9.9, please?




Re: [PR] TaskExecutor waits for all tasks to complete before returning [lucene]

Posted by "quux00 (via GitHub)" <gi...@apache.org>.
quux00 commented on code in PR #12523:
URL: https://github.com/apache/lucene/pull/12523#discussion_r1344356526


##########
lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java:
##########
@@ -267,7 +265,132 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
             return slices.toArray(new LeafSlice[0]);
           }
         };
-    searcher.search(new MatchAllDocsQuery(), 10);
-    assertEquals(leaves.size(), numExecutions.get());
+    TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
+    assertTrue(topDocs.totalHits.value > 0);
+    if (leaves.size() <= 1) {
+      assertEquals(0, numExecutions.get());
+    } else {
+      assertEquals(leaves.size(), numExecutions.get());
+    }
+  }
+
+  /**
+   * The goal of this test is to ensure that TaskExecutor.invokeAll waits for all tasks/callables to
+   * finish even if one or more of them throw an Exception.
+   *
+   * <p>To make the test deterministic, a custom single threaded executor is used. And to ensure
+   * that TaskExecutor.invokeAll does not return early upon getting an Exception, two Exceptions are
+   * thrown in the underlying Query class (in the Weight#scorer method). The first Exception is
+   * thrown by the first call to Weight#scorer and the last Exception is thrown by the last call to
+   * Weight#scorer. Since TaskExecutor.invokeAll adds subsequent Exceptions to the first one caught
+   * as a suppressed Exception, we can check that both exceptions were thrown, ensuring that all
+   * TaskExecutor#invokeAll check all tasks (using future.get()) before it returned.
+   */
+  public void testMultipleSegmentsOnTheExecutorWithException() {
+    List<LeafReaderContext> leaves = reader.leaves();
+    IndexSearcher searcher =
+        new IndexSearcher(
+            reader,
+            task -> {
+              task.run();

Review Comment:
   Good idea. That is cleaner to do in `TestTaskExecutor`. I have added two tests there: one that checks no tasks are left behind (similar to yours) and another that throws two exceptions and ensures that the second is added as a suppressed exception.





Re: [PR] TaskExecutor waits for all tasks to complete before returning [lucene]

Posted by "quux00 (via GitHub)" <gi...@apache.org>.
quux00 commented on code in PR #12523:
URL: https://github.com/apache/lucene/pull/12523#discussion_r1343180948


##########
lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java:
##########
@@ -267,7 +266,133 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
             return slices.toArray(new LeafSlice[0]);
           }
         };
-    searcher.search(new MatchAllDocsQuery(), 10);
-    assertEquals(leaves.size(), numExecutions.get());
+    TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
+    assertTrue(topDocs.totalHits.value > 0);
+    if (leaves.size() <= 1) {
+      assertEquals(0, numExecutions.get());
+    } else {
+      assertEquals(leaves.size(), numExecutions.get());
+    }
+  }
+
+  /**
+   * Tests that when IndexerSearcher runs concurrent searches on multiple slices if any Exception is
+   * thrown by one of the slice tasks, it will not return until all tasks have completed.
+   */
+  public void testMultipleSegmentsOnTheExecutorWithException() {
+    List<LeafReaderContext> leaves = reader.leaves();
+    int fixedThreads = leaves.size() == 1 ? 1 : leaves.size() / 2;

Review Comment:
   Fixed. I'll put a max of 8 threads.
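
   A minimal sketch of what such a cap could look like, assuming it is layered on top of the existing `leaves.size() / 2` rule. The class and method names are hypothetical, and the committed test may compute the value differently.

```java
/** Hypothetical helper; illustrates capping the pool size, not the committed test code. */
class ThreadCapSketch {

  // Upper bound mentioned in the review reply.
  static final int MAX_THREADS = 8;

  /** Half the number of leaves, but never fewer than 1 and never more than MAX_THREADS. */
  static int threadCount(int numLeaves) {
    return Math.min(MAX_THREADS, Math.max(1, numLeaves / 2));
  }

  public static void main(String[] args) {
    for (int leaves : new int[] {1, 3, 10, 100}) {
      System.out.println(leaves + " leaves -> " + threadCount(leaves) + " threads");
    }
  }
}
```

   The lower bound of 1 matters because `Executors.newFixedThreadPool` rejects a pool size of 0.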



##########
lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java:
##########
@@ -267,7 +266,133 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
             return slices.toArray(new LeafSlice[0]);
           }
         };
-    searcher.search(new MatchAllDocsQuery(), 10);
-    assertEquals(leaves.size(), numExecutions.get());
+    TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
+    assertTrue(topDocs.totalHits.value > 0);
+    if (leaves.size() <= 1) {
+      assertEquals(0, numExecutions.get());
+    } else {
+      assertEquals(leaves.size(), numExecutions.get());
+    }
+  }
+
+  /**
+   * Tests that when IndexerSearcher runs concurrent searches on multiple slices if any Exception is
+   * thrown by one of the slice tasks, it will not return until all tasks have completed.
+   */
+  public void testMultipleSegmentsOnTheExecutorWithException() {
+    List<LeafReaderContext> leaves = reader.leaves();
+    int fixedThreads = leaves.size() == 1 ? 1 : leaves.size() / 2;
+
+    ExecutorService fixedThreadPoolExecutor =
+        Executors.newFixedThreadPool(fixedThreads, new NamedThreadFactory("concurrent-slices"));
+
+    IndexSearcher searcher =
+        new IndexSearcher(reader, fixedThreadPoolExecutor) {
+          @Override
+          protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
+            ArrayList<LeafSlice> slices = new ArrayList<>();
+            for (LeafReaderContext ctx : leaves) {
+              slices.add(new LeafSlice(Arrays.asList(ctx)));
+            }
+            return slices.toArray(new LeafSlice[0]);
+          }
+        };
+
+    try {
+      AtomicInteger callsToScorer = new AtomicInteger(0);
+      int numExceptions = leaves.size() == 1 ? 1 : RandomizedTest.randomIntBetween(1, 2);
+      CountDownLatch latch = new CountDownLatch(numExceptions);
+      MatchAllOrThrowExceptionQuery query =
+          new MatchAllOrThrowExceptionQuery(numExceptions, callsToScorer, latch);
+      RuntimeException exc = expectThrows(RuntimeException.class, () -> searcher.search(query, 10));
+      // if the TaskExecutor didn't wait for all tasks to finish, this assert would frequently fail
+      assertEquals(leaves.size(), callsToScorer.get());
+      assertThat(
+          exc.getMessage(), Matchers.containsString("MatchAllOrThrowExceptionQuery Exception"));

Review Comment:
   Good idea.





Re: [PR] TaskExecutor waits for all tasks to complete before returning [lucene]

Posted by "javanna (via GitHub)" <gi...@apache.org>.
javanna commented on code in PR #12523:
URL: https://github.com/apache/lucene/pull/12523#discussion_r1343009944


##########
lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java:
##########
@@ -267,7 +266,133 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
             return slices.toArray(new LeafSlice[0]);
           }
         };
-    searcher.search(new MatchAllDocsQuery(), 10);
-    assertEquals(leaves.size(), numExecutions.get());
+    TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
+    assertTrue(topDocs.totalHits.value > 0);
+    if (leaves.size() <= 1) {
+      assertEquals(0, numExecutions.get());
+    } else {
+      assertEquals(leaves.size(), numExecutions.get());
+    }
+  }
+
+  /**
+   * Tests that when IndexerSearcher runs concurrent searches on multiple slices if any Exception is
+   * thrown by one of the slice tasks, it will not return until all tasks have completed.
+   */
+  public void testMultipleSegmentsOnTheExecutorWithException() {
+    List<LeafReaderContext> leaves = reader.leaves();
+    int fixedThreads = leaves.size() == 1 ? 1 : leaves.size() / 2;
+
+    ExecutorService fixedThreadPoolExecutor =
+        Executors.newFixedThreadPool(fixedThreads, new NamedThreadFactory("concurrent-slices"));
+
+    IndexSearcher searcher =
+        new IndexSearcher(reader, fixedThreadPoolExecutor) {
+          @Override
+          protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
+            ArrayList<LeafSlice> slices = new ArrayList<>();
+            for (LeafReaderContext ctx : leaves) {
+              slices.add(new LeafSlice(Arrays.asList(ctx)));
+            }
+            return slices.toArray(new LeafSlice[0]);

Review Comment:
   It would also be fine to call the existing `slices` method, providing 1, 1 as the threshold arguments. That should achieve the same end goal.



##########
lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java:
##########
@@ -267,7 +266,133 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
             return slices.toArray(new LeafSlice[0]);
           }
         };
-    searcher.search(new MatchAllDocsQuery(), 10);
-    assertEquals(leaves.size(), numExecutions.get());
+    TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
+    assertTrue(topDocs.totalHits.value > 0);
+    if (leaves.size() <= 1) {
+      assertEquals(0, numExecutions.get());
+    } else {
+      assertEquals(leaves.size(), numExecutions.get());
+    }

Review Comment:
   I think this is incorrect, maybe due to a bad merge? We now unconditionally offload to the executor, so the `leaves.size() <= 1` branch no longer applies.



##########
lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java:
##########
@@ -267,7 +266,133 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
             return slices.toArray(new LeafSlice[0]);
           }
         };
-    searcher.search(new MatchAllDocsQuery(), 10);
-    assertEquals(leaves.size(), numExecutions.get());
+    TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
+    assertTrue(topDocs.totalHits.value > 0);
+    if (leaves.size() <= 1) {
+      assertEquals(0, numExecutions.get());
+    } else {
+      assertEquals(leaves.size(), numExecutions.get());
+    }
+  }
+
+  /**
+   * Tests that when IndexerSearcher runs concurrent searches on multiple slices if any Exception is
+   * thrown by one of the slice tasks, it will not return until all tasks have completed.
+   */
+  public void testMultipleSegmentsOnTheExecutorWithException() {
+    List<LeafReaderContext> leaves = reader.leaves();
+    int fixedThreads = leaves.size() == 1 ? 1 : leaves.size() / 2;

Review Comment:
   How many leaves may we have at most? I am wondering whether we may end up creating too many threads. Would it hurt the test to lower the number of threads?



##########
lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java:
##########
@@ -267,7 +266,133 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
             return slices.toArray(new LeafSlice[0]);
           }
         };
-    searcher.search(new MatchAllDocsQuery(), 10);
-    assertEquals(leaves.size(), numExecutions.get());
+    TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
+    assertTrue(topDocs.totalHits.value > 0);
+    if (leaves.size() <= 1) {
+      assertEquals(0, numExecutions.get());
+    } else {
+      assertEquals(leaves.size(), numExecutions.get());
+    }
+  }
+
+  /**
+   * Tests that when IndexerSearcher runs concurrent searches on multiple slices if any Exception is
+   * thrown by one of the slice tasks, it will not return until all tasks have completed.
+   */
+  public void testMultipleSegmentsOnTheExecutorWithException() {
+    List<LeafReaderContext> leaves = reader.leaves();
+    int fixedThreads = leaves.size() == 1 ? 1 : leaves.size() / 2;
+
+    ExecutorService fixedThreadPoolExecutor =
+        Executors.newFixedThreadPool(fixedThreads, new NamedThreadFactory("concurrent-slices"));
+
+    IndexSearcher searcher =
+        new IndexSearcher(reader, fixedThreadPoolExecutor) {
+          @Override
+          protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
+            ArrayList<LeafSlice> slices = new ArrayList<>();
+            for (LeafReaderContext ctx : leaves) {
+              slices.add(new LeafSlice(Arrays.asList(ctx)));
+            }
+            return slices.toArray(new LeafSlice[0]);
+          }
+        };
+
+    try {
+      AtomicInteger callsToScorer = new AtomicInteger(0);
+      int numExceptions = leaves.size() == 1 ? 1 : RandomizedTest.randomIntBetween(1, 2);
+      CountDownLatch latch = new CountDownLatch(numExceptions);
+      MatchAllOrThrowExceptionQuery query =
+          new MatchAllOrThrowExceptionQuery(numExceptions, callsToScorer, latch);
+      RuntimeException exc = expectThrows(RuntimeException.class, () -> searcher.search(query, 10));
+      // if the TaskExecutor didn't wait for all tasks to finish, this assert would frequently fail
+      assertEquals(leaves.size(), callsToScorer.get());
+      assertThat(
+          exc.getMessage(), Matchers.containsString("MatchAllOrThrowExceptionQuery Exception"));

Review Comment:
   Would it make sense to also check the suppressed exception? That could be a separate test without randomized behaviour, if that is easier. Or you could pre-define the order in which exceptions are thrown, which is easy to preserve as you already have an atomic counter. That should make checking the suppressed exception relatively easy.



##########
lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java:
##########
@@ -267,7 +266,133 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
             return slices.toArray(new LeafSlice[0]);
           }
         };
-    searcher.search(new MatchAllDocsQuery(), 10);
-    assertEquals(leaves.size(), numExecutions.get());
+    TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
+    assertTrue(topDocs.totalHits.value > 0);
+    if (leaves.size() <= 1) {
+      assertEquals(0, numExecutions.get());
+    } else {
+      assertEquals(leaves.size(), numExecutions.get());
+    }
+  }
+
+  /**
+   * Tests that when IndexerSearcher runs concurrent searches on multiple slices if any Exception is
+   * thrown by one of the slice tasks, it will not return until all tasks have completed.
+   */
+  public void testMultipleSegmentsOnTheExecutorWithException() {
+    List<LeafReaderContext> leaves = reader.leaves();
+    int fixedThreads = leaves.size() == 1 ? 1 : leaves.size() / 2;
+
+    ExecutorService fixedThreadPoolExecutor =
+        Executors.newFixedThreadPool(fixedThreads, new NamedThreadFactory("concurrent-slices"));
+
+    IndexSearcher searcher =
+        new IndexSearcher(reader, fixedThreadPoolExecutor) {
+          @Override
+          protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
+            ArrayList<LeafSlice> slices = new ArrayList<>();
+            for (LeafReaderContext ctx : leaves) {
+              slices.add(new LeafSlice(Arrays.asList(ctx)));
+            }
+            return slices.toArray(new LeafSlice[0]);
+          }
+        };
+
+    try {
+      AtomicInteger callsToScorer = new AtomicInteger(0);
+      int numExceptions = leaves.size() == 1 ? 1 : RandomizedTest.randomIntBetween(1, 2);
+      CountDownLatch latch = new CountDownLatch(numExceptions);
+      MatchAllOrThrowExceptionQuery query =
+          new MatchAllOrThrowExceptionQuery(numExceptions, callsToScorer, latch);
+      RuntimeException exc = expectThrows(RuntimeException.class, () -> searcher.search(query, 10));
+      // if the TaskExecutor didn't wait for all tasks to finish, this assert would frequently fail
+      assertEquals(leaves.size(), callsToScorer.get());
+      assertThat(
+          exc.getMessage(), Matchers.containsString("MatchAllOrThrowExceptionQuery Exception"));
+    } finally {
+      TestUtil.shutdownExecutorService(fixedThreadPoolExecutor);
+    }
+  }
+
+  private static class MatchAllOrThrowExceptionQuery extends Query {
+
+    private final AtomicInteger numExceptionsToThrow;
+    private final Query delegate;
+    private final AtomicInteger callsToScorer;
+    private final CountDownLatch latch;
+
+    /**
+     * Throws an Exception out of the {@code scorer} method the first {@code numExceptions} times it
+     * is called. Otherwise, it delegates all calls to the MatchAllDocsQuery.
+     *
+     * @param numExceptions number of exceptions to throw from scorer method
+     * @param callsToScorer where to record the number of times the {@code scorer} method has been
+     *     called
+     */
+    public MatchAllOrThrowExceptionQuery(
+        int numExceptions, AtomicInteger callsToScorer, CountDownLatch latch) {
+      this.numExceptionsToThrow = new AtomicInteger(numExceptions);
+      this.callsToScorer = callsToScorer;
+      this.delegate = new MatchAllDocsQuery();
+      this.latch = latch;
+    }
+
+    @Override
+    public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost)
+        throws IOException {
+      Weight matchAllWeight = delegate.createWeight(searcher, scoreMode, boost);
+
+      return new Weight(delegate) {
+        @Override
+        public boolean isCacheable(LeafReaderContext ctx) {
+          return matchAllWeight.isCacheable(ctx);
+        }
+
+        @Override
+        public Explanation explain(LeafReaderContext context, int doc) throws IOException {
+          return matchAllWeight.explain(context, doc);
+        }
+
+        @Override
+        public Scorer scorer(LeafReaderContext context) throws IOException {
+          if (numExceptionsToThrow.getAndDecrement() > 0) {
+            callsToScorer.getAndIncrement();
+            try {
+              throw new RuntimeException("MatchAllOrThrowExceptionQuery Exception");
+            } finally {
+              latch.countDown();
+            }
+          } else {
+            try {
+              latch.await(5000, TimeUnit.MILLISECONDS);

Review Comment:
   Maybe add a comment here to explain that the slices that don't throw wait for all the exceptions to be thrown before proceeding, in order to make the test more deterministic.
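
   For readers following the thread, the latch ordering can be sketched with the JDK alone. This is an illustration under assumptions, not the test from the PR: `LatchOrderingSketch` and `run` are hypothetical names, and a plain `Runnable` stands in for the `Weight#scorer` call. The first `numExceptions` callers throw and count the latch down; every other task waits on the latch, so it only proceeds once all the exceptions have been thrown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of the latch ordering discussed above; not the PR's test. */
class LatchOrderingSketch {

  /** Returns how many non-throwing tasks proceeded only after every exception was thrown. */
  static int run(int numTasks, int numExceptions, int threads) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    AtomicInteger exceptionsLeft = new AtomicInteger(numExceptions);
    AtomicInteger thrown = new AtomicInteger();
    AtomicInteger sawAllThrown = new AtomicInteger();
    CountDownLatch latch = new CountDownLatch(numExceptions);

    Runnable task =
        () -> {
          if (exceptionsLeft.getAndDecrement() > 0) {
            try {
              thrown.incrementAndGet();
              throw new IllegalStateException("simulated failure");
            } finally {
              latch.countDown(); // release the waiting tasks once this failure is raised
            }
          }
          try {
            // Tasks that do not throw wait here until all exceptions have been thrown.
            if (latch.await(5, TimeUnit.SECONDS) && thrown.get() == numExceptions) {
              sawAllThrown.incrementAndGet();
            }
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        };

    try {
      List<Future<?>> futures = new ArrayList<>();
      for (int i = 0; i < numTasks; i++) {
        futures.add(pool.submit(task));
      }
      for (Future<?> future : futures) {
        try {
          future.get(); // wait for every task, including the ones that failed
        } catch (ExecutionException e) {
          // expected for the tasks that threw
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    } finally {
      pool.shutdownNow();
    }
    return sawAllThrown.get();
  }

  public static void main(String[] args) {
    System.out.println("tasks that waited for all exceptions: " + run(6, 2, 3));
  }
}
```

   Because the throwing tasks are by definition the first ones to reach the counter, they are already running when the others start waiting, so the latch cannot deadlock even with a single thread.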





Re: [PR] TaskExecutor waits for all tasks to complete before returning [lucene]

Posted by "javanna (via GitHub)" <gi...@apache.org>.
javanna commented on code in PR #12523:
URL: https://github.com/apache/lucene/pull/12523#discussion_r1343038417


##########
lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java:
##########
@@ -267,11 +266,130 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
             return slices.toArray(new LeafSlice[0]);
           }
         };
-    searcher.search(new MatchAllDocsQuery(), 10);
+    TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
+    assertTrue(topDocs.totalHits.value > 0);
     if (leaves.size() <= 1) {
       assertEquals(0, numExecutions.get());
     } else {
       assertEquals(leaves.size(), numExecutions.get());
     }
   }
+
+  /**
+   * Tests that when IndexerSearcher runs concurrent searches on multiple slices if any Exception is
+   * thrown by one of the slice tasks, it will not return until all tasks have completed.
+   *
+   * <p>Without a larger refactoring of the Lucene IndexSearcher and/or TaskExecutor there isn't a
+   * clean deterministic way to test this. This test is probabilistic using short timeouts in the
+   * tasks that do not throw an Exception.
+   */
+  public void testMultipleSegmentsOnTheExecutorWithException() {
+    List<LeafReaderContext> leaves = reader.leaves();
+    int fixedThreads = leaves.size() == 1 ? 1 : leaves.size() / 2;
+
+    ExecutorService fixedThreadPoolExecutor =
+        Executors.newFixedThreadPool(fixedThreads, new NamedThreadFactory("concurrent-slices"));
+
+    IndexSearcher searcher =
+        new IndexSearcher(reader, fixedThreadPoolExecutor) {
+          @Override
+          protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
+            ArrayList<LeafSlice> slices = new ArrayList<>();
+            for (LeafReaderContext ctx : leaves) {
+              slices.add(new LeafSlice(Arrays.asList(ctx)));
+            }
+            return slices.toArray(new LeafSlice[0]);
+          }
+        };
+
+    try {
+      AtomicInteger callsToScorer = new AtomicInteger(0);
+      int numExceptions = leaves.size() == 1 ? 1 : RandomizedTest.randomIntBetween(1, 2);
+      MatchAllOrThrowExceptionQuery query =
+          new MatchAllOrThrowExceptionQuery(numExceptions, callsToScorer);
+      RuntimeException exc = expectThrows(RuntimeException.class, () -> searcher.search(query, 10));
+      // if the TaskExecutor didn't wait for all tasks to finish, this assert would frequently fail
+      assertEquals(leaves.size(), callsToScorer.get());
+      assertThat(
+          exc.getMessage(), Matchers.containsString("MatchAllOrThrowExceptionQuery Exception"));
+    } finally {
+      TestUtil.shutdownExecutorService(fixedThreadPoolExecutor);
+    }
+  }
+
+  private static class MatchAllOrThrowExceptionQuery extends Query {
+
+    private final AtomicInteger numExceptionsToThrow;
+    private final Query delegate;
+    private final AtomicInteger callsToScorer;
+
+    /**
+     * Throws an Exception out of the {@code scorer} method the first {@code numExceptions} times it
+     * is called. Otherwise, it delegates all calls to the MatchAllDocsQuery.
+     *
+     * @param numExceptions number of exceptions to throw from scorer method
+     * @param callsToScorer where to record the number of times the {@code scorer} method has been
+     *     called
+     */
+    public MatchAllOrThrowExceptionQuery(int numExceptions, AtomicInteger callsToScorer) {
+      this.numExceptionsToThrow = new AtomicInteger(numExceptions);
+      this.callsToScorer = callsToScorer;
+      this.delegate = new MatchAllDocsQuery();
+    }
+
+    @Override
+    public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost)
+        throws IOException {
+      Weight matchAllWeight = delegate.createWeight(searcher, scoreMode, boost);
+
+      return new Weight(delegate) {
+        @Override
+        public boolean isCacheable(LeafReaderContext ctx) {
+          return matchAllWeight.isCacheable(ctx);
+        }
+
+        @Override
+        public Explanation explain(LeafReaderContext context, int doc) throws IOException {
+          return matchAllWeight.explain(context, doc);
+        }
+
+        @Override
+        public Scorer scorer(LeafReaderContext context) throws IOException {
+          if (numExceptionsToThrow.getAndDecrement() > 0) {
+            callsToScorer.getAndIncrement();
+            throw new RuntimeException("MatchAllOrThrowExceptionQuery Exception");
+          } else {
+            // A small sleep before incrementing the callsToScorer counter allows
+            // the task with the Exception to be thrown and if TaskExecutor.invokeAll
+            // does not wait until all tasks have finished, then the callsToScorer
+            // counter will not match the total number of tasks (or rather usually will
+            // not match, since there is a race condition that makes it probabilistic).
+            RandomizedTest.sleep(25);

Review Comment:
   > You could put in a latch that the test waits upon that gates the non-Exception-case scorers (don't finish until the test has caught the Exception from invokeAll) - that would deterministically fail with the old version of invokeAll, but would never pass with the new code (it would hang indefinitely).
   
   I am not sure that would even work: in order to surface the exception, the new code needs to wait for all slices, and if the slices are in turn waiting for us to catch the exception, that sounds like a deadlock.
   
   I think the current test is good enough. It would fail quickly enough for us to notice if we stopped waiting for all slices to complete.
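
To make the deadlock concern concrete, here is a minimal sketch, assuming a simplified stand-in for `invokeAll` that waits on every future before rethrowing. The class `GatedTaskDeadlockSketch` and the methods `invokeAllWaiting` and `blocksWhileGateClosed` are hypothetical names for illustration and are not part of Lucene. One task fails immediately, the other is gated on a latch, and a timeout is used to observe that the call cannot return while the gate stays closed (a real test that only opened the gate after catching the exception would hang instead).

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical demo class; invokeAllWaiting is a simplified stand-in, not Lucene's TaskExecutor. */
final class GatedTaskDeadlockSketch {

  /** Waits on every future, then rethrows the last failure seen (interrupt handling simplified). */
  static void invokeAllWaiting(Executor executor, List<FutureTask<Void>> tasks) {
    for (FutureTask<Void> task : tasks) {
      executor.execute(task);
    }
    RuntimeException exc = null;
    for (Future<Void> future : tasks) {
      try {
        future.get();
      } catch (InterruptedException e) {
        exc = new RuntimeException(e);
      } catch (ExecutionException e) {
        exc = new RuntimeException(e.getCause());
      }
    }
    if (exc != null) {
      throw exc;
    }
  }

  /** Returns true if invokeAllWaiting was still blocked after the timeout with the gate closed. */
  static boolean blocksWhileGateClosed(long timeoutMillis) {
    CountDownLatch gate = new CountDownLatch(1);
    FutureTask<Void> failing =
        new FutureTask<Void>(() -> { throw new RuntimeException("simulated scorer failure"); });
    FutureTask<Void> gated =
        new FutureTask<Void>(() -> { gate.await(); return null; });

    // Three threads: one plays the caller of invokeAll, two run the tasks.
    ExecutorService pool = Executors.newFixedThreadPool(3);
    try {
      Future<?> caller = pool.submit(() -> invokeAllWaiting(pool, List.of(failing, gated)));
      try {
        caller.get(timeoutMillis, TimeUnit.MILLISECONDS);
        return false; // returned normally: it did not wait for the gated task
      } catch (TimeoutException e) {
        return true; // still waiting on the gated task
      } catch (ExecutionException e) {
        return false; // threw before the gate opened: it did not wait
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    } finally {
      gate.countDown(); // open the gate so the pool can drain
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("blocked while gate closed: " + blocksWhileGateClosed(200));
  }
}
```

The timeout is only there so the sketch terminates; the point is that the exception from the failing task cannot reach the caller until the gated task is released.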





Re: [PR] TaskExecutor waits for all tasks to complete before returning [lucene]

Posted by "javanna (via GitHub)" <gi...@apache.org>.
javanna commented on code in PR #12523:
URL: https://github.com/apache/lucene/pull/12523#discussion_r1343042425


##########
lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java:
##########
@@ -267,11 +266,130 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
             return slices.toArray(new LeafSlice[0]);
           }
         };
-    searcher.search(new MatchAllDocsQuery(), 10);
+    TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
+    assertTrue(topDocs.totalHits.value > 0);
     if (leaves.size() <= 1) {
       assertEquals(0, numExecutions.get());
     } else {
       assertEquals(leaves.size(), numExecutions.get());
     }
   }
+
+  /**
+   * Tests that when IndexerSearcher runs concurrent searches on multiple slices if any Exception is
+   * thrown by one of the slice tasks, it will not return until all tasks have completed.
+   *
+   * <p>Without a larger refactoring of the Lucene IndexSearcher and/or TaskExecutor there isn't a
+   * clean deterministic way to test this. This test is probabilistic using short timeouts in the
+   * tasks that do not throw an Exception.
+   */
+  public void testMultipleSegmentsOnTheExecutorWithException() {
+    List<LeafReaderContext> leaves = reader.leaves();
+    int fixedThreads = leaves.size() == 1 ? 1 : leaves.size() / 2;
+
+    ExecutorService fixedThreadPoolExecutor =
+        Executors.newFixedThreadPool(fixedThreads, new NamedThreadFactory("concurrent-slices"));
+
+    IndexSearcher searcher =
+        new IndexSearcher(reader, fixedThreadPoolExecutor) {
+          @Override
+          protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
+            ArrayList<LeafSlice> slices = new ArrayList<>();
+            for (LeafReaderContext ctx : leaves) {
+              slices.add(new LeafSlice(Arrays.asList(ctx)));
+            }
+            return slices.toArray(new LeafSlice[0]);
+          }
+        };
+
+    try {
+      AtomicInteger callsToScorer = new AtomicInteger(0);
+      int numExceptions = leaves.size() == 1 ? 1 : RandomizedTest.randomIntBetween(1, 2);
+      MatchAllOrThrowExceptionQuery query =
+          new MatchAllOrThrowExceptionQuery(numExceptions, callsToScorer);
+      RuntimeException exc = expectThrows(RuntimeException.class, () -> searcher.search(query, 10));
+      // if the TaskExecutor didn't wait for all tasks to finish, this assert would frequently fail
+      assertEquals(leaves.size(), callsToScorer.get());
+      assertThat(
+          exc.getMessage(), Matchers.containsString("MatchAllOrThrowExceptionQuery Exception"));
+    } finally {
+      TestUtil.shutdownExecutorService(fixedThreadPoolExecutor);
+    }
+  }
+
+  private static class MatchAllOrThrowExceptionQuery extends Query {
+
+    private final AtomicInteger numExceptionsToThrow;
+    private final Query delegate;
+    private final AtomicInteger callsToScorer;
+
+    /**
+     * Throws an Exception out of the {@code scorer} method the first {@code numExceptions} times it
+     * is called. Otherwise, it delegates all calls to the MatchAllDocsQuery.
+     *
+     * @param numExceptions number of exceptions to throw from scorer method
+     * @param callsToScorer where to record the number of times the {@code scorer} method has been
+     *     called
+     */
+    public MatchAllOrThrowExceptionQuery(int numExceptions, AtomicInteger callsToScorer) {
+      this.numExceptionsToThrow = new AtomicInteger(numExceptions);
+      this.callsToScorer = callsToScorer;
+      this.delegate = new MatchAllDocsQuery();
+    }
+
+    @Override
+    public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost)
+        throws IOException {
+      Weight matchAllWeight = delegate.createWeight(searcher, scoreMode, boost);
+
+      return new Weight(delegate) {
+        @Override
+        public boolean isCacheable(LeafReaderContext ctx) {
+          return matchAllWeight.isCacheable(ctx);
+        }
+
+        @Override
+        public Explanation explain(LeafReaderContext context, int doc) throws IOException {
+          return matchAllWeight.explain(context, doc);
+        }
+
+        @Override
+        public Scorer scorer(LeafReaderContext context) throws IOException {
+          if (numExceptionsToThrow.getAndDecrement() > 0) {
+            callsToScorer.getAndIncrement();
+            throw new RuntimeException("MatchAllOrThrowExceptionQuery Exception");
+          } else {
+            // A small sleep before incrementing the callsToScorer counter allows
+            // the task with the Exception to be thrown and if TaskExecutor.invokeAll
+            // does not wait until all tasks have finished, then the callsToScorer
+            // counter will not match the total number of tasks (or rather usually will
+            // not match, since there is a race condition that makes it probabilistic).
+            RandomizedTest.sleep(25);

Review Comment:
   Thinking more about this, perhaps using a single-threaded executor, or even one that executes on the caller thread, would make the test more repeatable? Do we need to leverage parallel execution to test the behaviour we are introducing? It seems like with a single thread we could even simplify the test and remove the latch?
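
As a rough illustration of the caller-thread idea, the sketch below runs every task through `Runnable::run` used as an `Executor`, so each task completes inside `execute`. The class `CallerThreadExecutorSketch` and the method `callsSeenWhenFailureSurfaces` are hypothetical, and the tasks are plain `FutureTask`s standing in for slice searches; nothing here is Lucene code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo class showing task execution on the caller thread. */
final class CallerThreadExecutorSketch {

  /**
   * Runs numTasks tasks on the caller thread; the first numFailures of them throw.
   * Returns how many tasks had been called when the first failure reached the caller.
   */
  static int callsSeenWhenFailureSurfaces(int numTasks, int numFailures) {
    Executor callerThread = Runnable::run; // execute(task) simply calls task.run()
    AtomicInteger calls = new AtomicInteger();
    AtomicInteger failuresLeft = new AtomicInteger(numFailures);

    List<FutureTask<Integer>> tasks = new ArrayList<>();
    for (int i = 0; i < numTasks; i++) {
      tasks.add(new FutureTask<Integer>(() -> {
        calls.incrementAndGet();
        if (failuresLeft.getAndDecrement() > 0) {
          throw new RuntimeException("simulated scorer failure");
        }
        return 1;
      }));
    }

    for (FutureTask<Integer> task : tasks) {
      callerThread.execute(task); // the task runs to completion right here
    }

    for (FutureTask<Integer> task : tasks) {
      try {
        task.get();
      } catch (ExecutionException e) {
        return calls.get(); // deliberately stop at the first failure, without waiting
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    throw new AssertionError("expected at least one failing task");
  }

  public static void main(String[] args) {
    System.out.println("calls seen: " + callsSeenWhenFailureSurfaces(5, 2));
  }
}
```

This is fully repeatable and needs no sleep or latch. One caveat of this particular setup: every task has already finished before the first `get()`, so the counter matches the task count even though the result loop above stops at the first failure.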





Re: [PR] TaskExecutor waits for all tasks to complete before returning [lucene]

Posted by "javanna (via GitHub)" <gi...@apache.org>.
javanna commented on code in PR #12523:
URL: https://github.com/apache/lucene/pull/12523#discussion_r1343776898


##########
lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java:
##########
@@ -267,7 +266,133 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
             return slices.toArray(new LeafSlice[0]);
           }
         };
-    searcher.search(new MatchAllDocsQuery(), 10);
-    assertEquals(leaves.size(), numExecutions.get());
+    TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
+    assertTrue(topDocs.totalHits.value > 0);
+    if (leaves.size() <= 1) {
+      assertEquals(0, numExecutions.get());
+    } else {
+      assertEquals(leaves.size(), numExecutions.get());
+    }

Review Comment:
   I confirmed locally that the test fails in some cases with your change, but does not fail with the previous assertion. Can you revert this bit please?





[GitHub] [lucene] javanna commented on a diff in pull request #12523: TaskExecutor waits for all tasks to complete before returning

Posted by "javanna (via GitHub)" <gi...@apache.org>.
javanna commented on code in PR #12523:
URL: https://github.com/apache/lucene/pull/12523#discussion_r1315463865


##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -50,16 +51,21 @@ final <T> List<T> invokeAll(Collection<RunnableFuture<T>> tasks) {
     for (Runnable task : tasks) {
       executor.execute(task);
     }
+
+    RuntimeException exc = null;
     final List<T> results = new ArrayList<>();
     for (Future<T> future : tasks) {
       try {
         results.add(future.get());
       } catch (InterruptedException e) {
-        throw new ThreadInterruptedException(e);
+        exc = new ThreadInterruptedException(e);
       } catch (ExecutionException e) {
-        throw new RuntimeException(e.getCause());
+        exc = new RuntimeException(e.getCause());

Review Comment:
   I wonder if we should add the earlier exceptions as suppressed exceptions, or something along those lines, rather than throwing only the last one and not letting users know about the others. This can totally be a follow-up change though.
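
A minimal sketch of the suppressed-exception idea, assuming a simplified stand-in for `invokeAll` that takes the `Executor` as a parameter. The class `SuppressingInvokeAll` and the helper `record` are hypothetical names, and keeping the first failure as the primary exception (rather than the last, as in the diff above) is a choice made for this sketch only. It relies on the standard `Throwable.addSuppressed` method.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

/** Hypothetical stand-in for the method under review; not the Lucene class. */
final class SuppressingInvokeAll {

  /** Waits for every task; throws the first failure with later failures attached as suppressed. */
  static <T> List<T> invokeAll(Executor executor, Collection<RunnableFuture<T>> tasks) {
    for (Runnable task : tasks) {
      executor.execute(task);
    }
    RuntimeException exc = null;
    boolean interrupted = false;
    List<T> results = new ArrayList<>();
    for (Future<T> future : tasks) {
      try {
        results.add(future.get());
      } catch (InterruptedException e) {
        interrupted = true; // simplified: the interrupted wait is not retried
        exc = record(exc, new RuntimeException(e));
      } catch (ExecutionException e) {
        exc = record(exc, new RuntimeException(e.getCause()));
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt(); // restore the flag once the loop is done
    }
    if (exc != null) {
      throw exc;
    }
    return results;
  }

  /** Keeps the first exception as primary and attaches any later one as suppressed. */
  private static RuntimeException record(RuntimeException first, RuntimeException next) {
    if (first == null) {
      return next;
    }
    first.addSuppressed(next);
    return first;
  }

  public static void main(String[] args) {
    List<RunnableFuture<Integer>> tasks = new ArrayList<>();
    tasks.add(new FutureTask<Integer>(() -> { throw new IllegalStateException("first"); }));
    tasks.add(new FutureTask<Integer>(() -> 1));
    tasks.add(new FutureTask<Integer>(() -> { throw new IllegalArgumentException("second"); }));
    try {
      invokeAll(Runnable::run, tasks);
    } catch (RuntimeException e) {
      System.out.println("primary cause: " + e.getCause());
      for (Throwable s : e.getSuppressed()) {
        System.out.println("suppressed cause: " + s.getCause());
      }
    }
  }
}
```

Callers that only look at the thrown exception see the same shape as before, while the other failures remain reachable through `getSuppressed()`.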



##########
lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java:
##########
@@ -267,11 +266,130 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
             return slices.toArray(new LeafSlice[0]);
           }
         };
-    searcher.search(new MatchAllDocsQuery(), 10);
+    TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
+    assertTrue(topDocs.totalHits.value > 0);
     if (leaves.size() <= 1) {
       assertEquals(0, numExecutions.get());
     } else {
       assertEquals(leaves.size(), numExecutions.get());
     }
   }
+
+  /**
+   * Tests that when IndexerSearcher runs concurrent searches on multiple slices if any Exception is
+   * thrown by one of the slice tasks, it will not return until all tasks have completed.
+   *
+   * <p>Without a larger refactoring of the Lucene IndexSearcher and/or TaskExecutor there isn't a
+   * clean deterministic way to test this. This test is probabilistic using short timeouts in the
+   * tasks that do not throw an Exception.
+   */
+  public void testMultipleSegmentsOnTheExecutorWithException() {
+    List<LeafReaderContext> leaves = reader.leaves();
+    int fixedThreads = leaves.size() == 1 ? 1 : leaves.size() / 2;
+
+    ExecutorService fixedThreadPoolExecutor =
+        Executors.newFixedThreadPool(fixedThreads, new NamedThreadFactory("concurrent-slices"));
+
+    IndexSearcher searcher =
+        new IndexSearcher(reader, fixedThreadPoolExecutor) {
+          @Override
+          protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
+            ArrayList<LeafSlice> slices = new ArrayList<>();
+            for (LeafReaderContext ctx : leaves) {
+              slices.add(new LeafSlice(Arrays.asList(ctx)));
+            }
+            return slices.toArray(new LeafSlice[0]);
+          }
+        };
+
+    try {
+      AtomicInteger callsToScorer = new AtomicInteger(0);
+      int numExceptions = leaves.size() == 1 ? 1 : RandomizedTest.randomIntBetween(1, 2);
+      MatchAllOrThrowExceptionQuery query =
+          new MatchAllOrThrowExceptionQuery(numExceptions, callsToScorer);
+      RuntimeException exc = expectThrows(RuntimeException.class, () -> searcher.search(query, 10));
+      // if the TaskExecutor didn't wait for all tasks to finish, this assert would frequently fail
+      assertEquals(leaves.size(), callsToScorer.get());
+      assertThat(
+          exc.getMessage(), Matchers.containsString("MatchAllOrThrowExceptionQuery Exception"));
+    } finally {
+      TestUtil.shutdownExecutorService(fixedThreadPoolExecutor);
+    }
+  }
+
+  private static class MatchAllOrThrowExceptionQuery extends Query {
+
+    private final AtomicInteger numExceptionsToThrow;
+    private final Query delegate;
+    private final AtomicInteger callsToScorer;
+
+    /**
+     * Throws an Exception out of the {@code scorer} method the first {@code numExceptions} times it
+     * is called. Otherwise, it delegates all calls to the MatchAllDocsQuery.
+     *
+     * @param numExceptions number of exceptions to throw from scorer method
+     * @param callsToScorer where to record the number of times the {@code scorer} method has been
+     *     called
+     */
+    public MatchAllOrThrowExceptionQuery(int numExceptions, AtomicInteger callsToScorer) {
+      this.numExceptionsToThrow = new AtomicInteger(numExceptions);
+      this.callsToScorer = callsToScorer;
+      this.delegate = new MatchAllDocsQuery();
+    }
+
+    @Override
+    public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost)
+        throws IOException {
+      Weight matchAllWeight = delegate.createWeight(searcher, scoreMode, boost);
+
+      return new Weight(delegate) {
+        @Override
+        public boolean isCacheable(LeafReaderContext ctx) {
+          return matchAllWeight.isCacheable(ctx);
+        }
+
+        @Override
+        public Explanation explain(LeafReaderContext context, int doc) throws IOException {
+          return matchAllWeight.explain(context, doc);
+        }
+
+        @Override
+        public Scorer scorer(LeafReaderContext context) throws IOException {
+          if (numExceptionsToThrow.getAndDecrement() > 0) {
+            callsToScorer.getAndIncrement();
+            throw new RuntimeException("MatchAllOrThrowExceptionQuery Exception");
+          } else {
+            // A small sleep before incrementing the callsToScorer counter allows
+            // the task with the Exception to be thrown and if TaskExecutor.invokeAll
+            // does not wait until all tasks have finished, then the callsToScorer
+            // counter will not match the total number of tasks (or rather usually will
+            // not match, since there is a race condition that makes it probabilistic).
+            RandomizedTest.sleep(25);

Review Comment:
   Can we find another way to test this that removes the need for the sleep? Perhaps we could use a `CountDownLatch` and count down in each scorer call, to make sure that they are all called, and, instead of asserting on the number of leaves, simply wait for the latch to be counted down?
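
One possible reading of the latch idea, sketched under assumptions: each task counts down a shared `CountDownLatch` when it is called, and the caller inspects the latch once the exception has surfaced. The class `LatchCountingSketch` and the methods `invokeAllWaiting` and `remainingAfterFailure` are hypothetical, and `invokeAllWaiting` is a simplified stand-in for the method in the diff. The sketch checks `getCount()` rather than blocking on `await()`, since a blocking wait would also succeed if the caller had returned early and the tasks finished afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

/** Hypothetical demo class; not Lucene code. */
final class LatchCountingSketch {

  /** Waits on every future, then rethrows the last failure seen (interrupt handling simplified). */
  static void invokeAllWaiting(ExecutorService executor, List<FutureTask<Void>> tasks) {
    for (FutureTask<Void> task : tasks) {
      executor.execute(task);
    }
    RuntimeException exc = null;
    for (Future<Void> future : tasks) {
      try {
        future.get();
      } catch (InterruptedException e) {
        exc = new RuntimeException(e);
      } catch (ExecutionException e) {
        exc = new RuntimeException(e.getCause());
      }
    }
    if (exc != null) {
      throw exc;
    }
  }

  /** Returns how many tasks had not yet counted down when the failure reached the caller. */
  static long remainingAfterFailure(int numTasks, int numThreads) {
    CountDownLatch scorerCalls = new CountDownLatch(numTasks);
    List<FutureTask<Void>> tasks = new ArrayList<>();
    for (int i = 0; i < numTasks; i++) {
      final boolean fail = i == 0; // only the first task throws
      tasks.add(new FutureTask<Void>(() -> {
        scorerCalls.countDown();
        if (fail) {
          throw new RuntimeException("simulated scorer failure");
        }
        return null;
      }));
    }
    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
    try {
      invokeAllWaiting(executor, tasks);
      throw new AssertionError("expected a failure from the first task");
    } catch (RuntimeException expected) {
      return scorerCalls.getCount(); // zero if every task ran before the caller saw the failure
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("tasks still pending: " + remainingAfterFailure(8, 3));
  }
}
```

With a wait-for-all loop the remaining count is always zero at that point, because every future has completed before the exception is rethrown.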


