You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2020/06/24 20:40:51 UTC
[lucene-solr] branch jira/lucene-8962 updated: add a test to stress
updating the same doc with merge on commit
This is an automated email from the ASF dual-hosted git repository.
simonw pushed a commit to branch jira/lucene-8962
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/jira/lucene-8962 by this push:
new 976397a add a test to stress updating the same doc with merge on commit
976397a is described below
commit 976397ad6500764dba5e998b7c4e6c13bb73569b
Author: Simon Willnauer <si...@apache.org>
AuthorDate: Wed Jun 24 22:40:23 2020 +0200
add a test to stress updating the same doc with merge on commit
---
.../java/org/apache/lucene/index/IndexWriter.java | 19 +++++---
.../org/apache/lucene/index/IndexWriterConfig.java | 8 +--
.../apache/lucene/index/LiveIndexWriterConfig.java | 10 ++--
.../java/org/apache/lucene/index/MergePolicy.java | 2 +-
.../org/apache/lucene/index/TestIndexWriter.java | 2 +-
.../lucene/index/TestIndexWriterMergePolicy.java | 57 ++++++++++++++++++++--
.../org/apache/lucene/util/LuceneTestCase.java | 2 +-
7 files changed, 79 insertions(+), 21 deletions(-)
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 4eed9d8..49304ba 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -29,7 +29,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -3183,7 +3182,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
long seqNo;
MergePolicy.MergeSpecification onCommitMerges = null;
AtomicBoolean includeInCommit = new AtomicBoolean(true);
- final long maxCommitMergeWaitSeconds = config.getMaxCommitMergeWaitSeconds();
+ final long maxCommitMergeWaitMillis = config.getMaxCommitMergeWaitMillis();
// This is copied from doFlush, except it's modified to
// clone & incRef the flushed SegmentInfos inside the
// sync block:
@@ -3243,7 +3242,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// merge completes which would otherwise have
// removed the files we are now syncing.
deleter.incRef(toCommit.files(false));
- if (anyChanges && maxCommitMergeWaitSeconds > 0) {
+ if (anyChanges && maxCommitMergeWaitMillis > 0) {
SegmentInfos committingSegmentInfos = toCommit;
onCommitMerges = updatePendingMerges(new OneMergeWrappingMergePolicy(config.getMergePolicy(), toWrap ->
new MergePolicy.OneMerge(toWrap.segments) {
@@ -3297,8 +3296,16 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
}
), MergeTrigger.COMMIT, UNBOUNDED_MAX_MERGE_SEGMENTS);
if (onCommitMerges != null) {
- for (MergePolicy.OneMerge merge : onCommitMerges.merges) {
- merge.initMergeReaders(IOContext.DEFAULT, sci -> getPooledInstance(sci, true));
+ boolean closeReaders = true;
+ try {
+ for (MergePolicy.OneMerge merge : onCommitMerges.merges) {
+ merge.initMergeReaders(IOContext.DEFAULT, sci -> getPooledInstance(sci, true));
+ }
+ closeReaders = false;
+ } finally {
+ if (closeReaders) {
+ IOUtils.applyToAll(onCommitMerges.merges, merge -> closeMergeReaders(merge, true, false));
+ }
}
}
}
@@ -3325,7 +3332,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
if (onCommitMerges != null) {
mergeScheduler.merge(mergeSource, MergeTrigger.COMMIT);
- onCommitMerges.await(maxCommitMergeWaitSeconds, TimeUnit.SECONDS);
+ onCommitMerges.await(maxCommitMergeWaitMillis, TimeUnit.MILLISECONDS);
synchronized (this) {
// we need to call this under lock since mergeFinished above is also called under the IW lock
includeInCommit.set(false);
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
index 1237932..be38648 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
@@ -112,7 +112,7 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
public final static boolean DEFAULT_COMMIT_ON_CLOSE = true;
/** Default value for time to wait for merges on commit (when using a {@link MergePolicy} that implements findFullFlushMerges). */
- public static final long DEFAULT_MAX_COMMIT_MERGE_WAIT_SECONDS = 0;
+ public static final long DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS = 0;
// indicates whether this config instance is already attached to a writer.
// not final so that it can be cloned properly.
@@ -467,13 +467,13 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
* Expert: sets the amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...).
* If this time is reached, we proceed with the commit based on segments merged up to that point.
* The merges are not cancelled, and will still run to completion independent of the commit
- * like normal segment merges. The default is <code>{@value IndexWriterConfig#DEFAULT_MAX_COMMIT_MERGE_WAIT_SECONDS}</code>.
+ * like normal segment merges. The default is <code>{@value IndexWriterConfig#DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS}</code>.
*
* Note: This settings has no effect unless {@link MergePolicy#findFullFlushMerges(MergeTrigger, SegmentInfos, MergePolicy.MergeContext)}
* has an implementation that actually returns merges which by default doesn't return any merges.
*/
- public IndexWriterConfig setMaxCommitMergeWaitSeconds(long maxCommitMergeWaitSeconds) {
- this.maxCommitMergeWaitSeconds = maxCommitMergeWaitSeconds;
+ public IndexWriterConfig setMaxCommitMergeWaitMillis(long maxCommitMergeWaitMillis) {
+ this.maxCommitMergeWaitMillis = maxCommitMergeWaitMillis;
return this;
}
diff --git a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
index 9b1d56c..1450331 100644
--- a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
@@ -110,7 +110,7 @@ public class LiveIndexWriterConfig {
protected String softDeletesField = null;
/** Amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...) */
- protected volatile long maxCommitMergeWaitSeconds;
+ protected volatile long maxCommitMergeWaitMillis;
// used by IndexWriterConfig
LiveIndexWriterConfig(Analyzer analyzer) {
@@ -134,7 +134,7 @@ public class LiveIndexWriterConfig {
flushPolicy = new FlushByRamOrCountsPolicy();
readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
- maxCommitMergeWaitSeconds = IndexWriterConfig.DEFAULT_MAX_COMMIT_MERGE_WAIT_SECONDS;
+ maxCommitMergeWaitMillis = IndexWriterConfig.DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS;
}
/** Returns the default analyzer to use for indexing documents. */
@@ -469,8 +469,8 @@ public class LiveIndexWriterConfig {
* If this time is reached, we proceed with the commit based on segments merged up to that point.
* The merges are not cancelled, and may still run to completion independent of the commit.
*/
- public long getMaxCommitMergeWaitSeconds() {
- return maxCommitMergeWaitSeconds;
+ public long getMaxCommitMergeWaitMillis() {
+ return maxCommitMergeWaitMillis;
}
@Override
@@ -496,7 +496,7 @@ public class LiveIndexWriterConfig {
sb.append("indexSort=").append(getIndexSort()).append("\n");
sb.append("checkPendingFlushOnUpdate=").append(isCheckPendingFlushOnUpdate()).append("\n");
sb.append("softDeletesField=").append(getSoftDeletesField()).append("\n");
- sb.append("maxCommitMergeWaitSeconds=").append(getMaxCommitMergeWaitSeconds()).append("\n");
+ sb.append("maxCommitMergeWaitMillis=").append(getMaxCommitMergeWaitMillis()).append("\n");
return sb.toString();
}
}
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
index 60ad004..73702a0 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
@@ -618,7 +618,7 @@ public abstract class MergePolicy {
* Identifies merges that we want to execute (synchronously) on commit. By default, do not synchronously merge on commit.
*
* Any merges returned here will make {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} block until
- * the merges complete or until {@link IndexWriterConfig#getMaxCommitMergeWaitSeconds()} have elapsed. This may be
+ * the merges complete or until {@link IndexWriterConfig#getMaxCommitMergeWaitMillis()} milliseconds have elapsed. This may be
* used to merge small segments that have just been flushed as part of the commit, reducing the number of segments in
* the commit. If a merge does not complete in the allotted time, it will continue to execute, but will not be reflected
* in the commit.
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
index 41bd607..5ba0d80 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
@@ -4205,7 +4205,7 @@ public class TestIndexWriter extends LuceneTestCase {
public void testMergeOnCommitKeepFullyDeletedSegments() throws Exception {
Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig();
- iwc.setMaxCommitMergeWaitSeconds(30);
+ iwc.setMaxCommitMergeWaitMillis(30 * 1000);
iwc.mergePolicy = new FilterMergePolicy(newMergePolicy()) {
@Override
public boolean keepFullyDeletedSegment(IOSupplier<CodecReader> readerIOSupplier) {
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
index 59b434f..23e2fb6 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
@@ -30,6 +32,7 @@ import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.LuceneTestCase;
@@ -322,7 +325,7 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
firstWriter.close(); // When this writer closes, it does not merge on commit.
IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()))
- .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitSeconds(30);
+ .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(30);
IndexWriter writerWithMergePolicy = new IndexWriter(dir, iwc);
@@ -373,7 +376,7 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
CountDownLatch waitForMerge = new CountDownLatch(1);
CountDownLatch waitForUpdate = new CountDownLatch(1);
try (IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig()
- .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitSeconds(30)
+ .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(30 * 1000)
.setSoftDeletesField("soft_delete")
.setMergeScheduler(new ConcurrentMergeScheduler())) {
@Override
@@ -444,7 +447,7 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
CountDownLatch waitForMerge = new CountDownLatch(1);
CountDownLatch waitForDeleteAll = new CountDownLatch(1);
try (IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig()
- .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitSeconds(30)
+ .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(30 * 1000)
.setMergeScheduler(new SerialMergeScheduler() {
@Override
public synchronized void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
@@ -482,4 +485,52 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
}
}
}
+
+ public void testStressUpdateSameDocumentWithMergeOnCommit() throws IOException, InterruptedException { // Stress test: worker threads keep updating the document with id "1" while this thread randomly commits and checks that exactly one live document is visible.
+ try (Directory directory = newDirectory()) {
+ try (RandomIndexWriter writer = new RandomIndexWriter(random(), directory, newIndexWriterConfig()
+ .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(10 + random().nextInt(2000)) // commit waits a random 10..2009 ms for merge-on-commit merges
+ .setSoftDeletesField("soft_delete")
+ .setMergeScheduler(new ConcurrentMergeScheduler()))) {
+ Document d1 = new Document();
+ d1.add(new StringField("id", "1", Field.Store.NO));
+ writer.updateDocument(new Term("id", "1"), d1); // seed the index with the single document that every later update replaces
+ writer.commit();
+
+ AtomicInteger iters = new AtomicInteger(100 + random().nextInt(TEST_NIGHTLY ? 5000 : 1000)); // update budget shared by all workers; larger upper bound when TEST_NIGHTLY is set
+ AtomicBoolean done = new AtomicBoolean(false); // flipped by the first worker that exits; ends the commit/verify loop below
+ Thread[] threads = new Thread[1 + random().nextInt(4)]; // between 1 and 4 worker threads
+ for (int i = 0; i < threads.length; i++) {
+ Thread t = new Thread(() -> {
+ try {
+ while (iters.decrementAndGet() > 0) { // every worker draws from the same countdown
+ writer.updateDocument(new Term("id", "1"), d1); // always the same id, so the live document count should stay at one
+ }
+ } catch (Exception e) {
+ throw new AssertionError(e); // NOTE(review): thrown on the worker thread; presumably surfaced by the test framework's uncaught-exception handling — confirm
+ } finally {
+ done.set(true); // set on normal exit and on failure alike
+ }
+
+ });
+ t.start();
+ threads[i] = t;
+ }
+ try {
+ while (done.get() == false) {
+ if (random().nextBoolean()) { // commit on a random subset of loop passes
+ writer.commit();
+ }
+ try (DirectoryReader open = new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(directory), "___soft_deletes")) { // NOTE(review): field name "___soft_deletes" differs from the "soft_delete" field configured on the writer above — confirm this is intentional
+ assertEquals(1, open.numDocs()); // only one document (id "1") is ever indexed, so exactly one live doc is expected
+ }
+ }
+ } finally {
+ for (Thread t : threads) {
+ t.join(); // wait for every worker before try-with-resources closes the writer and directory
+ }
+ }
+ }
+ }
+ }
}
diff --git a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
index cc779a0..2dd2410 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
@@ -1003,7 +1003,7 @@ public abstract class LuceneTestCase extends Assert {
if (rarely(r)) {
c.setCheckPendingFlushUpdate(false);
}
- c.setMaxCommitMergeWaitSeconds(atLeast(r, 1));
+ c.setMaxCommitMergeWaitMillis(atLeast(r, 200));
return c;
}