You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/12/11 18:52:22 UTC

[03/21] lucene-solr:jira/solr-11285-sim: LUCENE-8081: Allow IndexWriter to opt out of flushing on indexing threads

LUCENE-8081: Allow IndexWriter to opt out of flushing on indexing threads

Index/Update Threads try to help out flushing pending document buffers to
disk. This change adds an expert setting to opt out of this behavior unless
flushing is falling behind.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/ede46fe6
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/ede46fe6
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/ede46fe6

Branch: refs/heads/jira/solr-11285-sim
Commit: ede46fe6e972811ca49635d07106f177a7d90d30
Parents: a314145
Author: Simon Willnauer <si...@apache.org>
Authored: Wed Dec 6 18:20:48 2017 +0100
Committer: Simon Willnauer <si...@apache.org>
Committed: Thu Dec 7 16:22:52 2017 +0100

----------------------------------------------------------------------
 lucene/CHANGES.txt                              |  7 ++
 .../apache/lucene/index/DocumentsWriter.java    |  5 +-
 .../apache/lucene/index/IndexWriterConfig.java  |  5 ++
 .../lucene/index/LiveIndexWriterConfig.java     | 27 ++++++
 .../apache/lucene/index/TestIndexWriter.java    | 87 ++++++++++++++++++++
 .../lucene/index/TestIndexWriterConfig.java     |  2 +
 .../org/apache/lucene/util/LuceneTestCase.java  |  4 +
 7 files changed, 135 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ede46fe6/lucene/CHANGES.txt
----------------------------------------------------------------------
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index bb84357..21da195 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -59,6 +59,13 @@ API Changes
 * LUCENE-8051: LevensteinDistance renamed to LevenshteinDistance.
   (Pulak Ghosh via Adrien Grand)
 
+Improvements
+
+* LUCENE-8081: Allow IndexWriter to opt out of flushing on indexing threads
+  Index/Update Threads try to help out flushing pending document buffers to
+  disk. This change adds an expert setting to opt out of this behavior unless
+  flushing is falling behind. (Simon Willnauer)
+
 ======================= Lucene 7.2.0 =======================
 
 API Changes

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ede46fe6/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index d4e4e23..7ad4feb 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -392,7 +392,8 @@ final class DocumentsWriter implements Closeable, Accountable {
   private boolean preUpdate() throws IOException, AbortingException {
     ensureOpen();
     boolean hasEvents = false;
-    if (flushControl.anyStalledThreads() || flushControl.numQueuedFlushes() > 0) {
+
+    if (flushControl.anyStalledThreads() || (flushControl.numQueuedFlushes() > 0 && config.checkPendingFlushOnUpdate)) {
       // Help out flushing any queued DWPTs so we can un-stall:
       do {
         // Try pick up pending threads here if possible
@@ -412,7 +413,7 @@ final class DocumentsWriter implements Closeable, Accountable {
     hasEvents |= applyAllDeletes(deleteQueue);
     if (flushingDWPT != null) {
       hasEvents |= doFlush(flushingDWPT);
-    } else {
+    } else if (config.checkPendingFlushOnUpdate) {
       final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush();
       if (nextPendingFlush != null) {
         hasEvents |= doFlush(nextPendingFlush);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ede46fe6/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
index 6e96322..997a686 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
@@ -479,5 +479,10 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
     sb.append("writer=").append(writer.get()).append("\n");
     return sb.toString();
   }
+
+  @Override
+  public IndexWriterConfig setCheckPendingFlushUpdate(boolean checkPendingFlushOnUpdate) {
+    return (IndexWriterConfig) super.setCheckPendingFlushUpdate(checkPendingFlushOnUpdate);
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ede46fe6/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
index b67d26b..1be6a73 100644
--- a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
@@ -103,6 +103,9 @@ public class LiveIndexWriterConfig {
   /** The field names involved in the index sort */
   protected Set<String> indexSortFields = Collections.emptySet();
 
+  /** if an indexing thread should check for pending flushes on update in order to help out on a full flush*/
+  protected volatile boolean checkPendingFlushOnUpdate = true;
+
   // used by IndexWriterConfig
   LiveIndexWriterConfig(Analyzer analyzer) {
     this.analyzer = analyzer;
@@ -426,6 +429,29 @@ public class LiveIndexWriterConfig {
     return indexSortFields;
   }
 
+  /**
+   * Expert: Returns if indexing threads check for pending flushes on update in order
+   * to help out flushing indexing buffers to disk
+   * @lucene.experimental
+   */
+  public boolean isCheckPendingFlushOnUpdate() {
+    return checkPendingFlushOnUpdate;
+  }
+
+  /**
+   * Expert: sets if indexing threads check for pending flushes on update in order
+   * to help out flushing indexing buffers to disk. If set to false, threads calling
+   * {@link DirectoryReader#openIfChanged(DirectoryReader, IndexWriter)} or {@link IndexWriter#flush()} will
+   * be the only threads writing segments to disk unless flushes are falling behind. If indexing is stalled
+   * due to too many pending flushes, indexing threads will help out writing pending segment flushes to disk.
+   *
+   * @lucene.experimental
+   */
+  public LiveIndexWriterConfig setCheckPendingFlushUpdate(boolean checkPendingFlushOnUpdate) {
+    this.checkPendingFlushOnUpdate = checkPendingFlushOnUpdate;
+    return this;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
@@ -448,6 +474,7 @@ public class LiveIndexWriterConfig {
     sb.append("useCompoundFile=").append(getUseCompoundFile()).append("\n");
     sb.append("commitOnClose=").append(getCommitOnClose()).append("\n");
     sb.append("indexSort=").append(getIndexSort()).append("\n");
+    sb.append("checkPendingFlushOnUpdate=").append(isCheckPendingFlushOnUpdate()).append("\n");
     return sb.toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ede46fe6/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
index 04460cd..76a8172 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
@@ -29,6 +29,7 @@ import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -37,6 +38,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.CannedTokenStream;
@@ -2877,4 +2879,89 @@ public class TestIndexWriter extends LuceneTestCase {
     dir.close();
   }
 
+  public void testCheckPendingFlushPostUpdate() throws IOException, InterruptedException {
+    MockDirectoryWrapper dir = newMockDirectory();
+    Set<String> flushingThreads = Collections.synchronizedSet(new HashSet<>());
+    dir.failOn(new MockDirectoryWrapper.Failure() {
+      @Override
+      public void eval(MockDirectoryWrapper dir) throws IOException {
+        StackTraceElement[] trace = new Exception().getStackTrace();
+        for (int i = 0; i < trace.length; i++) {
+          if ("flush".equals(trace[i].getMethodName())
+              && "org.apache.lucene.index.DocumentsWriterPerThread".equals(trace[i].getClassName())) {
+            flushingThreads.add(Thread.currentThread().getName());
+            break;
+          }
+        }
+      }
+    });
+    IndexWriter w = new IndexWriter(dir, new IndexWriterConfig()
+        .setCheckPendingFlushUpdate(false)
+        .setMaxBufferedDocs(Integer.MAX_VALUE)
+        .setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH));
+    AtomicBoolean done = new AtomicBoolean(false);
+    int numThreads = 1 + random().nextInt(3);
+    CountDownLatch latch = new CountDownLatch(numThreads);
+    Set<String> indexingThreads = new HashSet<>();
+    Thread[] threads = new Thread[numThreads];
+    for (int i = 0; i < numThreads; i++) {
+      threads[i] = new Thread(() -> {
+        latch.countDown();
+        int numDocs = 0;
+        while (done.get() == false) {
+
+          Document doc = new Document();
+          doc.add(new StringField("id", "foo", Field.Store.YES));
+          try {
+            w.addDocument(doc);
+          } catch (Exception e) {
+            throw new AssertionError(e);
+          }
+          if (numDocs++ % 10 == 0) {
+            Thread.yield();
+          }
+        }
+      });
+      indexingThreads.add(threads[i].getName());
+      threads[i].start();
+    }
+    latch.await();
+    try {
+      int numIters = rarely() ? 1 + random().nextInt(5) : 1;
+      for (int i = 0; i < numIters; i++) {
+        waitForDocs(w);
+        w.commit();
+        assertTrue(flushingThreads.toString(), flushingThreads.contains(Thread.currentThread().getName()));
+        flushingThreads.retainAll(indexingThreads);
+        assertTrue(flushingThreads.toString(), flushingThreads.isEmpty());
+      }
+      w.getConfig().setCheckPendingFlushUpdate(true);
+      numIters = 0;
+      while (true) {
+        assertFalse("should finish in less than 100 iterations", numIters++ >= 100);
+        waitForDocs(w);
+        w.flush();
+        flushingThreads.retainAll(indexingThreads);
+        if (flushingThreads.isEmpty() == false) {
+          break;
+        }
+      }
+    } finally {
+      done.set(true);
+      for (int i = 0; i < numThreads; i++) {
+        threads[i].join();
+      }
+      IOUtils.close(w, dir);
+    }
+  }
+
+  private static void waitForDocs(IndexWriter w) {
+    int numDocsInRam = w.numRamDocs();
+    while(true) {
+      if (numDocsInRam != w.numRamDocs()) {
+        return;
+      }
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ede46fe6/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
index 464966a..063045e 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
@@ -74,6 +74,7 @@ public class TestIndexWriterConfig extends LuceneTestCase {
     assertEquals(Codec.getDefault(), conf.getCodec());
     assertEquals(InfoStream.getDefault(), conf.getInfoStream());
     assertEquals(IndexWriterConfig.DEFAULT_USE_COMPOUND_FILE_SYSTEM, conf.getUseCompoundFile());
+    assertTrue(conf.isCheckPendingFlushOnUpdate());
     // Sanity check - validate that all getters are covered.
     Set<String> getters = new HashSet<>();
     getters.add("getAnalyzer");
@@ -98,6 +99,7 @@ public class TestIndexWriterConfig extends LuceneTestCase {
     getters.add("getCodec");
     getters.add("getInfoStream");
     getters.add("getUseCompoundFile");
+    getters.add("isCheckPendingFlushOnUpdate");
     
     for (Method m : IndexWriterConfig.class.getDeclaredMethods()) {
       if (m.getDeclaringClass() == IndexWriterConfig.class && m.getName().startsWith("get")) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ede46fe6/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
----------------------------------------------------------------------
diff --git a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
index d7b913c..ac48aa6 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
@@ -141,6 +141,7 @@ import com.carrotsearch.randomizedtesting.rules.StaticFieldsInvariantRule;
 
 import junit.framework.AssertionFailedError;
 
+import static com.carrotsearch.randomizedtesting.RandomizedTest.frequently;
 import static com.carrotsearch.randomizedtesting.RandomizedTest.systemPropertyAsBoolean;
 import static com.carrotsearch.randomizedtesting.RandomizedTest.systemPropertyAsInt;
 import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
@@ -988,6 +989,9 @@ public abstract class LuceneTestCase extends Assert {
     }
     c.setUseCompoundFile(r.nextBoolean());
     c.setReaderPooling(r.nextBoolean());
+    if (rarely(r)) {
+      c.setCheckPendingFlushUpdate(false);
+    }
     return c;
   }