You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/12/04 17:49:19 UTC

[35/50] lucene-solr:jira/solr-11458-2: LUCENE-8068: Allow IndexWriter to write a single DWPT to disk

LUCENE-8068: Allow IndexWriter to write a single DWPT to disk

Adds a `flushNextBuffer` method to IndexWriter that allows the caller to
synchronously move the next pending or the biggest non-pending index buffer to
disk. This enables flushing selected buffer to disk without highjacking an
indexing thread. This is for instance useful if more than one IW (shards) must
be maintained in a single JVM / system.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/01d12777
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/01d12777
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/01d12777

Branch: refs/heads/jira/solr-11458-2
Commit: 01d12777c4bcab7ae8085d5ed5e1b20a0e1a5526
Parents: ebdaa44
Author: Simon Willnauer <si...@apache.org>
Authored: Mon Nov 27 22:39:11 2017 +0100
Committer: Simon Willnauer <si...@apache.org>
Committed: Thu Nov 30 18:57:27 2017 +0100

----------------------------------------------------------------------
 lucene/CHANGES.txt                              |   7 +
 .../apache/lucene/index/DocumentsWriter.java    |  15 +++
 .../index/DocumentsWriterFlushControl.java      |  88 ++++++++++---
 .../org/apache/lucene/index/FlushPolicy.java    |  25 +---
 .../org/apache/lucene/index/IndexWriter.java    |  25 ++++
 .../apache/lucene/index/TestIndexWriter.java    | 129 +++++++++++++++++++
 .../apache/lucene/index/RandomIndexWriter.java  |  11 ++
 7 files changed, 260 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/01d12777/lucene/CHANGES.txt
----------------------------------------------------------------------
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 6b4aa9c..5943185 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -85,6 +85,13 @@ New Features
 * LUCENE-7736: IndexReaderFunctions expose various IndexReader statistics as
   DoubleValuesSources. (Alan Woodward)
 
+* LUCENE-8068: Allow IndexWriter to write a single DWPT to disk Adds a
+  flushNextBuffer method to IndexWriter that allows the caller to
+  synchronously move the next pending or the biggest non-pending index buffer to
+  disk. This enables flushing selected buffer to disk without highjacking an
+  indexing thread. This is for instance useful if more than one IW (shards) must
+  be maintained in a single JVM / system. (Simon Willnauer)
+
 Bug Fixes
 
 * LUCENE-8057: Exact circle bounds computation was incorrect.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/01d12777/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index 0e2a067..6aca9f4 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -246,6 +246,21 @@ final class DocumentsWriter implements Closeable, Accountable {
     }
   }
 
+  final boolean flushOneDWPT() throws IOException, AbortingException {
+    if (infoStream.isEnabled("DW")) {
+      infoStream.message("DW", "startFlushOneDWPT");
+    }
+    // first check if there is one pending
+    DocumentsWriterPerThread documentsWriterPerThread = flushControl.nextPendingFlush();
+    if (documentsWriterPerThread == null) {
+      documentsWriterPerThread = flushControl.checkoutLargestNonPendingWriter();
+    }
+    if (documentsWriterPerThread != null) {
+      return doFlush(documentsWriterPerThread);
+    }
+    return false; // we didn't flush anything here
+  }
+
   /** Returns how many documents were aborted. */
   synchronized long lockAndAbortAll(IndexWriter indexWriter) throws IOException {
     assert indexWriter.holdsFullFlushLock();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/01d12777/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index 047fb9c..761db0e 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -182,23 +182,29 @@ final class DocumentsWriterFlushControl implements Accountable {
           setFlushPending(perThread);
         }
       }
-      final DocumentsWriterPerThread flushingDWPT;
-      if (fullFlush) {
-        if (perThread.flushPending) {
-          checkoutAndBlock(perThread);
-          flushingDWPT = nextPendingFlush();
-        } else {
-          flushingDWPT = null;
-        }
-      } else {
-        flushingDWPT = tryCheckoutForFlush(perThread);
-      }
-      return flushingDWPT;
+      return checkout(perThread, false);
     } finally {
       boolean stalled = updateStallState();
       assert assertNumDocsSinceStalled(stalled) && assertMemory();
     }
   }
+
+  private DocumentsWriterPerThread checkout(ThreadState perThread, boolean markPending) {
+    if (fullFlush) {
+      if (perThread.flushPending) {
+        checkoutAndBlock(perThread);
+        return nextPendingFlush();
+      } else {
+        return null;
+      }
+    } else {
+      if (markPending) {
+        assert perThread.isFlushPending() == false;
+        setFlushPending(perThread);
+      }
+      return tryCheckoutForFlush(perThread);
+    }
+  }
   
   private boolean assertNumDocsSinceStalled(boolean stalled) {
     /*
@@ -454,10 +460,6 @@ final class DocumentsWriterFlushControl implements Accountable {
     flushDeletes.set(true);
   }
   
-  int numActiveDWPT() {
-    return this.perThreadPool.getActiveThreadStateCount();
-  }
-  
   ThreadState obtainAndLock() {
     final ThreadState perThread = perThreadPool.getAndLock(Thread
         .currentThread(), documentsWriter);
@@ -713,4 +715,58 @@ final class DocumentsWriterFlushControl implements Accountable {
   public InfoStream getInfoStream() {
     return infoStream;
   }
+
+  ThreadState findLargestNonPendingWriter() {
+    ThreadState maxRamUsingThreadState = null;
+    long maxRamSoFar = 0;
+    Iterator<ThreadState> activePerThreadsIterator = allActiveThreadStates();
+    int count = 0;
+    while (activePerThreadsIterator.hasNext()) {
+      ThreadState next = activePerThreadsIterator.next();
+      if (!next.flushPending) {
+        final long nextRam = next.bytesUsed;
+        if (nextRam > 0 && next.dwpt.getNumDocsInRAM() > 0) {
+          if (infoStream.isEnabled("FP")) {
+            infoStream.message("FP", "thread state has " + nextRam + " bytes; docInRAM=" + next.dwpt.getNumDocsInRAM());
+          }
+          count++;
+          if (nextRam > maxRamSoFar) {
+            maxRamSoFar = nextRam;
+            maxRamUsingThreadState = next;
+          }
+        }
+      }
+    }
+    if (infoStream.isEnabled("FP")) {
+      infoStream.message("FP", count + " in-use non-flushing threads states");
+    }
+    return maxRamUsingThreadState;
+  }
+
+  /**
+   * Returns the largest non-pending flushable DWPT or <code>null</code> if there is none.
+   */
+  final DocumentsWriterPerThread checkoutLargestNonPendingWriter() {
+    ThreadState largestNonPendingWriter = findLargestNonPendingWriter();
+    if (largestNonPendingWriter != null) {
+      // we only lock this very briefly to swap it's DWPT out - we don't go through the DWPTPool and it's free queue
+      largestNonPendingWriter.lock();
+      try {
+        synchronized (this) {
+          try {
+            if (largestNonPendingWriter.isInitialized() == false) {
+              return nextPendingFlush();
+            } else {
+              return checkout(largestNonPendingWriter, largestNonPendingWriter.isFlushPending() == false);
+            }
+          } finally {
+            updateStallState();
+          }
+        }
+      } finally {
+        largestNonPendingWriter.unlock();
+      }
+    }
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/01d12777/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java b/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
index cad07b4..8fa9e63 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
@@ -104,31 +104,8 @@ abstract class FlushPolicy {
   protected ThreadState findLargestNonPendingWriter(
       DocumentsWriterFlushControl control, ThreadState perThreadState) {
     assert perThreadState.dwpt.getNumDocsInRAM() > 0;
-    long maxRamSoFar = perThreadState.bytesUsed;
     // the dwpt which needs to be flushed eventually
-    ThreadState maxRamUsingThreadState = perThreadState;
-    assert !perThreadState.flushPending : "DWPT should have flushed";
-    Iterator<ThreadState> activePerThreadsIterator = control.allActiveThreadStates();
-    int count = 0;
-    while (activePerThreadsIterator.hasNext()) {
-      ThreadState next = activePerThreadsIterator.next();
-      if (!next.flushPending) {
-        final long nextRam = next.bytesUsed;
-        if (nextRam > 0 && next.dwpt.getNumDocsInRAM() > 0) {
-          if (infoStream.isEnabled("FP")) {
-            infoStream.message("FP", "thread state has " + nextRam + " bytes; docInRAM=" + next.dwpt.getNumDocsInRAM());
-          }
-          count++;
-          if (nextRam > maxRamSoFar) {
-            maxRamSoFar = nextRam;
-            maxRamUsingThreadState = next;
-          }
-        }
-      }
-    }
-    if (infoStream.isEnabled("FP")) {
-      infoStream.message("FP", count + " in-use non-flushing threads states");
-    }
+    ThreadState maxRamUsingThreadState = control.findLargestNonPendingWriter();
     assert assertMessage("set largest ram consuming thread pending on lower watermark");
     return maxRamUsingThreadState;
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/01d12777/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 7f47e42..6059218 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -3163,6 +3163,31 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     return pendingSeqNo;
   }
 
+  /**
+   * <p>Expert: Flushes the next pending writer per thread buffer if available or the largest active
+   * non-pending writer per thread buffer in the calling thread.
+   * This can be used to flush documents to disk outside of an indexing thread. In contrast to {@link #flush()}
+   * this won't mark all currently active indexing buffers as flush-pending.
+   *
+   * Note: this method is best-effort and might not flush any segments to disk. If there is a full flush happening
+   * concurrently multiple segments might have been flushed.
+   * Users of this API can access the IndexWriters current memory consumption via {@link #ramBytesUsed()}
+   * </p>
+   * @return <code>true</code> iff this method flushed at least on segment to disk.
+   * @lucene.experimental
+   */
+  public final boolean flushNextBuffer() throws IOException {
+    try {
+      if (docWriter.flushOneDWPT()) {
+        processEvents(true, false);
+        return true; // we wrote a segment
+      }
+    } catch (AbortingException | VirtualMachineError tragedy) {
+      tragicEvent(tragedy, "flushNextBuffer");
+    }
+    return false;
+  }
+
   private long prepareCommitInternal() throws IOException {
     startCommitTime = System.nanoTime();
     synchronized(commitLock) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/01d12777/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
index 9538e03..04460cd 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
@@ -2748,4 +2748,133 @@ public class TestIndexWriter extends LuceneTestCase {
     dir.close();
   }
 
+  public void testFlushLargestWriter() throws IOException, InterruptedException {
+    Directory dir = newDirectory();
+    IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());
+    int numDocs = indexDocsForMultipleThreadStates(w);
+    DocumentsWriterPerThreadPool.ThreadState largestNonPendingWriter
+        = w.docWriter.flushControl.findLargestNonPendingWriter();
+    assertFalse(largestNonPendingWriter.flushPending);
+    assertNotNull(largestNonPendingWriter.dwpt);
+
+    int numRamDocs = w.numRamDocs();
+    int numDocsInDWPT = largestNonPendingWriter.dwpt.getNumDocsInRAM();
+    assertTrue(w.flushNextBuffer());
+    assertNull(largestNonPendingWriter.dwpt);
+    assertEquals(numRamDocs-numDocsInDWPT, w.numRamDocs());
+
+    // make sure it's not locked
+    largestNonPendingWriter.lock();
+    largestNonPendingWriter.unlock();
+    if (random().nextBoolean()) {
+      w.commit();
+    }
+    DirectoryReader reader = DirectoryReader.open(w, true, true);
+    assertEquals(numDocs, reader.numDocs());
+    reader.close();
+    w.close();
+    dir.close();
+  }
+
+  private int indexDocsForMultipleThreadStates(IndexWriter w) throws InterruptedException {
+    Thread[] threads = new Thread[3];
+    CountDownLatch latch = new CountDownLatch(threads.length);
+    int numDocsPerThread = 10 + random().nextInt(30);
+    // ensure we have more than on thread state
+    for (int i = 0; i < threads.length; i++) {
+      threads[i] = new Thread(() -> {
+        latch.countDown();
+        try {
+          latch.await();
+          for (int j = 0; j < numDocsPerThread; j++) {
+            Document doc = new Document();
+            doc.add(new StringField("id", "foo", Field.Store.YES));
+            w.addDocument(doc);
+          }
+        } catch (Exception e) {
+          throw new AssertionError(e);
+        }
+      });
+      threads[i].start();
+    }
+    for (Thread t : threads) {
+      t.join();
+    }
+    return numDocsPerThread * threads.length;
+  }
+
+  public void testNeverCheckOutOnFullFlush() throws IOException, InterruptedException {
+    Directory dir = newDirectory();
+    IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());
+    indexDocsForMultipleThreadStates(w);
+    DocumentsWriterPerThreadPool.ThreadState largestNonPendingWriter
+        = w.docWriter.flushControl.findLargestNonPendingWriter();
+    assertFalse(largestNonPendingWriter.flushPending);
+    assertNotNull(largestNonPendingWriter.dwpt);
+    int activeThreadStateCount = w.docWriter.perThreadPool.getActiveThreadStateCount();
+    w.docWriter.flushControl.markForFullFlush();
+    DocumentsWriterPerThread documentsWriterPerThread = w.docWriter.flushControl.checkoutLargestNonPendingWriter();
+    assertNull(documentsWriterPerThread);
+    assertEquals(activeThreadStateCount, w.docWriter.flushControl.numQueuedFlushes());
+    w.docWriter.flushControl.abortFullFlushes();
+    assertNull("was aborted", w.docWriter.flushControl.checkoutLargestNonPendingWriter());
+    assertEquals(0, w.docWriter.flushControl.numQueuedFlushes());
+    w.close();
+    dir.close();
+  }
+
+  public void testHoldLockOnLargestWriter() throws IOException, InterruptedException {
+    Directory dir = newDirectory();
+    IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());
+    int numDocs = indexDocsForMultipleThreadStates(w);
+    DocumentsWriterPerThreadPool.ThreadState largestNonPendingWriter
+        = w.docWriter.flushControl.findLargestNonPendingWriter();
+    assertFalse(largestNonPendingWriter.flushPending);
+    assertNotNull(largestNonPendingWriter.dwpt);
+
+    CountDownLatch wait = new CountDownLatch(1);
+    CountDownLatch locked = new CountDownLatch(1);
+    Thread lockThread = new Thread(() -> {
+      try {
+        largestNonPendingWriter.lock();
+        locked.countDown();
+        wait.await();
+      } catch (InterruptedException e) {
+        throw new AssertionError(e);
+      } finally {
+        largestNonPendingWriter.unlock();
+      }
+    });
+    lockThread.start();
+    Thread flushThread = new Thread(() -> {
+      try {
+        locked.await();
+        assertTrue(w.flushNextBuffer());
+      } catch (Exception e) {
+        throw new AssertionError(e);
+      }
+    });
+    flushThread.start();
+
+    locked.await();
+    // access a synced method to ensure we never lock while we hold the flush control monitor
+    w.docWriter.flushControl.activeBytes();
+    wait.countDown();
+    lockThread.join();
+    flushThread.join();
+
+    assertNull("largest DWPT should be flushed", largestNonPendingWriter.dwpt);
+    // make sure it's not locked
+    largestNonPendingWriter.lock();
+    largestNonPendingWriter.unlock();
+    if (random().nextBoolean()) {
+      w.commit();
+    }
+    DirectoryReader reader = DirectoryReader.open(w, true, true);
+    assertEquals(numDocs, reader.numDocs());
+    reader.close();
+    w.close();
+    dir.close();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/01d12777/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java b/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java
index d46c248..aa4da54 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java
@@ -181,6 +181,17 @@ public class RandomIndexWriter implements Closeable {
     if (docCount++ == flushAt) {
       if (r.nextBoolean()) {
         if (LuceneTestCase.VERBOSE) {
+          System.out.println("RIW.add/updateDocument: now flushing the largest writer at docCount=" + docCount);
+        }
+        int activeThreadStateCount = w.docWriter.perThreadPool.getActiveThreadStateCount();
+        int numFlushes = Math.min(1, r.nextInt(activeThreadStateCount+1));
+        for (int i = 0; i < numFlushes; i++) {
+          if (w.flushNextBuffer() == false) {
+            break; // stop once we didn't flush anything
+          }
+        }
+      } else if (r.nextBoolean()) {
+        if (LuceneTestCase.VERBOSE) {
           System.out.println("RIW.add/updateDocument: now doing a flush at docCount=" + docCount);
         }
         w.flush();