You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jp...@apache.org on 2023/03/15 12:17:47 UTC

[lucene] branch main updated: Reduce contention in DocumentsWriterPerThreadPool. (#12199)

This is an automated email from the ASF dual-hosted git repository.

jpountz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/lucene.git


The following commit(s) were added to refs/heads/main by this push:
     new f3242040197 Reduce contention in DocumentsWriterPerThreadPool. (#12199)
f3242040197 is described below

commit f324204019738b893a2c200aa12b3a2e113867d1
Author: Adrien Grand <jp...@gmail.com>
AuthorDate: Wed Mar 15 13:17:40 2023 +0100

    Reduce contention in DocumentsWriterPerThreadPool. (#12199)
    
    Obtaining a DWPT and putting it back into the pool is subject to contention.
    This change reduces contention by using 8 sub pools that are tried sequentially.
    When applied on top of #12198, this reduces the time to index geonames with 20
    threads from ~19s to ~16-17s.
---
 lucene/CHANGES.txt                                 |   2 +-
 .../index/ConcurrentApproximatePriorityQueue.java  | 150 +++++++++++++++++++++
 .../lucene/index/DocumentsWriterPerThreadPool.java |  36 ++---
 .../TestConcurrentApproximatePriorityQueue.java    |  89 ++++++++++++
 4 files changed, 261 insertions(+), 16 deletions(-)

diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index bbadfc3332e..bf85baf25c6 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -165,7 +165,7 @@ Optimizations
 
 * GITHUB#12179: Better PostingsEnum reuse in MultiTermQueryConstantScoreBlendedWrapper. (Greg Miller)
 
-* GITHUB#12198: Reduced contention when indexing with many threads. (Adrien Grand)
+* GITHUB#12198, GITHUB#12199: Reduced contention when indexing with many threads. (Adrien Grand)
 
 Bug Fixes
 ---------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java
new file mode 100644
index 00000000000..036ba24c962
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.index;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Predicate;
+
+/**
+ * Concurrent version of {@link ApproximatePriorityQueue}, which gives up a bit more ordering
+ * accuracy in exchange for better concurrency by maintaining 8 sub
+ * {@link ApproximatePriorityQueue}s that are locked independently.
+ */
+final class ConcurrentApproximatePriorityQueue<T> {
+
+  /** Keeping 8 queues should already help a lot compared to a single one. */
+  static final int CONCURRENCY = 8;
+
+  private static final int MASK = 0x07;
+
+  final Lock[] locks;
+  final ApproximatePriorityQueue<T>[] queues;
+
+  ConcurrentApproximatePriorityQueue() {
+    locks = new Lock[CONCURRENCY];
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    ApproximatePriorityQueue<T>[] queues = new ApproximatePriorityQueue[CONCURRENCY];
+    this.queues = queues;
+    for (int i = 0; i < CONCURRENCY; ++i) {
+      locks[i] = new ReentrantLock();
+      queues[i] = new ApproximatePriorityQueue<>();
+    }
+  }
+
+  void add(T entry, long weight) {
+    // Seed the order in which to look at queues based on the current thread. This helps distribute
+    // entries across queues and gives a bit of thread affinity between entries and threads, which
+    // can't hurt.
+    final int threadHash = Thread.currentThread().hashCode();
+    for (int i = 0; i < CONCURRENCY; ++i) {
+      final int index = (threadHash + i) & MASK;
+      final Lock lock = locks[index];
+      final ApproximatePriorityQueue<T> queue = queues[index];
+      if (lock.tryLock()) {
+        try {
+          queue.add(entry, weight);
+          return;
+        } finally {
+          lock.unlock();
+        }
+      }
+    }
+    final int index = threadHash & MASK;
+    final Lock lock = locks[index];
+    final ApproximatePriorityQueue<T> queue = queues[index];
+    lock.lock();
+    try {
+      queue.add(entry, weight);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  T poll(Predicate<T> predicate) {
+    final int threadHash = Thread.currentThread().hashCode();
+    for (int i = 0; i < CONCURRENCY; ++i) {
+      final int index = (threadHash + i) & MASK;
+      final Lock lock = locks[index];
+      final ApproximatePriorityQueue<T> queue = queues[index];
+      if (lock.tryLock()) {
+        try {
+          T entry = queue.poll(predicate);
+          if (entry != null) {
+            return entry;
+          }
+        } finally {
+          lock.unlock();
+        }
+      }
+    }
+    for (int i = 0; i < CONCURRENCY; ++i) {
+      final int index = (threadHash + i) & MASK;
+      final Lock lock = locks[index];
+      final ApproximatePriorityQueue<T> queue = queues[index];
+      lock.lock();
+      try {
+        T entry = queue.poll(predicate);
+        if (entry != null) {
+          return entry;
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+    return null;
+  }
+
+  // Only used for assertions
+  boolean contains(Object o) {
+    boolean assertionsAreEnabled = false;
+    assert assertionsAreEnabled = true;
+    if (assertionsAreEnabled == false) {
+      throw new AssertionError("contains should only be used for assertions");
+    }
+
+    for (int i = 0; i < CONCURRENCY; ++i) {
+      final Lock lock = locks[i];
+      final ApproximatePriorityQueue<T> queue = queues[i];
+      lock.lock();
+      try {
+        if (queue.contains(o)) {
+          return true;
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+
+  boolean remove(Object o) {
+    for (int i = 0; i < CONCURRENCY; ++i) {
+      final Lock lock = locks[i];
+      final ApproximatePriorityQueue<T> queue = queues[i];
+      lock.lock();
+      try {
+        if (queue.remove(o)) {
+          return true;
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+}
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
index f403f4c6ddc..df2d76b8879 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
@@ -44,11 +44,11 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
 
   private final Set<DocumentsWriterPerThread> dwpts =
       Collections.newSetFromMap(new IdentityHashMap<>());
-  private final ApproximatePriorityQueue<DocumentsWriterPerThread> freeList =
-      new ApproximatePriorityQueue<>();
+  private final ConcurrentApproximatePriorityQueue<DocumentsWriterPerThread> freeList =
+      new ConcurrentApproximatePriorityQueue<>();
   private final Supplier<DocumentsWriterPerThread> dwptFactory;
   private int takenWriterPermits = 0;
-  private boolean closed;
+  private volatile boolean closed;
 
   DocumentsWriterPerThreadPool(Supplier<DocumentsWriterPerThread> dwptFactory) {
     this.dwptFactory = dwptFactory;
@@ -113,15 +113,16 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
    * operation (add/updateDocument).
    */
   DocumentsWriterPerThread getAndLock() {
-    synchronized (this) {
-      ensureOpen();
-      DocumentsWriterPerThread dwpt = freeList.poll(DocumentsWriterPerThread::tryLock);
-      if (dwpt == null) {
-        dwpt = newWriter();
-      }
-      // DWPT is already locked before return by this method:
+    ensureOpen();
+    DocumentsWriterPerThread dwpt = freeList.poll(DocumentsWriterPerThread::tryLock);
+    if (dwpt != null) {
       return dwpt;
     }
+    // newWriter() adds the DWPT to the `dwpts` set as a side-effect. However, it is not added to
+    // `freeList` at this point; it will be added later on, once DocumentsWriter has indexed a
+    // document into this DWPT and gives it back to the pool by calling
+    // #marksAsFreeAndUnlock.
+    return newWriter();
   }
 
   private void ensureOpen() {
@@ -130,13 +131,15 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
     }
   }
 
+  private synchronized boolean contains(DocumentsWriterPerThread state) {
+    return dwpts.contains(state);
+  }
+
   void marksAsFreeAndUnlock(DocumentsWriterPerThread state) {
     final long ramBytesUsed = state.ramBytesUsed();
-    synchronized (this) {
-      assert dwpts.contains(state)
-          : "we tried to add a DWPT back to the pool but the pool doesn't know aobut this DWPT";
-      freeList.add(state, ramBytesUsed);
-    }
+    assert contains(state)
+        : "we tried to add a DWPT back to the pool but the pool doesn't know about this DWPT";
+    freeList.add(state, ramBytesUsed);
     state.unlock();
   }
 
@@ -175,6 +178,9 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
    * @return <code>true</code> iff the given DWPT has been removed. Otherwise <code>false</code>
    */
   synchronized boolean checkout(DocumentsWriterPerThread perThread) {
+    // The DWPT must be held by the current thread. This guarantees that concurrent calls to
+    // #getAndLock cannot pull this DWPT out of the pool since #getAndLock does a DWPT#tryLock to
+    // check if the DWPT is available.
     assert perThread.isHeldByCurrentThread();
     if (dwpts.remove(perThread)) {
       freeList.remove(perThread);
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java
new file mode 100644
index 00000000000..49584d9e697
--- /dev/null
+++ b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.index;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.lucene.tests.util.LuceneTestCase;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+public class TestConcurrentApproximatePriorityQueue extends LuceneTestCase {
+
+  public void testPollFromSameThread() {
+    ConcurrentApproximatePriorityQueue<Integer> pq = new ConcurrentApproximatePriorityQueue<>();
+    pq.add(3, 3);
+    pq.add(10, 10);
+    pq.add(7, 7);
+    assertEquals(Integer.valueOf(10), pq.poll(x -> true));
+    assertEquals(Integer.valueOf(7), pq.poll(x -> true));
+    assertEquals(Integer.valueOf(3), pq.poll(x -> true));
+    assertNull(pq.poll(x -> true));
+  }
+
+  public void testPollFromDifferentThread() throws Exception {
+    ConcurrentApproximatePriorityQueue<Integer> pq = new ConcurrentApproximatePriorityQueue<>();
+    pq.add(3, 3);
+    pq.add(10, 10);
+    pq.add(7, 7);
+    Thread t =
+        new Thread() {
+          @Override
+          public void run() {
+            assertEquals(Integer.valueOf(10), pq.poll(x -> true));
+            assertEquals(Integer.valueOf(7), pq.poll(x -> true));
+            assertEquals(Integer.valueOf(3), pq.poll(x -> true));
+            assertNull(pq.poll(x -> true));
+          }
+        };
+    t.start();
+    t.join();
+  }
+
+  public void testCurrentLockIsBusy() throws Exception {
+    ConcurrentApproximatePriorityQueue<Integer> pq = new ConcurrentApproximatePriorityQueue<>();
+    pq.add(3, 3);
+    CountDownLatch takeLock = new CountDownLatch(1);
+    CountDownLatch releaseLock = new CountDownLatch(1);
+    Thread t =
+        new Thread() {
+          @Override
+          public void run() {
+            int queueIndex = -1;
+            for (int i = 0; i < pq.queues.length; ++i) {
+              if (pq.queues[i].isEmpty() == false) {
+                queueIndex = i;
+                break;
+              }
+            }
+            assertTrue(pq.locks[queueIndex].tryLock());
+            takeLock.countDown();
+            try {
+              releaseLock.await();
+            } catch (InterruptedException e) {
+              throw new ThreadInterruptedException(e);
+            }
+            pq.locks[queueIndex].unlock();
+          }
+        };
+    t.start();
+    takeLock.await();
+    pq.add(1, 1); // The lock is taken so this needs to go to a different queue
+    assertEquals(Integer.valueOf(1), pq.poll(x -> true));
+    releaseLock.countDown();
+    assertEquals(Integer.valueOf(3), pq.poll(x -> true));
+    assertNull(pq.poll(x -> true));
+  }
+}