You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jp...@apache.org on 2023/03/31 13:07:56 UTC

[lucene] branch main updated: Adjust DWPT pool concurrency to the number of cores. (#12216)

This is an automated email from the ASF dual-hosted git repository.

jpountz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/lucene.git


The following commit(s) were added to refs/heads/main by this push:
     new 56e65919b18 Adjust DWPT pool concurrency to the number of cores. (#12216)
56e65919b18 is described below

commit 56e65919b18dccb368d080cf9188080727bd63f1
Author: Adrien Grand <jp...@gmail.com>
AuthorDate: Fri Mar 31 15:07:48 2023 +0200

    Adjust DWPT pool concurrency to the number of cores. (#12216)
    
    After upgrading Elasticsearch to a recent Lucene snapshot, we observed some
    indexing slowdowns on machines with few cores. This appears to be because
    apache/lucene#12199 removed too much of the bias towards larger DWPTs
    (DocumentsWriterPerThread instances). This change restores some of that
    ordering by adjusting the concurrency of `DWPTPool` to the number of cores
    available on the local node.
---
 .../index/ConcurrentApproximatePriorityQueue.java  | 62 +++++++++++++++-------
 .../TestConcurrentApproximatePriorityQueue.java    | 20 +++++--
 2 files changed, 60 insertions(+), 22 deletions(-)

diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java
index 036ba24c962..8a8fc72ab4c 100644
--- a/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java
+++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java
@@ -22,25 +22,49 @@ import java.util.function.Predicate;
 
 /**
  * Concurrent version of {@link ApproximatePriorityQueue}, which trades a bit more of ordering for
- * better concurrency by maintaining 8 sub {@link ApproximatePriorityQueue}s that are locked
- * independently.
+ * better concurrency by maintaining multiple sub {@link ApproximatePriorityQueue}s that are locked
+ * independently. The number of subs is computed dynamically based on hardware concurrency.
  */
 final class ConcurrentApproximatePriorityQueue<T> {
 
-  /** Keeping 8 queues should already help a lot compared to a single one. */
-  static final int CONCURRENCY = 8;
+  static final int MIN_CONCURRENCY = 1;
+  static final int MAX_CONCURRENCY = 256;
 
-  private static final int MASK = 0x07;
+  private static final int getConcurrency() {
+    int coreCount = Runtime.getRuntime().availableProcessors();
+    // Aim for ~4 entries per slot when indexing with one thread per CPU core. The trade-off is
+    // that if we set the concurrency too high then we'll completely lose the bias towards larger
+    // DWPTs. And if we set it too low then we risk seeing contention.
+    int concurrency = coreCount / 4;
+    concurrency = Math.max(MIN_CONCURRENCY, concurrency);
+    concurrency = Math.min(MAX_CONCURRENCY, concurrency);
+    return concurrency;
+  }
 
+  final int concurrency;
   final Lock[] locks;
   final ApproximatePriorityQueue<T>[] queues;
 
   ConcurrentApproximatePriorityQueue() {
-    locks = new Lock[CONCURRENCY];
+    this(getConcurrency());
+  }
+
+  ConcurrentApproximatePriorityQueue(int concurrency) {
+    if (concurrency < MIN_CONCURRENCY || concurrency > MAX_CONCURRENCY) {
+      throw new IllegalArgumentException(
+          "concurrency must be in ["
+              + MIN_CONCURRENCY
+              + ", "
+              + MAX_CONCURRENCY
+              + "], got "
+              + concurrency);
+    }
+    this.concurrency = concurrency;
+    locks = new Lock[concurrency];
     @SuppressWarnings({"rawtypes", "unchecked"})
-    ApproximatePriorityQueue<T>[] queues = new ApproximatePriorityQueue[CONCURRENCY];
+    ApproximatePriorityQueue<T>[] queues = new ApproximatePriorityQueue[concurrency];
     this.queues = queues;
-    for (int i = 0; i < CONCURRENCY; ++i) {
+    for (int i = 0; i < concurrency; ++i) {
       locks[i] = new ReentrantLock();
       queues[i] = new ApproximatePriorityQueue<>();
     }
@@ -50,9 +74,9 @@ final class ConcurrentApproximatePriorityQueue<T> {
     // Seed the order in which to look at entries based on the current thread. This helps distribute
     // entries across queues and gives a bit of thread affinity between entries and threads, which
     // can't hurt.
-    final int threadHash = Thread.currentThread().hashCode();
-    for (int i = 0; i < CONCURRENCY; ++i) {
-      final int index = (threadHash + i) & MASK;
+    final int threadHash = Thread.currentThread().hashCode() & 0xFFFF;
+    for (int i = 0; i < concurrency; ++i) {
+      final int index = (threadHash + i) % concurrency;
       final Lock lock = locks[index];
       final ApproximatePriorityQueue<T> queue = queues[index];
       if (lock.tryLock()) {
@@ -64,7 +88,7 @@ final class ConcurrentApproximatePriorityQueue<T> {
         }
       }
     }
-    final int index = threadHash & MASK;
+    final int index = threadHash % concurrency;
     final Lock lock = locks[index];
     final ApproximatePriorityQueue<T> queue = queues[index];
     lock.lock();
@@ -76,9 +100,9 @@ final class ConcurrentApproximatePriorityQueue<T> {
   }
 
   T poll(Predicate<T> predicate) {
-    final int threadHash = Thread.currentThread().hashCode();
-    for (int i = 0; i < CONCURRENCY; ++i) {
-      final int index = (threadHash + i) & MASK;
+    final int threadHash = Thread.currentThread().hashCode() & 0xFFFF;
+    for (int i = 0; i < concurrency; ++i) {
+      final int index = (threadHash + i) % concurrency;
       final Lock lock = locks[index];
       final ApproximatePriorityQueue<T> queue = queues[index];
       if (lock.tryLock()) {
@@ -92,8 +116,8 @@ final class ConcurrentApproximatePriorityQueue<T> {
         }
       }
     }
-    for (int i = 0; i < CONCURRENCY; ++i) {
-      final int index = (threadHash + i) & MASK;
+    for (int i = 0; i < concurrency; ++i) {
+      final int index = (threadHash + i) % concurrency;
       final Lock lock = locks[index];
       final ApproximatePriorityQueue<T> queue = queues[index];
       lock.lock();
@@ -117,7 +141,7 @@ final class ConcurrentApproximatePriorityQueue<T> {
       throw new AssertionError("contains should only be used for assertions");
     }
 
-    for (int i = 0; i < CONCURRENCY; ++i) {
+    for (int i = 0; i < concurrency; ++i) {
       final Lock lock = locks[i];
       final ApproximatePriorityQueue<T> queue = queues[i];
       lock.lock();
@@ -133,7 +157,7 @@ final class ConcurrentApproximatePriorityQueue<T> {
   }
 
   boolean remove(Object o) {
-    for (int i = 0; i < CONCURRENCY; ++i) {
+    for (int i = 0; i < concurrency; ++i) {
       final Lock lock = locks[i];
       final ApproximatePriorityQueue<T> queue = queues[i];
       lock.lock();
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java
index 49584d9e697..2656e4a3885 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java
@@ -18,12 +18,18 @@ package org.apache.lucene.index;
 
 import java.util.concurrent.CountDownLatch;
 import org.apache.lucene.tests.util.LuceneTestCase;
+import org.apache.lucene.tests.util.TestUtil;
 import org.apache.lucene.util.ThreadInterruptedException;
 
 public class TestConcurrentApproximatePriorityQueue extends LuceneTestCase {
 
   public void testPollFromSameThread() {
-    ConcurrentApproximatePriorityQueue<Integer> pq = new ConcurrentApproximatePriorityQueue<>();
+    ConcurrentApproximatePriorityQueue<Integer> pq =
+        new ConcurrentApproximatePriorityQueue<>(
+            TestUtil.nextInt(
+                random(),
+                ConcurrentApproximatePriorityQueue.MIN_CONCURRENCY,
+                ConcurrentApproximatePriorityQueue.MAX_CONCURRENCY));
     pq.add(3, 3);
     pq.add(10, 10);
     pq.add(7, 7);
@@ -34,7 +40,12 @@ public class TestConcurrentApproximatePriorityQueue extends LuceneTestCase {
   }
 
   public void testPollFromDifferentThread() throws Exception {
-    ConcurrentApproximatePriorityQueue<Integer> pq = new ConcurrentApproximatePriorityQueue<>();
+    ConcurrentApproximatePriorityQueue<Integer> pq =
+        new ConcurrentApproximatePriorityQueue<>(
+            TestUtil.nextInt(
+                random(),
+                ConcurrentApproximatePriorityQueue.MIN_CONCURRENCY,
+                ConcurrentApproximatePriorityQueue.MAX_CONCURRENCY));
     pq.add(3, 3);
     pq.add(10, 10);
     pq.add(7, 7);
@@ -53,7 +64,10 @@ public class TestConcurrentApproximatePriorityQueue extends LuceneTestCase {
   }
 
   public void testCurrentLockIsBusy() throws Exception {
-    ConcurrentApproximatePriorityQueue<Integer> pq = new ConcurrentApproximatePriorityQueue<>();
+    // This test needs a concurrency of 2 or more.
+    ConcurrentApproximatePriorityQueue<Integer> pq =
+        new ConcurrentApproximatePriorityQueue<>(
+            TestUtil.nextInt(random(), 2, ConcurrentApproximatePriorityQueue.MAX_CONCURRENCY));
     pq.add(3, 3);
     CountDownLatch takeLock = new CountDownLatch(1);
     CountDownLatch releaseLock = new CountDownLatch(1);