You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/12/11 18:52:22 UTC
[03/21] lucene-solr:jira/solr-11285-sim: LUCENE-8081: Allow
IndexWriter to opt out of flushing on indexing threads
LUCENE-8081: Allow IndexWriter to opt out of flushing on indexing threads
Index/Update Threads try to help out flushing pending document buffers to
disk. This change adds an expert setting to opt out of this behavior unless
flushing is falling behind.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/ede46fe6
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/ede46fe6
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/ede46fe6
Branch: refs/heads/jira/solr-11285-sim
Commit: ede46fe6e972811ca49635d07106f177a7d90d30
Parents: a314145
Author: Simon Willnauer <si...@apache.org>
Authored: Wed Dec 6 18:20:48 2017 +0100
Committer: Simon Willnauer <si...@apache.org>
Committed: Thu Dec 7 16:22:52 2017 +0100
----------------------------------------------------------------------
lucene/CHANGES.txt | 7 ++
.../apache/lucene/index/DocumentsWriter.java | 5 +-
.../apache/lucene/index/IndexWriterConfig.java | 5 ++
.../lucene/index/LiveIndexWriterConfig.java | 27 ++++++
.../apache/lucene/index/TestIndexWriter.java | 87 ++++++++++++++++++++
.../lucene/index/TestIndexWriterConfig.java | 2 +
.../org/apache/lucene/util/LuceneTestCase.java | 4 +
7 files changed, 135 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ede46fe6/lucene/CHANGES.txt
----------------------------------------------------------------------
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index bb84357..21da195 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -59,6 +59,13 @@ API Changes
* LUCENE-8051: LevensteinDistance renamed to LevenshteinDistance.
(Pulak Ghosh via Adrien Grand)
+Improvements
+
+* LUCENE-8081: Allow IndexWriter to opt out of flushing on indexing threads
+ Index/Update Threads try to help out flushing pending document buffers to
+ disk. This change adds an expert setting to opt out of this behavior unless
+ flushing is falling behind. (Simon Willnauer)
+
======================= Lucene 7.2.0 =======================
API Changes
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ede46fe6/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index d4e4e23..7ad4feb 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -392,7 +392,8 @@ final class DocumentsWriter implements Closeable, Accountable {
private boolean preUpdate() throws IOException, AbortingException {
ensureOpen();
boolean hasEvents = false;
- if (flushControl.anyStalledThreads() || flushControl.numQueuedFlushes() > 0) {
+
+ if (flushControl.anyStalledThreads() || (flushControl.numQueuedFlushes() > 0 && config.checkPendingFlushOnUpdate)) {
// Help out flushing any queued DWPTs so we can un-stall:
do {
// Try pick up pending threads here if possible
@@ -412,7 +413,7 @@ final class DocumentsWriter implements Closeable, Accountable {
hasEvents |= applyAllDeletes(deleteQueue);
if (flushingDWPT != null) {
hasEvents |= doFlush(flushingDWPT);
- } else {
+ } else if (config.checkPendingFlushOnUpdate) {
final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush();
if (nextPendingFlush != null) {
hasEvents |= doFlush(nextPendingFlush);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ede46fe6/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
index 6e96322..997a686 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
@@ -479,5 +479,10 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
sb.append("writer=").append(writer.get()).append("\n");
return sb.toString();
}
+
+ @Override
+ public IndexWriterConfig setCheckPendingFlushUpdate(boolean checkPendingFlushOnUpdate) {
+ return (IndexWriterConfig) super.setCheckPendingFlushUpdate(checkPendingFlushOnUpdate);
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ede46fe6/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
index b67d26b..1be6a73 100644
--- a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
@@ -103,6 +103,9 @@ public class LiveIndexWriterConfig {
/** The field names involved in the index sort */
protected Set<String> indexSortFields = Collections.emptySet();
+ /** if an indexing thread should check for pending flushes on update in order to help out on a full flush*/
+ protected volatile boolean checkPendingFlushOnUpdate = true;
+
// used by IndexWriterConfig
LiveIndexWriterConfig(Analyzer analyzer) {
this.analyzer = analyzer;
@@ -426,6 +429,29 @@ public class LiveIndexWriterConfig {
return indexSortFields;
}
+ /**
+ * Expert: Returns if indexing threads check for pending flushes on update in order
+ * to help out flushing indexing buffers to disk
+ * @lucene.experimental
+ */
+ public boolean isCheckPendingFlushOnUpdate() {
+ return checkPendingFlushOnUpdate;
+ }
+
+ /**
+ * Expert: sets if indexing threads check for pending flushes on update in order
+ * to help out flushing indexing buffers to disk. As a consequence, threads calling
+ * {@link DirectoryReader#openIfChanged(DirectoryReader, IndexWriter)} or {@link IndexWriter#flush()} will
+ * be the only threads writing segments to disk unless flushes are falling behind. If indexing is stalled
+ * due to too many pending flushes indexing threads will help out writing pending segment flushes to disk.
+ *
+ * @lucene.experimental
+ */
+ public LiveIndexWriterConfig setCheckPendingFlushUpdate(boolean checkPendingFlushOnUpdate) {
+ this.checkPendingFlushOnUpdate = checkPendingFlushOnUpdate;
+ return this;
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -448,6 +474,7 @@ public class LiveIndexWriterConfig {
sb.append("useCompoundFile=").append(getUseCompoundFile()).append("\n");
sb.append("commitOnClose=").append(getCommitOnClose()).append("\n");
sb.append("indexSort=").append(getIndexSort()).append("\n");
+ sb.append("checkPendingFlushOnUpdate=").append(isCheckPendingFlushOnUpdate()).append("\n");
return sb.toString();
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ede46fe6/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
index 04460cd..76a8172 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
@@ -29,6 +29,7 @@ import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -37,6 +38,7 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.CannedTokenStream;
@@ -2877,4 +2879,89 @@ public class TestIndexWriter extends LuceneTestCase {
dir.close();
}
+ public void testCheckPendingFlushPostUpdate() throws IOException, InterruptedException {
+ MockDirectoryWrapper dir = newMockDirectory();
+ Set<String> flushingThreads = Collections.synchronizedSet(new HashSet<>());
+ dir.failOn(new MockDirectoryWrapper.Failure() {
+ @Override
+ public void eval(MockDirectoryWrapper dir) throws IOException {
+ StackTraceElement[] trace = new Exception().getStackTrace();
+ for (int i = 0; i < trace.length; i++) {
+ if ("flush".equals(trace[i].getMethodName())
+ && "org.apache.lucene.index.DocumentsWriterPerThread".equals(trace[i].getClassName())) {
+ flushingThreads.add(Thread.currentThread().getName());
+ break;
+ }
+ }
+ }
+ });
+ IndexWriter w = new IndexWriter(dir, new IndexWriterConfig()
+ .setCheckPendingFlushUpdate(false)
+ .setMaxBufferedDocs(Integer.MAX_VALUE)
+ .setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH));
+ AtomicBoolean done = new AtomicBoolean(false);
+ int numThreads = 1 + random().nextInt(3);
+ CountDownLatch latch = new CountDownLatch(numThreads);
+ Set<String> indexingThreads = new HashSet<>();
+ Thread[] threads = new Thread[numThreads];
+ for (int i = 0; i < numThreads; i++) {
+ threads[i] = new Thread(() -> {
+ latch.countDown();
+ int numDocs = 0;
+ while (done.get() == false) {
+
+ Document doc = new Document();
+ doc.add(new StringField("id", "foo", Field.Store.YES));
+ try {
+ w.addDocument(doc);
+ } catch (Exception e) {
+ throw new AssertionError(e);
+ }
+ if (numDocs++ % 10 == 0) {
+ Thread.yield();
+ }
+ }
+ });
+ indexingThreads.add(threads[i].getName());
+ threads[i].start();
+ }
+ latch.await();
+ try {
+ int numIters = rarely() ? 1 + random().nextInt(5) : 1;
+ for (int i = 0; i < numIters; i++) {
+ waitForDocs(w);
+ w.commit();
+ assertTrue(flushingThreads.toString(), flushingThreads.contains(Thread.currentThread().getName()));
+ flushingThreads.retainAll(indexingThreads);
+ assertTrue(flushingThreads.toString(), flushingThreads.isEmpty());
+ }
+ w.getConfig().setCheckPendingFlushUpdate(true);
+ numIters = 0;
+ while (true) {
+ assertFalse("should finish in less than 100 iterations", numIters++ >= 100);
+ waitForDocs(w);
+ w.flush();
+ flushingThreads.retainAll(indexingThreads);
+ if (flushingThreads.isEmpty() == false) {
+ break;
+ }
+ }
+ } finally {
+ done.set(true);
+ for (int i = 0; i < numThreads; i++) {
+ threads[i].join();
+ }
+ IOUtils.close(w, dir);
+ }
+ }
+
+ private static void waitForDocs(IndexWriter w) {
+ int numDocsInRam = w.numRamDocs();
+ while(true) {
+ if (numDocsInRam != w.numRamDocs()) {
+ return;
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ede46fe6/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
index 464966a..063045e 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
@@ -74,6 +74,7 @@ public class TestIndexWriterConfig extends LuceneTestCase {
assertEquals(Codec.getDefault(), conf.getCodec());
assertEquals(InfoStream.getDefault(), conf.getInfoStream());
assertEquals(IndexWriterConfig.DEFAULT_USE_COMPOUND_FILE_SYSTEM, conf.getUseCompoundFile());
+ assertTrue(conf.isCheckPendingFlushOnUpdate());
// Sanity check - validate that all getters are covered.
Set<String> getters = new HashSet<>();
getters.add("getAnalyzer");
@@ -98,6 +99,7 @@ public class TestIndexWriterConfig extends LuceneTestCase {
getters.add("getCodec");
getters.add("getInfoStream");
getters.add("getUseCompoundFile");
+ getters.add("isCheckPendingFlushOnUpdate");
for (Method m : IndexWriterConfig.class.getDeclaredMethods()) {
if (m.getDeclaringClass() == IndexWriterConfig.class && m.getName().startsWith("get")) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ede46fe6/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
----------------------------------------------------------------------
diff --git a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
index d7b913c..ac48aa6 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
@@ -141,6 +141,7 @@ import com.carrotsearch.randomizedtesting.rules.StaticFieldsInvariantRule;
import junit.framework.AssertionFailedError;
+import static com.carrotsearch.randomizedtesting.RandomizedTest.frequently;
import static com.carrotsearch.randomizedtesting.RandomizedTest.systemPropertyAsBoolean;
import static com.carrotsearch.randomizedtesting.RandomizedTest.systemPropertyAsInt;
import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
@@ -988,6 +989,9 @@ public abstract class LuceneTestCase extends Assert {
}
c.setUseCompoundFile(r.nextBoolean());
c.setReaderPooling(r.nextBoolean());
+ if (rarely(r)) {
+ c.setCheckPendingFlushUpdate(false);
+ }
return c;
}