You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2019/01/16 15:43:09 UTC

[lucene-solr] branch branch_7x updated: LUCENE-8639: Prevent new threadstates from being created while we cut over to a new delete queue (#535)

This is an automated email from the ASF dual-hosted git repository.

simonw pushed a commit to branch branch_7x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_7x by this push:
     new 0d306b4  LUCENE-8639: Prevent new threadstates from being created while we cut over to a new delete queue (#535)
0d306b4 is described below

commit 0d306b40b77ba85d1602c60621e93c5b190b6ef9
Author: Simon Willnauer <si...@apache.org>
AuthorDate: Wed Jan 16 16:37:49 2019 +0100

    LUCENE-8639: Prevent new threadstates from being created while we cut over to a new delete queue (#535)
    
    This prevents an edge case where many threads suddenly start indexing
    while we carry over sequence IDs from the previous delete queue to the new one.
    We now block the creation of new thread states for a very short time, until we have
    created and assigned the new delete queue.
---
 lucene/CHANGES.txt                                 |  3 ++
 .../org/apache/lucene/index/DocumentsWriter.java   |  4 +--
 .../lucene/index/DocumentsWriterFlushControl.java  | 21 +++++++------
 .../lucene/index/DocumentsWriterPerThreadPool.java | 30 ++++++++++++-------
 .../apache/lucene/index/FrozenBufferedUpdates.java |  1 +
 .../lucene/index/NumericDocValuesFieldUpdates.java |  6 ++--
 .../org/apache/lucene/index/TestIndexWriter.java   | 34 ++++++++++++++++++++++
 7 files changed, 74 insertions(+), 25 deletions(-)

diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 5dd4317..d9a164c 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -28,6 +28,9 @@ Bug fixes:
 * LUCENE-8625: int overflow in ByteBuffersDataInput.sliceBufferList. (Mulugeta Mammo,
   Dawid Weiss)
 
+* LUCENE-8639: Newly created threadstates while flushing / refreshing can cause duplicated
+  sequence IDs on IndexWriter. (Simon Willnauer)
+
 New Features
 
 * LUCENE-8026: ExitableDirectoryReader may now time out queries that run on
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index b448351..6b72333 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -279,7 +279,7 @@ final class DocumentsWriter implements Closeable, Accountable {
         if (infoStream.isEnabled("DW")) {
           infoStream.message("DW", "unlockAllAbortedThread");
         }
-        perThreadPool.clearAbort();
+        perThreadPool.unlockNewThreadStates();
         for (ThreadState state : threadStates) {
           state.unlock();
         }
@@ -288,7 +288,7 @@ final class DocumentsWriter implements Closeable, Accountable {
     try {
       deleteQueue.clear();
       final int limit = perThreadPool.getMaxThreadStates();
-      perThreadPool.setAbort();
+      perThreadPool.lockNewThreadStates();
       for (int i = 0; i < limit; i++) {
         final ThreadState perThread = perThreadPool.getThreadState(i);
         perThread.lock();
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index 5b6f8af..dd339f2 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -459,8 +459,7 @@ final class DocumentsWriterFlushControl implements Accountable {
   }
   
   ThreadState obtainAndLock() {
-    final ThreadState perThread = perThreadPool.getAndLock(Thread
-        .currentThread(), documentsWriter);
+    final ThreadState perThread = perThreadPool.getAndLock();
     boolean success = false;
     try {
       if (perThread.isInitialized() && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
@@ -490,14 +489,18 @@ final class DocumentsWriterFlushControl implements Accountable {
       // Set a new delete queue - all subsequent DWPT will use this queue until
       // we do another full flush
 
-      // Insert a gap in seqNo of current active thread count, in the worst case each of those threads now have one operation in flight.  It's fine
-      // if we have some sequence numbers that were never assigned:
-      seqNo = documentsWriter.deleteQueue.getLastSequenceNumber() + perThreadPool.getActiveThreadStateCount() + 2;
-      flushingQueue.maxSeqNo = seqNo+1;
-
-      DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(infoStream, flushingQueue.generation+1, seqNo+1);
+      perThreadPool.lockNewThreadStates(); // no new thread-states while we do a flush otherwise the seqNo accounting might be off
+      try {
+        // Insert a gap in seqNo of current active thread count, in the worst case each of those threads now have one operation in flight.  It's fine
+        // if we have some sequence numbers that were never assigned:
+        seqNo = documentsWriter.deleteQueue.getLastSequenceNumber() + perThreadPool.getActiveThreadStateCount() + 2;
+        flushingQueue.maxSeqNo = seqNo + 1;
+        DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(infoStream, flushingQueue.generation + 1, seqNo + 1);
+        documentsWriter.deleteQueue = newQueue;
 
-      documentsWriter.deleteQueue = newQueue;
+      } finally {
+        perThreadPool.unlockNewThreadStates();
+      }
     }
     final int limit = perThreadPool.getActiveThreadStateCount();
     for (int i = 0; i < limit; i++) {
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
index 102628f..7fafd41 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
@@ -111,7 +111,7 @@ final class DocumentsWriterPerThreadPool {
 
   private final List<ThreadState> freeList = new ArrayList<>();
 
-  private boolean aborted;
+  private int takenThreadStatePermits = 0;
 
   /**
    * Returns the active number of {@link ThreadState} instances.
@@ -120,15 +120,21 @@ final class DocumentsWriterPerThreadPool {
     return threadStates.size();
   }
 
-  synchronized void setAbort() {
-    aborted = true;
+  synchronized void lockNewThreadStates() {
+    // this is similar to a semaphore - we need to acquire all permits ie. takenThreadStatePermits must be == 0
+    // any call to lockNewThreadStates() must be followed by unlockNewThreadStates() otherwise we will deadlock at some
+    // point
+    assert takenThreadStatePermits >= 0;
+    takenThreadStatePermits++;
   }
 
-  synchronized void clearAbort() {
-    aborted = false;
-    notifyAll();
+  synchronized void unlockNewThreadStates() {
+    assert takenThreadStatePermits > 0;
+    takenThreadStatePermits--;
+    if (takenThreadStatePermits == 0) {
+      notifyAll();
+    }
   }
-
   /**
    * Returns a new {@link ThreadState} iff any new state is available otherwise
    * <code>null</code>.
@@ -140,18 +146,20 @@ final class DocumentsWriterPerThreadPool {
    *         <code>null</code>
    */
   private synchronized ThreadState newThreadState() {
-    while (aborted) {
+    assert takenThreadStatePermits >= 0;
+    while (takenThreadStatePermits > 0) {
+      // we can't create new thread-states while not all permits are available
       try {
         wait();
       } catch (InterruptedException ie) {
-        throw new ThreadInterruptedException(ie);        
+        throw new ThreadInterruptedException(ie);
       }
     }
     ThreadState threadState = new ThreadState(null);
     threadState.lock(); // lock so nobody else will get this ThreadState
     threadStates.add(threadState);
     return threadState;
-  }
+}
 
   DocumentsWriterPerThread reset(ThreadState threadState) {
     assert threadState.isHeldByCurrentThread();
@@ -168,7 +176,7 @@ final class DocumentsWriterPerThreadPool {
   // of items (docs, deletes, DV updates) to most take advantage of concurrency while flushing
 
   /** This method is used by DocumentsWriter/FlushControl to obtain a ThreadState to do an indexing operation (add/updateDocument). */
-  ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter) {
+  ThreadState getAndLock() {
     ThreadState threadState = null;
     synchronized (this) {
       if (freeList.isEmpty()) {
diff --git a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java
index c930a0b..121d0a4 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java
@@ -244,6 +244,7 @@ final class FrozenBufferedUpdates {
         AtomicBoolean success = new AtomicBoolean();
         long delCount;
         try (Closeable finalizer = () -> finishApply(writer, segStates, success.get(), delFiles)) {
+          assert finalizer != null; // access the finalizer to prevent a warning
           // don't hold IW monitor lock here so threads are free concurrently resolve deletes/updates:
           delCount = apply(segStates);
           success.set(true);
diff --git a/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java b/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java
index 550a86a..0ad4469 100644
--- a/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java
@@ -34,11 +34,11 @@ import org.apache.lucene.util.packed.PagedMutable;
 final class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
   // TODO: can't this just be NumericDocValues now?  avoid boxing the long value...
   final static class Iterator extends DocValuesFieldUpdates.AbstractIterator {
-    private final AbstractPagedMutable values;
+    private final AbstractPagedMutable<?> values;
     private final long minValue;
     private long value;
 
-    Iterator(int size, long minValue, AbstractPagedMutable values, PagedMutable docs, long delGen) {
+    Iterator(int size, long minValue, AbstractPagedMutable<?> values, PagedMutable docs, long delGen) {
       super(size, docs, delGen);
       this.values = values;
       this.minValue = minValue;
@@ -58,7 +58,7 @@ final class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
       value = values.get(idx) + minValue;
     }
   }
-  private AbstractPagedMutable values;
+  private AbstractPagedMutable<?> values;
   private final long minValue;
 
   NumericDocValuesFieldUpdates(long delGen, String field, int maxDoc) {
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
index 3bdf200..b877824 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
@@ -3680,4 +3680,38 @@ public class TestIndexWriter extends LuceneTestCase {
     }
   }
 
+  // see LUCENE-8639
+  public void testFlushWhileStartingNewThreads() throws IOException, InterruptedException {
+    Directory dir = newDirectory();
+    IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());
+    w.addDocument(new Document());
+    int activeThreadStateCount = w.docWriter.perThreadPool.getActiveThreadStateCount();
+    assertEquals(1, activeThreadStateCount);
+    CountDownLatch latch = new CountDownLatch(1);
+    Thread thread = new Thread(() -> {
+      latch.countDown();
+      List<Closeable> states = new ArrayList<>();
+      try {
+        for (int i = 0; i < 100; i++) {
+          DocumentsWriterPerThreadPool.ThreadState state = w.docWriter.perThreadPool.getAndLock();
+          states.add(state::unlock);
+          if (state.isInitialized()) {
+            state.dwpt.deleteQueue.getNextSequenceNumber();
+          } else {
+            w.docWriter.deleteQueue.getNextSequenceNumber();
+          }
+        }
+      } finally {
+        IOUtils.closeWhileHandlingException(states);
+      }
+    });
+    thread.start();
+    latch.await();
+    w.docWriter.flushControl.markForFullFlush();
+    thread.join();
+    w.docWriter.flushControl.abortFullFlushes();
+    w.close();
+    dir.close();
+  }
+
 }