You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2020/06/24 20:40:51 UTC

[lucene-solr] branch jira/lucene-8962 updated: add a test to stress updating the same doc with merge on commit

This is an automated email from the ASF dual-hosted git repository.

simonw pushed a commit to branch jira/lucene-8962
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/jira/lucene-8962 by this push:
     new 976397a  add a test to stress updating the same doc with merge on commit
976397a is described below

commit 976397ad6500764dba5e998b7c4e6c13bb73569b
Author: Simon Willnauer <si...@apache.org>
AuthorDate: Wed Jun 24 22:40:23 2020 +0200

    add a test to stress updating the same doc with merge on commit
---
 .../java/org/apache/lucene/index/IndexWriter.java  | 19 +++++---
 .../org/apache/lucene/index/IndexWriterConfig.java |  8 +--
 .../apache/lucene/index/LiveIndexWriterConfig.java | 10 ++--
 .../java/org/apache/lucene/index/MergePolicy.java  |  2 +-
 .../org/apache/lucene/index/TestIndexWriter.java   |  2 +-
 .../lucene/index/TestIndexWriterMergePolicy.java   | 57 ++++++++++++++++++++--
 .../org/apache/lucene/util/LuceneTestCase.java     |  2 +-
 7 files changed, 79 insertions(+), 21 deletions(-)

diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 4eed9d8..49304ba 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -29,7 +29,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -3183,7 +3182,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
       long seqNo;
       MergePolicy.MergeSpecification onCommitMerges = null;
       AtomicBoolean includeInCommit = new AtomicBoolean(true);
-      final long maxCommitMergeWaitSeconds = config.getMaxCommitMergeWaitSeconds();
+      final long maxCommitMergeWaitMillis = config.getMaxCommitMergeWaitMillis();
       // This is copied from doFlush, except it's modified to
       // clone & incRef the flushed SegmentInfos inside the
       // sync block:
@@ -3243,7 +3242,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
               // merge completes which would otherwise have
               // removed the files we are now syncing.
               deleter.incRef(toCommit.files(false));
-              if (anyChanges && maxCommitMergeWaitSeconds > 0) {
+              if (anyChanges && maxCommitMergeWaitMillis > 0) {
                 SegmentInfos committingSegmentInfos = toCommit;
                 onCommitMerges = updatePendingMerges(new OneMergeWrappingMergePolicy(config.getMergePolicy(), toWrap ->
                     new MergePolicy.OneMerge(toWrap.segments) {
@@ -3297,8 +3296,16 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
                     }
                 ), MergeTrigger.COMMIT, UNBOUNDED_MAX_MERGE_SEGMENTS);
                 if (onCommitMerges != null) {
-                  for (MergePolicy.OneMerge merge : onCommitMerges.merges) {
-                    merge.initMergeReaders(IOContext.DEFAULT, sci -> getPooledInstance(sci, true));
+                  boolean closeReaders = true;
+                  try {
+                    for (MergePolicy.OneMerge merge : onCommitMerges.merges) {
+                      merge.initMergeReaders(IOContext.DEFAULT, sci -> getPooledInstance(sci, true));
+                    }
+                    closeReaders = false;
+                  } finally {
+                    if (closeReaders) {
+                      IOUtils.applyToAll(onCommitMerges.merges, merge -> closeMergeReaders(merge, true, false));
+                    }
                   }
                 }
               }
@@ -3325,7 +3332,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
 
       if (onCommitMerges != null) {
         mergeScheduler.merge(mergeSource, MergeTrigger.COMMIT);
-        onCommitMerges.await(maxCommitMergeWaitSeconds, TimeUnit.SECONDS);
+        onCommitMerges.await(maxCommitMergeWaitMillis, TimeUnit.MILLISECONDS);
         synchronized (this) {
           // we need to call this under lock since mergeFinished above is also called under the IW lock
           includeInCommit.set(false);
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
index 1237932..be38648 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
@@ -112,7 +112,7 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
   public final static boolean DEFAULT_COMMIT_ON_CLOSE = true;
 
   /** Default value for time to wait for merges on commit (when using a {@link MergePolicy} that implements findFullFlushMerges). */
-  public static final long DEFAULT_MAX_COMMIT_MERGE_WAIT_SECONDS = 0;
+  public static final long DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS = 0;
   
   // indicates whether this config instance is already attached to a writer.
   // not final so that it can be cloned properly.
@@ -467,13 +467,13 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
    * Expert: sets the amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...).
    * If this time is reached, we proceed with the commit based on segments merged up to that point.
    * The merges are not cancelled, and will still run to completion independent of the commit
-   * like normal segment merges. The default is <code>{@value IndexWriterConfig#DEFAULT_MAX_COMMIT_MERGE_WAIT_SECONDS}</code>.
+   * like normal segment merges. The default is <code>{@value IndexWriterConfig#DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS}</code>.
    *
    * Note: This setting has no effect unless {@link MergePolicy#findFullFlushMerges(MergeTrigger, SegmentInfos, MergePolicy.MergeContext)}
    * has an implementation that actually returns merges which by default doesn't return any merges.
    */
-  public IndexWriterConfig setMaxCommitMergeWaitSeconds(long maxCommitMergeWaitSeconds) {
-    this.maxCommitMergeWaitSeconds = maxCommitMergeWaitSeconds;
+  public IndexWriterConfig setMaxCommitMergeWaitMillis(long maxCommitMergeWaitMillis) {
+    this.maxCommitMergeWaitMillis = maxCommitMergeWaitMillis;
     return this;
   }
 
diff --git a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
index 9b1d56c..1450331 100644
--- a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
@@ -110,7 +110,7 @@ public class LiveIndexWriterConfig {
   protected String softDeletesField = null;
 
   /** Amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...) */
-  protected volatile long maxCommitMergeWaitSeconds;
+  protected volatile long maxCommitMergeWaitMillis;
 
   // used by IndexWriterConfig
   LiveIndexWriterConfig(Analyzer analyzer) {
@@ -134,7 +134,7 @@ public class LiveIndexWriterConfig {
     flushPolicy = new FlushByRamOrCountsPolicy();
     readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
     perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
-    maxCommitMergeWaitSeconds = IndexWriterConfig.DEFAULT_MAX_COMMIT_MERGE_WAIT_SECONDS;
+    maxCommitMergeWaitMillis = IndexWriterConfig.DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS;
   }
   
   /** Returns the default analyzer to use for indexing documents. */
@@ -469,8 +469,8 @@ public class LiveIndexWriterConfig {
    * If this time is reached, we proceed with the commit based on segments merged up to that point.
    * The merges are not cancelled, and may still run to completion independent of the commit.
    */
-  public long getMaxCommitMergeWaitSeconds() {
-    return maxCommitMergeWaitSeconds;
+  public long getMaxCommitMergeWaitMillis() {
+    return maxCommitMergeWaitMillis;
   }
 
   @Override
@@ -496,7 +496,7 @@ public class LiveIndexWriterConfig {
     sb.append("indexSort=").append(getIndexSort()).append("\n");
     sb.append("checkPendingFlushOnUpdate=").append(isCheckPendingFlushOnUpdate()).append("\n");
     sb.append("softDeletesField=").append(getSoftDeletesField()).append("\n");
-    sb.append("maxCommitMergeWaitSeconds=").append(getMaxCommitMergeWaitSeconds()).append("\n");
+    sb.append("maxCommitMergeWaitMillis=").append(getMaxCommitMergeWaitMillis()).append("\n");
     return sb.toString();
   }
 }
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
index 60ad004..73702a0 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
@@ -618,7 +618,7 @@ public abstract class MergePolicy {
    * Identifies merges that we want to execute (synchronously) on commit. By default, do not synchronously merge on commit.
    *
    * Any merges returned here will make {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} block until
-   * the merges complete or until {@link IndexWriterConfig#getMaxCommitMergeWaitSeconds()} have elapsed. This may be
+   * the merges complete or until {@link IndexWriterConfig#getMaxCommitMergeWaitMillis()} have elapsed. This may be
    * used to merge small segments that have just been flushed as part of the commit, reducing the number of segments in
    * the commit. If a merge does not complete in the allotted time, it will continue to execute, but will not be reflected
    * in the commit.
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
index 41bd607..5ba0d80 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
@@ -4205,7 +4205,7 @@ public class TestIndexWriter extends LuceneTestCase {
   public void testMergeOnCommitKeepFullyDeletedSegments() throws Exception {
     Directory dir = newDirectory();
     IndexWriterConfig iwc = newIndexWriterConfig();
-    iwc.setMaxCommitMergeWaitSeconds(30);
+    iwc.setMaxCommitMergeWaitMillis(30 * 1000);
     iwc.mergePolicy = new FilterMergePolicy(newMergePolicy()) {
       @Override
       public boolean keepFullyDeletedSegment(IOSupplier<CodecReader> readerIOSupplier) {
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
index 59b434f..23e2fb6 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.document.Document;
@@ -30,6 +32,7 @@ import org.apache.lucene.document.StringField;
 import org.apache.lucene.index.IndexWriterConfig.OpenMode;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
 
 import org.apache.lucene.util.LuceneTestCase;
@@ -322,7 +325,7 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
     firstWriter.close(); // When this writer closes, it does not merge on commit.
 
     IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()))
-        .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitSeconds(30);
+        .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(30);
 
 
     IndexWriter writerWithMergePolicy = new IndexWriter(dir, iwc);
@@ -373,7 +376,7 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
       CountDownLatch waitForMerge = new CountDownLatch(1);
       CountDownLatch waitForUpdate = new CountDownLatch(1);
       try (IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig()
-          .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitSeconds(30)
+          .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(30 * 1000)
           .setSoftDeletesField("soft_delete")
           .setMergeScheduler(new ConcurrentMergeScheduler())) {
         @Override
@@ -444,7 +447,7 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
       CountDownLatch waitForMerge = new CountDownLatch(1);
       CountDownLatch waitForDeleteAll = new CountDownLatch(1);
       try (IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig()
-          .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitSeconds(30)
+          .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(30 * 1000)
           .setMergeScheduler(new SerialMergeScheduler() {
             @Override
             public synchronized void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
@@ -482,4 +485,52 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
       }
     }
   }
+
+  public void testStressUpdateSameDocumentWithMergeOnCommit() throws IOException, InterruptedException {
+    try (Directory directory = newDirectory()) {
+      try (RandomIndexWriter writer = new RandomIndexWriter(random(), directory, newIndexWriterConfig()
+          .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(10 + random().nextInt(2000))
+          .setSoftDeletesField("soft_delete")
+          .setMergeScheduler(new ConcurrentMergeScheduler()))) {
+        Document d1 = new Document();
+        d1.add(new StringField("id", "1", Field.Store.NO));
+        writer.updateDocument(new Term("id", "1"), d1);
+        writer.commit();
+
+        AtomicInteger iters = new AtomicInteger(100 + random().nextInt(TEST_NIGHTLY ? 5000 : 1000));
+        AtomicBoolean done = new AtomicBoolean(false);
+        Thread[] threads = new Thread[1 + random().nextInt(4)];
+        for (int i = 0; i < threads.length; i++) {
+          Thread t = new Thread(() -> {
+            try {
+              while (iters.decrementAndGet() > 0) {
+                writer.updateDocument(new Term("id", "1"), d1);
+              }
+            } catch (Exception e) {
+              throw new AssertionError(e);
+            } finally {
+              done.set(true);
+            }
+
+          });
+          t.start();
+          threads[i] = t;
+        }
+        try {
+          while (done.get() == false) {
+            if (random().nextBoolean()) {
+              writer.commit();
+            }
+            try (DirectoryReader open = new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(directory), "___soft_deletes")) {
+              assertEquals(1, open.numDocs());
+            }
+          }
+        } finally {
+          for (Thread t : threads) {
+            t.join();
+          }
+        }
+      }
+    }
+  }
 }
diff --git a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
index cc779a0..2dd2410 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
@@ -1003,7 +1003,7 @@ public abstract class LuceneTestCase extends Assert {
     if (rarely(r)) {
       c.setCheckPendingFlushUpdate(false);
     }
-    c.setMaxCommitMergeWaitSeconds(atLeast(r, 1));
+    c.setMaxCommitMergeWaitMillis(atLeast(r, 200));
     return c;
   }