You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2020/08/24 07:17:13 UTC

[lucene-solr] branch branch_8x updated: LUCENE-9473: Ensure merges are stopped during abort merges (#1772)

This is an automated email from the ASF dual-hosted git repository.

simonw pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8x by this push:
     new a07da4a  LUCENE-9473: Ensure merges are stopped during abort merges (#1772)
a07da4a is described below

commit a07da4a3193a22ec5db87c0eee0ffee345bf2c15
Author: Simon Willnauer <si...@apache.org>
AuthorDate: Mon Aug 24 09:15:42 2020 +0200

    LUCENE-9473: Ensure merges are stopped during abort merges (#1772)
    
    We need to disable merges while we wait for running merges since
    IW calls timed wait on it's lock that releases the monitor for the time
    being which allows new merges to be registered unless we disable them.
---
 .../java/org/apache/lucene/index/IndexWriter.java  | 50 +++++++++++++++++-----
 .../apache/lucene/index/TestIndexWriterDelete.java |  7 +--
 2 files changed, 44 insertions(+), 13 deletions(-)

diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index d45bed1..352d464 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -401,7 +401,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   private final Set<MergePolicy.OneMerge> runningMerges = new HashSet<>();
   private final List<MergePolicy.OneMerge> mergeExceptions = new ArrayList<>();
   private long mergeGen;
-  private boolean stopMerges; // TODO make sure this is only changed once and never set back to false
+  private Merges merges = new Merges();
   private boolean didMessageState;
   private final AtomicInteger flushCount = new AtomicInteger();
   private final AtomicInteger flushDeletesCount = new AtomicInteger();
@@ -2161,7 +2161,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
 
     assert maxNumSegments == UNBOUNDED_MAX_MERGE_SEGMENTS || maxNumSegments > 0;
     assert trigger != null;
-    if (stopMerges) {
+    if (merges.areEnabled() == false) {
       return null;
     }
 
@@ -2278,10 +2278,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     
     try {
       synchronized (this) {
-        // must be synced otherwise register merge might throw and exception if stopMerges
+        // must be synced otherwise register merge might throw and exception if merges
         // changes concurrently, abortMerges is synced as well
-        stopMerges = true; // this disables merges forever
-        abortMerges();
+        abortMerges(); // this disables merges forever since we are closing and can't reenable them
         assert mergingSegments.isEmpty() : "we aborted all merges but still have merging segments: " + mergingSegments;
       }
       if (infoStream.isEnabled("IW")) {
@@ -2444,7 +2443,16 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
           synchronized (this) {
             try {
               // Abort any running merges
-              abortMerges();
+              try {
+                abortMerges();
+                assert merges.areEnabled() == false : "merges should be disabled - who enabled them?";
+                assert mergingSegments.isEmpty() : "found merging segments but merges are disabled: " + mergingSegments;
+              } finally {
+                // abortMerges disables all merges and we need to re-enable them here to make sure
+                // IW can function properly. An exception in abortMerges() might be fatal for IW but just to be sure
+                // lets re-enable merges anyway.
+                merges.enable();
+              }
               adjustPendingNumDocs(-segmentInfos.totalMaxDoc());
               // Remove all segments
               segmentInfos.clear();
@@ -2468,6 +2476,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
               return seqNo;
             } finally {
               if (success == false) {
+
                 if (infoStream.isEnabled("IW")) {
                   infoStream.message("IW", "hit exception during deleteAll");
                 }
@@ -2486,6 +2495,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
    *  method: when you abort a long-running merge, you lose
    *  a lot of work that must later be redone. */
   private synchronized void abortMerges() throws IOException {
+    merges.disable();
     // Abort all pending & running merges:
     IOUtils.applyToAll(pendingMerges, merge -> {
       if (infoStream.isEnabled("IW")) {
@@ -2986,7 +2996,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
 
       synchronized (this) {
         ensureOpen();
-        assert stopMerges == false;
+        assert merges.areEnabled();
         runningAddIndexesMerges.add(merger);
       }
       try {
@@ -3007,7 +3017,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
       final MergePolicy mergePolicy = config.getMergePolicy();
       boolean useCompoundFile;
       synchronized(this) { // Guard segmentInfos
-        if (stopMerges) {
+        if (merges.areEnabled() == false) {
           // Safe: these files must exist
           deleteNewFiles(infoPerCommit.files());
 
@@ -3043,7 +3053,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
 
       // Register the new segment
       synchronized(this) {
-        if (stopMerges) {
+        if (merges.areEnabled() == false) {
           // Safe: these files must exist
           deleteNewFiles(infoPerCommit.files());
 
@@ -4249,7 +4259,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     }
     assert merge.segments.size() > 0;
 
-    if (stopMerges) {
+    if (merges.areEnabled() == false) {
       abortOneMerge(merge);
       throw new MergePolicy.MergeAbortedException("merge is aborted: " + segString(merge.segments));
     }
@@ -5745,4 +5755,24 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
       return writer.segString();
     }
   }
+
+  private class Merges {
+    private boolean mergesEnabled = true;
+
+    boolean areEnabled() {
+      assert Thread.holdsLock(IndexWriter.this);
+      return mergesEnabled;
+    }
+
+    void disable() {
+      assert Thread.holdsLock(IndexWriter.this);
+      mergesEnabled = false;
+    }
+
+    void enable() {
+      ensureOpen();
+      assert Thread.holdsLock(IndexWriter.this);
+      mergesEnabled = true;
+    }
+  }
 }
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
index e7dbfd2..1f9781a 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
@@ -300,10 +300,11 @@ public class TestIndexWriterDelete extends LuceneTestCase {
     modifier.close();
     dir.close();
   }
-  
+
   public void testDeleteAllNoDeadLock() throws IOException, InterruptedException {
     Directory dir = newDirectory();
-    final RandomIndexWriter modifier = new RandomIndexWriter(random(), dir); 
+    final RandomIndexWriter modifier = new RandomIndexWriter(random(), dir,
+        newIndexWriterConfig().setMergePolicy(new MockRandomMergePolicy(random())));
     int numThreads = atLeast(2);
     Thread[] threads = new Thread[numThreads];
     final CountDownLatch latch = new CountDownLatch(1);
@@ -341,7 +342,7 @@ public class TestIndexWriterDelete extends LuceneTestCase {
       threads[i].start();
     }
     latch.countDown();
-    while(!doneLatch.await(1, TimeUnit.MILLISECONDS)) {
+    while (!doneLatch.await(1, TimeUnit.MILLISECONDS)) {
       if (VERBOSE) {
         System.out.println("\nTEST: now deleteAll");
       }