You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@lucene.apache.org by jp...@apache.org on 2023/03/15 12:17:47 UTC
[lucene] branch main updated: Reduce contention in DocumentsWriterPerThreadPool. (#12199)
This is an automated email from the ASF dual-hosted git repository.
jpountz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/lucene.git
The following commit(s) were added to refs/heads/main by this push:
new f3242040197 Reduce contention in DocumentsWriterPerThreadPool. (#12199)
f3242040197 is described below
commit f324204019738b893a2c200aa12b3a2e113867d1
Author: Adrien Grand <jp...@gmail.com>
AuthorDate: Wed Mar 15 13:17:40 2023 +0100
Reduce contention in DocumentsWriterPerThreadPool. (#12199)
Obtaining a DWPT and putting it back into the pool is subject to contention.
This change reduces contention by splitting the pool into 8 independently locked
sub-pools, which are tried in turn starting from an index derived from the current thread.
When applied on top of #12198, this reduces the time to index geonames with 20
threads from ~19s to ~16-17s.
---
lucene/CHANGES.txt | 2 +-
.../index/ConcurrentApproximatePriorityQueue.java | 150 +++++++++++++++++++++
.../lucene/index/DocumentsWriterPerThreadPool.java | 36 ++---
.../TestConcurrentApproximatePriorityQueue.java | 89 ++++++++++++
4 files changed, 261 insertions(+), 16 deletions(-)
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index bbadfc3332e..bf85baf25c6 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -165,7 +165,7 @@ Optimizations
* GITHUB#12179: Better PostingsEnum reuse in MultiTermQueryConstantScoreBlendedWrapper. (Greg Miller)
-* GITHUB#12198: Reduced contention when indexing with many threads. (Adrien Grand)
+* GITHUB#12198, GITHUB#12199: Reduced contention when indexing with many threads. (Adrien Grand)
Bug Fixes
---------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java
new file mode 100644
index 00000000000..036ba24c962
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.index;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Predicate;
+
+/**
+ * Concurrent version of {@link ApproximatePriorityQueue}, which trades a bit more of ordering for
+ * better concurrency by maintaining 8 sub {@link ApproximatePriorityQueue}s that are locked
+ * independently.
+ */
+final class ConcurrentApproximatePriorityQueue<T> {
+
+ /** Keeping 8 queues should already help a lot compared to a single one. */
+ static final int CONCURRENCY = 8;
+
+ private static final int MASK = 0x07;
+
+ final Lock[] locks;
+ final ApproximatePriorityQueue<T>[] queues;
+
+ ConcurrentApproximatePriorityQueue() {
+ locks = new Lock[CONCURRENCY];
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ ApproximatePriorityQueue<T>[] queues = new ApproximatePriorityQueue[CONCURRENCY];
+ this.queues = queues;
+ for (int i = 0; i < CONCURRENCY; ++i) {
+ locks[i] = new ReentrantLock();
+ queues[i] = new ApproximatePriorityQueue<>();
+ }
+ }
+
+ void add(T entry, long weight) {
+ // Seed the order in which to look at entries based on the current thread. This helps distribute
+ // entries across queues and gives a bit of thread affinity between entries and threads, which
+ // can't hurt.
+ final int threadHash = Thread.currentThread().hashCode();
+ for (int i = 0; i < CONCURRENCY; ++i) {
+ final int index = (threadHash + i) & MASK;
+ final Lock lock = locks[index];
+ final ApproximatePriorityQueue<T> queue = queues[index];
+ if (lock.tryLock()) {
+ try {
+ queue.add(entry, weight);
+ return;
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+ final int index = threadHash & MASK;
+ final Lock lock = locks[index];
+ final ApproximatePriorityQueue<T> queue = queues[index];
+ lock.lock();
+ try {
+ queue.add(entry, weight);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ T poll(Predicate<T> predicate) {
+ final int threadHash = Thread.currentThread().hashCode();
+ for (int i = 0; i < CONCURRENCY; ++i) {
+ final int index = (threadHash + i) & MASK;
+ final Lock lock = locks[index];
+ final ApproximatePriorityQueue<T> queue = queues[index];
+ if (lock.tryLock()) {
+ try {
+ T entry = queue.poll(predicate);
+ if (entry != null) {
+ return entry;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+ for (int i = 0; i < CONCURRENCY; ++i) {
+ final int index = (threadHash + i) & MASK;
+ final Lock lock = locks[index];
+ final ApproximatePriorityQueue<T> queue = queues[index];
+ lock.lock();
+ try {
+ T entry = queue.poll(predicate);
+ if (entry != null) {
+ return entry;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ return null;
+ }
+
+ // Only used for assertions
+ boolean contains(Object o) {
+ boolean assertionsAreEnabled = false;
+ assert assertionsAreEnabled = true;
+ if (assertionsAreEnabled == false) {
+ throw new AssertionError("contains should only be used for assertions");
+ }
+
+ for (int i = 0; i < CONCURRENCY; ++i) {
+ final Lock lock = locks[i];
+ final ApproximatePriorityQueue<T> queue = queues[i];
+ lock.lock();
+ try {
+ if (queue.contains(o)) {
+ return true;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ return false;
+ }
+
+ boolean remove(Object o) {
+ for (int i = 0; i < CONCURRENCY; ++i) {
+ final Lock lock = locks[i];
+ final ApproximatePriorityQueue<T> queue = queues[i];
+ lock.lock();
+ try {
+ if (queue.remove(o)) {
+ return true;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ return false;
+ }
+}
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
index f403f4c6ddc..df2d76b8879 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
@@ -44,11 +44,11 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
private final Set<DocumentsWriterPerThread> dwpts =
Collections.newSetFromMap(new IdentityHashMap<>());
- private final ApproximatePriorityQueue<DocumentsWriterPerThread> freeList =
- new ApproximatePriorityQueue<>();
+ private final ConcurrentApproximatePriorityQueue<DocumentsWriterPerThread> freeList =
+ new ConcurrentApproximatePriorityQueue<>();
private final Supplier<DocumentsWriterPerThread> dwptFactory;
private int takenWriterPermits = 0;
- private boolean closed;
+ private volatile boolean closed;
DocumentsWriterPerThreadPool(Supplier<DocumentsWriterPerThread> dwptFactory) {
this.dwptFactory = dwptFactory;
@@ -113,15 +113,16 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
* operation (add/updateDocument).
*/
DocumentsWriterPerThread getAndLock() {
- synchronized (this) {
- ensureOpen();
- DocumentsWriterPerThread dwpt = freeList.poll(DocumentsWriterPerThread::tryLock);
- if (dwpt == null) {
- dwpt = newWriter();
- }
- // DWPT is already locked before return by this method:
+ ensureOpen();
+ DocumentsWriterPerThread dwpt = freeList.poll(DocumentsWriterPerThread::tryLock);
+ if (dwpt != null) {
return dwpt;
}
+ // newWriter() adds the DWPT to the `dwpts` set as a side-effect. However it is not added to
+ // `freeList` at this point, it will be added later on once DocumentsWriter has indexed a
+ // document into this DWPT and then gives it back to the pool by calling
+ // #marksAsFreeAndUnlock.
+ return newWriter();
}
private void ensureOpen() {
@@ -130,13 +131,15 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
}
}
+ private synchronized boolean contains(DocumentsWriterPerThread state) {
+ return dwpts.contains(state);
+ }
+
void marksAsFreeAndUnlock(DocumentsWriterPerThread state) {
final long ramBytesUsed = state.ramBytesUsed();
- synchronized (this) {
- assert dwpts.contains(state)
- : "we tried to add a DWPT back to the pool but the pool doesn't know aobut this DWPT";
- freeList.add(state, ramBytesUsed);
- }
+ assert contains(state)
+ : "we tried to add a DWPT back to the pool but the pool doesn't know about this DWPT";
+ freeList.add(state, ramBytesUsed);
state.unlock();
}
@@ -175,6 +178,9 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
* @return <code>true</code> iff the given DWPT has been removed. Otherwise <code>false</code>
*/
synchronized boolean checkout(DocumentsWriterPerThread perThread) {
+ // The DWPT must be held by the current thread. This guarantees that concurrent calls to
+ // #getAndLock cannot pull this DWPT out of the pool since #getAndLock does a DWPT#tryLock to
+ // check if the DWPT is available.
assert perThread.isHeldByCurrentThread();
if (dwpts.remove(perThread)) {
freeList.remove(perThread);
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java
new file mode 100644
index 00000000000..49584d9e697
--- /dev/null
+++ b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.index;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.lucene.tests.util.LuceneTestCase;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+public class TestConcurrentApproximatePriorityQueue extends LuceneTestCase {
+
+ public void testPollFromSameThread() {
+ ConcurrentApproximatePriorityQueue<Integer> pq = new ConcurrentApproximatePriorityQueue<>();
+ pq.add(3, 3);
+ pq.add(10, 10);
+ pq.add(7, 7);
+ assertEquals(Integer.valueOf(10), pq.poll(x -> true));
+ assertEquals(Integer.valueOf(7), pq.poll(x -> true));
+ assertEquals(Integer.valueOf(3), pq.poll(x -> true));
+ assertNull(pq.poll(x -> true));
+ }
+
+ public void testPollFromDifferentThread() throws Exception {
+ ConcurrentApproximatePriorityQueue<Integer> pq = new ConcurrentApproximatePriorityQueue<>();
+ pq.add(3, 3);
+ pq.add(10, 10);
+ pq.add(7, 7);
+ Thread t =
+ new Thread() {
+ @Override
+ public void run() {
+ assertEquals(Integer.valueOf(10), pq.poll(x -> true));
+ assertEquals(Integer.valueOf(7), pq.poll(x -> true));
+ assertEquals(Integer.valueOf(3), pq.poll(x -> true));
+ assertNull(pq.poll(x -> true));
+ }
+ };
+ t.start();
+ t.join();
+ }
+
+ public void testCurrentLockIsBusy() throws Exception {
+ ConcurrentApproximatePriorityQueue<Integer> pq = new ConcurrentApproximatePriorityQueue<>();
+ pq.add(3, 3);
+ CountDownLatch takeLock = new CountDownLatch(1);
+ CountDownLatch releaseLock = new CountDownLatch(1);
+ Thread t =
+ new Thread() {
+ @Override
+ public void run() {
+ int queueIndex = -1;
+ for (int i = 0; i < pq.queues.length; ++i) {
+ if (pq.queues[i].isEmpty() == false) {
+ queueIndex = i;
+ break;
+ }
+ }
+ assertTrue(pq.locks[queueIndex].tryLock());
+ takeLock.countDown();
+ try {
+ releaseLock.await();
+ } catch (InterruptedException e) {
+ throw new ThreadInterruptedException(e);
+ }
+ pq.locks[queueIndex].unlock();
+ }
+ };
+ t.start();
+ takeLock.await();
+ pq.add(1, 1); // The lock is taken so this needs to go to a different queue
+ assertEquals(Integer.valueOf(1), pq.poll(x -> true));
+ releaseLock.countDown();
+ assertEquals(Integer.valueOf(3), pq.poll(x -> true));
+ assertNull(pq.poll(x -> true));
+ }
+}