You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text version.
Posted to commits@lucene.apache.org by si...@apache.org on 2020/06/24 20:11:05 UTC

[lucene-solr] branch jira/lucene-8962 updated (2aa937e -> 179a730)

This is an automated email from the ASF dual-hosted git repository.

simonw pushed a change to branch jira/lucene-8962
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git.


    from 2aa937e  apply comments from #1601
     add 6a45586  Change JoinQuery class's visibility to package again (#1611)
     add f47de19  LUCENE-9408: Ensure OneMerge#mergeFinished is only called once (#1590)
     new 09187e8  Merge branch 'master' into jira/lucene-8962
     new 179a730  add test and infrastructure to ensure readers are closed when merges are aborted

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/lucene/index/IndexWriter.java  | 95 ++++++++++++----------
 .../java/org/apache/lucene/index/MergePolicy.java  | 10 +--
 .../lucene/index/TestIndexWriterMergePolicy.java   | 48 +++++++++++
 .../org/apache/lucene/index/TestMergePolicy.java   |  1 -
 .../src/java/org/apache/solr/search/JoinQuery.java |  2 +-
 5 files changed, 106 insertions(+), 50 deletions(-)


[lucene-solr] 02/02: add test and infrastructure to ensure readers are closed when merges are aborted

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

simonw pushed a commit to branch jira/lucene-8962
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 179a730006e9d2462ae6e7ded9c8a21bf014a224
Author: Simon Willnauer <si...@apache.org>
AuthorDate: Wed Jun 24 22:10:43 2020 +0200

    add test and infrastructure to ensure readers are closed when merges are aborted
---
 .../java/org/apache/lucene/index/IndexWriter.java  | 47 +++++++++++----------
 .../lucene/index/TestIndexWriterMergePolicy.java   | 48 ++++++++++++++++++++++
 2 files changed, 74 insertions(+), 21 deletions(-)

diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 44d65c9..a411ce8 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -2470,23 +2470,23 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   /** Aborts running merges.  Be careful when using this
    *  method: when you abort a long-running merge, you lose
    *  a lot of work that must later be redone. */
-  private synchronized void abortMerges() {
+  private synchronized void abortMerges() throws IOException {
     // Abort all pending & running merges:
-    for (final MergePolicy.OneMerge merge : pendingMerges) {
+    IOUtils.applyToAll(pendingMerges, merge -> {
       if (infoStream.isEnabled("IW")) {
         infoStream.message("IW", "now abort pending merge " + segString(merge.segments));
       }
-      merge.setAborted();
+      abortOneMerge(merge);
       mergeFinish(merge);
-    }
+    });
     pendingMerges.clear();
-
-    for (final MergePolicy.OneMerge merge : runningMerges) {
+    IOUtils.applyToAll(runningMerges, merge -> {
       if (infoStream.isEnabled("IW")) {
         infoStream.message("IW", "now abort running merge " + segString(merge.segments));
       }
-      merge.setAborted();
-    }
+      abortOneMerge(merge);
+      mergeFinish(merge);
+    });
 
     // We wait here to make all merges stop.  It should not
     // take very long because they periodically check if
@@ -4102,11 +4102,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
       try {
         try {
           mergeInit(merge);
-
           if (infoStream.isEnabled("IW")) {
             infoStream.message("IW", "now merge\n  merge=" + segString(merge.segments) + "\n  index=" + segString());
           }
-
           mergeMiddle(merge, mergePolicy);
           mergeSuccess(merge);
           success = true;
@@ -4115,7 +4113,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
         }
       } finally {
         synchronized(this) {
-
+          // Readers are already closed in commitMerge if we didn't hit
+          // an exc:
+          if (success == false) {
+            closeMergeReaders(merge, true, false);
+          }
           mergeFinish(merge);
 
           if (success == false) {
@@ -4147,6 +4149,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   /** Hook that's called when the specified merge is complete. */
   protected void mergeSuccess(MergePolicy.OneMerge merge) {}
 
+  private void abortOneMerge(MergePolicy.OneMerge merge) throws IOException {
+    merge.setAborted();
+    closeMergeReaders(merge, true, false);
+  }
+
   /** Checks whether this merge involves any segments
    *  already participating in a merge.  If not, this merge
    *  is "registered", meaning we record that its segments
@@ -4161,7 +4168,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     assert merge.segments.size() > 0;
 
     if (stopMerges) {
-      merge.setAborted();
+      abortOneMerge(merge);
       throw new MergePolicy.MergeAbortedException("merge is aborted: " + segString(merge.segments));
     }
 
@@ -4431,8 +4438,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     merge.checkAborted();
 
     Directory mergeDirectory = mergeScheduler.wrapForMerge(merge, directory);
-    List<SegmentCommitInfo> sourceSegments = merge.segments;
-    
     IOContext context = new IOContext(merge.getStoreMergeInfo());
 
     final TrackingDirectoryWrapper dirWrapper = new TrackingDirectoryWrapper(mergeDirectory);
@@ -4488,7 +4493,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
       }
       final SegmentMerger merger = new SegmentMerger(mergeReaders,
                                                      merge.info.info, infoStream, dirWrapper,
-                                                     globalFieldNumberMap, 
+                                                     globalFieldNumberMap,
                                                      context);
       merge.info.setSoftDelCount(Math.toIntExact(softDeleteCount.get()));
       merge.checkAborted();
@@ -4509,8 +4514,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
           String pauseInfo = merge.getMergeProgress().getPauseTimes().entrySet()
             .stream()
             .filter((e) -> e.getValue() > 0)
-            .map((e) -> String.format(Locale.ROOT, "%.1f sec %s", 
-                e.getValue() / 1000000000., 
+            .map((e) -> String.format(Locale.ROOT, "%.1f sec %s",
+                e.getValue() / 1000000000.,
                 e.getKey().name().toLowerCase(Locale.ROOT)))
             .collect(Collectors.joining(", "));
           if (!pauseInfo.isEmpty()) {
@@ -4522,9 +4527,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
           double segmentMB = (merge.info.sizeInBytes()/1024./1024.);
           infoStream.message("IW", "merge codec=" + codec + " maxDoc=" + merge.info.info.maxDoc() + "; merged segment has " +
                              (mergeState.mergeFieldInfos.hasVectors() ? "vectors" : "no vectors") + "; " +
-                             (mergeState.mergeFieldInfos.hasNorms() ? "norms" : "no norms") + "; " + 
-                             (mergeState.mergeFieldInfos.hasDocValues() ? "docValues" : "no docValues") + "; " + 
-                             (mergeState.mergeFieldInfos.hasProx() ? "prox" : "no prox") + "; " + 
+                             (mergeState.mergeFieldInfos.hasNorms() ? "norms" : "no norms") + "; " +
+                             (mergeState.mergeFieldInfos.hasDocValues() ? "docValues" : "no docValues") + "; " +
+                             (mergeState.mergeFieldInfos.hasProx() ? "prox" : "no prox") + "; " +
                              (mergeState.mergeFieldInfos.hasProx() ? "freqs" : "no freqs") + "; " +
                              (mergeState.mergeFieldInfos.hasPointValues() ? "points" : "no points") + "; " +
                              String.format(Locale.ROOT,
@@ -4634,7 +4639,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
 
       // TODO: ideally we would freeze merge.info here!!
       // because any changes after writing the .si will be
-      // lost... 
+      // lost...
 
       if (infoStream.isEnabled("IW")) {
         infoStream.message("IW", String.format(Locale.ROOT, "merged segment size=%.3f MB vs estimate=%.3f MB", merge.info.sizeInBytes()/1024./1024., merge.estimatedMergeBytes/1024/1024.));
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
index 56eccc3..59b434f 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
@@ -434,4 +434,52 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
       }
     }
   }
+
+  /**
+   * This test makes sure we release the merge readers on abort. MDW will fail if it
+   * can't close all files
+   */
+  public void testAbortCommitMerge() throws IOException, InterruptedException {
+    try (Directory directory = newDirectory()) {
+      CountDownLatch waitForMerge = new CountDownLatch(1);
+      CountDownLatch waitForDeleteAll = new CountDownLatch(1);
+      try (IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig()
+          .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitSeconds(30)
+          .setMergeScheduler(new SerialMergeScheduler() {
+            @Override
+            public synchronized void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
+              waitForMerge.countDown();
+              try {
+                waitForDeleteAll.await();
+              } catch (InterruptedException e) {
+                throw new AssertionError(e);
+              }
+              super.merge(mergeSource, trigger);
+            }
+          }))) {
+
+        Document d1 = new Document();
+        d1.add(new StringField("id", "1", Field.Store.NO));
+        Document d2 = new Document();
+        d2.add(new StringField("id", "2", Field.Store.NO));
+        Document d3 = new Document();
+        d3.add(new StringField("id", "3", Field.Store.NO));
+        writer.addDocument(d1);
+        writer.flush();
+        writer.addDocument(d2);
+        Thread t = new Thread(() -> {
+          try {
+            writer.commit();
+          } catch (IOException e) {
+            throw new AssertionError(e);
+          }
+        });
+        t.start();
+        waitForMerge.await();
+        writer.deleteAll();
+        waitForDeleteAll.countDown();
+        t.join();
+      }
+    }
+  }
 }


[lucene-solr] 01/02: Merge branch 'master' into jira/lucene-8962

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

simonw pushed a commit to branch jira/lucene-8962
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 09187e8b305c195e57ff81a174cefa548669c70f
Merge: 2aa937e f47de19
Author: Simon Willnauer <si...@apache.org>
AuthorDate: Wed Jun 24 21:33:40 2020 +0200

    Merge branch 'master' into jira/lucene-8962

 .../java/org/apache/lucene/index/IndexWriter.java  | 48 ++++++++++++----------
 .../java/org/apache/lucene/index/MergePolicy.java  | 10 ++---
 .../org/apache/lucene/index/TestMergePolicy.java   |  1 -
 .../src/java/org/apache/solr/search/JoinQuery.java |  2 +-
 4 files changed, 32 insertions(+), 29 deletions(-)

diff --cc lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 911852a,fd17220..44d65c9
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@@ -4361,30 -4286,32 +4362,35 @@@ public class IndexWriter implements Clo
    }
  
    @SuppressWarnings("try")
 -  private synchronized void closeMergeReaders(MergePolicy.OneMerge merge, boolean suppressExceptions) throws IOException {
 +  private synchronized void closeMergeReaders(MergePolicy.OneMerge merge, boolean suppressExceptions, boolean droppedSegment) throws IOException {
-     final boolean drop = suppressExceptions == false;
-     try {
-       // first call mergeFinished before we potentially drop the reader and the last reference.
-       merge.mergeFinished(suppressExceptions == false, droppedSegment);
-     } finally {
-       IOUtils.applyToAll(merge.clearMergeReader(), mr -> {
-         final SegmentReader sr = mr.reader;
-         final ReadersAndUpdates rld = getPooledInstance(sr.getOriginalSegmentInfo(), false);
-         // We still hold a ref so it should not have been removed:
-         assert rld != null;
-         if (drop) {
-           rld.dropChanges();
-         } else {
-           rld.dropMergingUpdates();
-         }
-         rld.release(sr);
-         release(rld);
-         if (drop) {
-           readerPool.drop(rld.info);
-         }
-       });
+     if (merge.hasFinished() == false) {
+       final boolean drop = suppressExceptions == false;
 -      try (Closeable finalizer = () -> merge.mergeFinished(suppressExceptions == false)) {
 -        IOUtils.applyToAll(merge.readers, sr -> {
++      try {
++        // first call mergeFinished before we potentially drop the reader and the last reference.
++        merge.mergeFinished(suppressExceptions == false, droppedSegment);
++      } finally {
++        IOUtils.applyToAll(merge.clearMergeReader(), mr -> {
++          final SegmentReader sr = mr.reader;
+           final ReadersAndUpdates rld = getPooledInstance(sr.getOriginalSegmentInfo(), false);
+           // We still hold a ref so it should not have been removed:
+           assert rld != null;
+           if (drop) {
+             rld.dropChanges();
+           } else {
+             rld.dropMergingUpdates();
+           }
+           rld.release(sr);
+           release(rld);
+           if (drop) {
+             readerPool.drop(rld.info);
+           }
+         });
 -      } finally {
 -        Collections.fill(merge.readers, null);
+       }
+     } else {
 -      assert merge.readers.stream().filter(Objects::nonNull).count() == 0 : "we are done but still have readers: " + merge.readers;
++      assert merge.getMergeReader().isEmpty() : "we are done but still have readers: " + merge.getMergeReader();
+       assert suppressExceptions : "can't be done and not suppressing exceptions";
      }
 +
    }
  
    private void countSoftDeletes(CodecReader reader, Bits wrappedLiveDocs, Bits hardLiveDocs, Counter softDeleteCounter,
diff --cc lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
index 16decf6,91d2302..60ad004
--- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
@@@ -256,15 -254,11 +256,13 @@@ public abstract class MergePolicy 
      }
  
      /** Called by {@link IndexWriter} after the merge is done and all readers have been closed.
 -     * @param success true iff the merge finished successfully ie. was committed */
 -    public void mergeFinished(boolean success) throws IOException {
 +     * @param success true iff the merge finished successfully ie. was committed
 +     * @param segmentDropped true iff the merged segment was dropped since it was fully deleted
 +     */
 +    public void mergeFinished(boolean success, boolean segmentDropped) throws IOException {
-       mergeCompleted.complete(success);
-       // https://issues.apache.org/jira/browse/LUCENE-9408
-       // if (mergeCompleted.complete(success) == false) {
-       //   throw new IllegalStateException("merge has already finished");
-       // }
+       if (mergeCompleted.complete(success) == false) {
+         throw new IllegalStateException("merge has already finished");
+       }
      }
  
      /** Wrap the reader in order to add/remove information to the merged segment. */