You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2019/01/16 15:43:09 UTC
[lucene-solr] branch branch_7x updated: LUCENE-8639: Prevent new
threadstates from being created while we cut over to a new delete queue
(#535)
This is an automated email from the ASF dual-hosted git repository.
simonw pushed a commit to branch branch_7x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/branch_7x by this push:
new 0d306b4 LUCENE-8639: Prevent new threadstates from being created while we cut over to a new delete queue (#535)
0d306b4 is described below
commit 0d306b40b77ba85d1602c60621e93c5b190b6ef9
Author: Simon Willnauer <si...@apache.org>
AuthorDate: Wed Jan 16 16:37:49 2019 +0100
LUCENE-8639: Prevent new threadstates from being created while we cut over to a new delete queue (#535)
This prevents an edge case where many threads suddenly start indexing
while we carry over sequence IDs from the previous delete queue to the new one, which could cause duplicated sequence IDs.
We now block the creation of new thread states for a very short time, until we have created and assigned
a new delete queue.
---
lucene/CHANGES.txt | 3 ++
.../org/apache/lucene/index/DocumentsWriter.java | 4 +--
.../lucene/index/DocumentsWriterFlushControl.java | 21 +++++++------
.../lucene/index/DocumentsWriterPerThreadPool.java | 30 ++++++++++++-------
.../apache/lucene/index/FrozenBufferedUpdates.java | 1 +
.../lucene/index/NumericDocValuesFieldUpdates.java | 6 ++--
.../org/apache/lucene/index/TestIndexWriter.java | 34 ++++++++++++++++++++++
7 files changed, 74 insertions(+), 25 deletions(-)
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 5dd4317..d9a164c 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -28,6 +28,9 @@ Bug fixes:
* LUCENE-8625: int overflow in ByteBuffersDataInput.sliceBufferList. (Mulugeta Mammo,
Dawid Weiss)
+* LUCENE-8639: Newly created threadstates while flushing / refreshing can cause duplicated
+ sequence IDs on IndexWriter. (Simon Willnauer)
+
New Features
* LUCENE-8026: ExitableDirectoryReader may now time out queries that run on
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index b448351..6b72333 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -279,7 +279,7 @@ final class DocumentsWriter implements Closeable, Accountable {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "unlockAllAbortedThread");
}
- perThreadPool.clearAbort();
+ perThreadPool.unlockNewThreadStates();
for (ThreadState state : threadStates) {
state.unlock();
}
@@ -288,7 +288,7 @@ final class DocumentsWriter implements Closeable, Accountable {
try {
deleteQueue.clear();
final int limit = perThreadPool.getMaxThreadStates();
- perThreadPool.setAbort();
+ perThreadPool.lockNewThreadStates();
for (int i = 0; i < limit; i++) {
final ThreadState perThread = perThreadPool.getThreadState(i);
perThread.lock();
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index 5b6f8af..dd339f2 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -459,8 +459,7 @@ final class DocumentsWriterFlushControl implements Accountable {
}
ThreadState obtainAndLock() {
- final ThreadState perThread = perThreadPool.getAndLock(Thread
- .currentThread(), documentsWriter);
+ final ThreadState perThread = perThreadPool.getAndLock();
boolean success = false;
try {
if (perThread.isInitialized() && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
@@ -490,14 +489,18 @@ final class DocumentsWriterFlushControl implements Accountable {
// Set a new delete queue - all subsequent DWPT will use this queue until
// we do another full flush
- // Insert a gap in seqNo of current active thread count, in the worst case each of those threads now have one operation in flight. It's fine
- // if we have some sequence numbers that were never assigned:
- seqNo = documentsWriter.deleteQueue.getLastSequenceNumber() + perThreadPool.getActiveThreadStateCount() + 2;
- flushingQueue.maxSeqNo = seqNo+1;
-
- DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(infoStream, flushingQueue.generation+1, seqNo+1);
+ perThreadPool.lockNewThreadStates(); // no new thread-states while we do a flush otherwise the seqNo accounting might be off
+ try {
+ // Insert a gap in seqNo of current active thread count, in the worst case each of those threads now have one operation in flight. It's fine
+ // if we have some sequence numbers that were never assigned:
+ seqNo = documentsWriter.deleteQueue.getLastSequenceNumber() + perThreadPool.getActiveThreadStateCount() + 2;
+ flushingQueue.maxSeqNo = seqNo + 1;
+ DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(infoStream, flushingQueue.generation + 1, seqNo + 1);
+ documentsWriter.deleteQueue = newQueue;
- documentsWriter.deleteQueue = newQueue;
+ } finally {
+ perThreadPool.unlockNewThreadStates();
+ }
}
final int limit = perThreadPool.getActiveThreadStateCount();
for (int i = 0; i < limit; i++) {
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
index 102628f..7fafd41 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
@@ -111,7 +111,7 @@ final class DocumentsWriterPerThreadPool {
private final List<ThreadState> freeList = new ArrayList<>();
- private boolean aborted;
+ private int takenThreadStatePermits = 0;
/**
* Returns the active number of {@link ThreadState} instances.
@@ -120,15 +120,21 @@ final class DocumentsWriterPerThreadPool {
return threadStates.size();
}
- synchronized void setAbort() {
- aborted = true;
+ synchronized void lockNewThreadStates() {
+ // this is similar to a semaphore - we need to acquire all permits ie. takenThreadStatePermits must be == 0
+ // any call to lockNewThreadStates() must be followed by unlockNewThreadStates() otherwise we will deadlock at some
+ // point
+ assert takenThreadStatePermits >= 0;
+ takenThreadStatePermits++;
}
- synchronized void clearAbort() {
- aborted = false;
- notifyAll();
+ synchronized void unlockNewThreadStates() {
+ assert takenThreadStatePermits > 0;
+ takenThreadStatePermits--;
+ if (takenThreadStatePermits == 0) {
+ notifyAll();
+ }
}
-
/**
* Returns a new {@link ThreadState} iff any new state is available otherwise
* <code>null</code>.
@@ -140,18 +146,20 @@ final class DocumentsWriterPerThreadPool {
* <code>null</code>
*/
private synchronized ThreadState newThreadState() {
- while (aborted) {
+ assert takenThreadStatePermits >= 0;
+ while (takenThreadStatePermits > 0) {
+ // we can't create new thread-states while not all permits are available
try {
wait();
} catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
+ throw new ThreadInterruptedException(ie);
}
}
ThreadState threadState = new ThreadState(null);
threadState.lock(); // lock so nobody else will get this ThreadState
threadStates.add(threadState);
return threadState;
- }
+}
DocumentsWriterPerThread reset(ThreadState threadState) {
assert threadState.isHeldByCurrentThread();
@@ -168,7 +176,7 @@ final class DocumentsWriterPerThreadPool {
// of items (docs, deletes, DV updates) to most take advantage of concurrency while flushing
/** This method is used by DocumentsWriter/FlushControl to obtain a ThreadState to do an indexing operation (add/updateDocument). */
- ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter) {
+ ThreadState getAndLock() {
ThreadState threadState = null;
synchronized (this) {
if (freeList.isEmpty()) {
diff --git a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java
index c930a0b..121d0a4 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java
@@ -244,6 +244,7 @@ final class FrozenBufferedUpdates {
AtomicBoolean success = new AtomicBoolean();
long delCount;
try (Closeable finalizer = () -> finishApply(writer, segStates, success.get(), delFiles)) {
+ assert finalizer != null; // access the finalizer to prevent a warning
// don't hold IW monitor lock here so threads are free concurrently resolve deletes/updates:
delCount = apply(segStates);
success.set(true);
diff --git a/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java b/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java
index 550a86a..0ad4469 100644
--- a/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java
@@ -34,11 +34,11 @@ import org.apache.lucene.util.packed.PagedMutable;
final class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
// TODO: can't this just be NumericDocValues now? avoid boxing the long value...
final static class Iterator extends DocValuesFieldUpdates.AbstractIterator {
- private final AbstractPagedMutable values;
+ private final AbstractPagedMutable<?> values;
private final long minValue;
private long value;
- Iterator(int size, long minValue, AbstractPagedMutable values, PagedMutable docs, long delGen) {
+ Iterator(int size, long minValue, AbstractPagedMutable<?> values, PagedMutable docs, long delGen) {
super(size, docs, delGen);
this.values = values;
this.minValue = minValue;
@@ -58,7 +58,7 @@ final class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
value = values.get(idx) + minValue;
}
}
- private AbstractPagedMutable values;
+ private AbstractPagedMutable<?> values;
private final long minValue;
NumericDocValuesFieldUpdates(long delGen, String field, int maxDoc) {
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
index 3bdf200..b877824 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
@@ -3680,4 +3680,38 @@ public class TestIndexWriter extends LuceneTestCase {
}
}
+ // see LUCENE-8639
+ public void testFlushWhileStartingNewThreads() throws IOException, InterruptedException {
+ Directory dir = newDirectory();
+ IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());
+ w.addDocument(new Document());
+ int activeThreadStateCount = w.docWriter.perThreadPool.getActiveThreadStateCount();
+ assertEquals(1, activeThreadStateCount);
+ CountDownLatch latch = new CountDownLatch(1);
+ Thread thread = new Thread(() -> {
+ latch.countDown();
+ List<Closeable> states = new ArrayList<>();
+ try {
+ for (int i = 0; i < 100; i++) {
+ DocumentsWriterPerThreadPool.ThreadState state = w.docWriter.perThreadPool.getAndLock();
+ states.add(state::unlock);
+ if (state.isInitialized()) {
+ state.dwpt.deleteQueue.getNextSequenceNumber();
+ } else {
+ w.docWriter.deleteQueue.getNextSequenceNumber();
+ }
+ }
+ } finally {
+ IOUtils.closeWhileHandlingException(states);
+ }
+ });
+ thread.start();
+ latch.await();
+ w.docWriter.flushControl.markForFullFlush();
+ thread.join();
+ w.docWriter.flushControl.abortFullFlushes();
+ w.close();
+ dir.close();
+ }
+
}