Posted to commits@lucene.apache.org by si...@apache.org on 2020/06/25 08:55:56 UTC

[lucene-solr] branch jira/lucene-8962 updated (5c37d08 -> e0e2cf6)

This is an automated email from the ASF dual-hosted git repository.

simonw pushed a change to branch jira/lucene-8962
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git.


    from 5c37d08  fix some tests
     new 206eea3  fix more tests
     new e0e2cf6  harden closing merge readers

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.


Summary of changes:
 .../java/org/apache/lucene/index/IndexWriter.java  | 185 +++++++++++----------
 .../java/org/apache/lucene/index/MergePolicy.java  |  30 ++--
 .../src/java/org/apache/lucene/util/IOUtils.java   |  10 ++
 .../lucene/index/TestDemoParallelLeafReader.java   |   3 +-
 .../org/apache/lucene/index/TestIndexWriter.java   |   3 +-
 5 files changed, 129 insertions(+), 102 deletions(-)


[lucene-solr] 01/02: fix more tests

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

simonw pushed a commit to branch jira/lucene-8962
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 206eea3347ef9ed648fff9ba6100de7c8cea1d69
Author: Simon Willnauer <si...@apache.org>
AuthorDate: Thu Jun 25 09:11:02 2020 +0200

    fix more tests
---
 lucene/core/src/java/org/apache/lucene/index/MergePolicy.java          | 2 +-
 .../src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java   | 3 ++-
 lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java      | 3 ++-
 3 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
index d962a5b..c474a37 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
@@ -443,7 +443,7 @@ public abstract class MergePolicy {
      * Clears the list of merge readers;
      */
     List<MergeReader> clearMergeReader() {
-      assert mergeCompleted.isDone();
+      assert mergeCompleted.isDone() : this.getClass().getName();
       List<MergeReader> readers = mergeReaders;
       mergeReaders = List.of();
       return readers;
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java b/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java
index 2ab2c92..a222bb7 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java
@@ -530,7 +530,7 @@ public class TestDemoParallelLeafReader extends LuceneTestCase {
 
         @Override
         public CodecReader wrapForMerge(CodecReader reader) throws IOException {
-          LeafReader wrapped = getCurrentReader((SegmentReader)reader, schemaGen);
+          LeafReader wrapped = getCurrentReader(reader, schemaGen);
           if (wrapped instanceof ParallelLeafReader) {
             parallelReaders.add((ParallelLeafReader) wrapped);
           }
@@ -539,6 +539,7 @@ public class TestDemoParallelLeafReader extends LuceneTestCase {
 
         @Override
         public void mergeFinished(boolean success, boolean segmentDropped) throws IOException {
+          super.mergeFinished(success, segmentDropped);
           Throwable th = null;
           for (ParallelLeafReader r : parallelReaders) {
             try {
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
index 5ba0d80..2c9bc23 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
@@ -4181,7 +4181,8 @@ public class TestIndexWriter extends LuceneTestCase {
             SetOnce<Boolean> onlyFinishOnce = new SetOnce<>();
             return new MergePolicy.OneMerge(merge.segments) {
               @Override
-              public void mergeFinished(boolean success, boolean segmentDropped) {
+              public void mergeFinished(boolean success, boolean segmentDropped) throws IOException {
+                super.mergeFinished(success, segmentDropped);
                 onlyFinishOnce.set(true);
               }
             };
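
A note on the pattern in this commit: the OneMerge overrides in the tests now declare
throws IOException and call super.mergeFinished(...) first, so the base class bookkeeping
still runs when a merge finishes. A minimal sketch of an override written that way; the
DelegatingOneMerge helper and its names are hypothetical and not part of this patch:

    import java.io.IOException;
    import java.util.List;
    import org.apache.lucene.index.MergePolicy;
    import org.apache.lucene.index.SegmentCommitInfo;

    // Hypothetical helper (not from this patch): builds a OneMerge whose
    // mergeFinished override delegates to super, mirroring the test changes above.
    final class DelegatingOneMerge {
      static MergePolicy.OneMerge wrap(List<SegmentCommitInfo> segments) {
        return new MergePolicy.OneMerge(segments) {
          @Override
          public void mergeFinished(boolean success, boolean segmentDropped) throws IOException {
            // run the base class bookkeeping first, then any custom cleanup
            super.mergeFinished(success, segmentDropped);
          }
        };
      }
    }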


[lucene-solr] 02/02: harden closing merge readers

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

simonw pushed a commit to branch jira/lucene-8962
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit e0e2cf61f120209a9b0a4e898795ef4e85b7074d
Author: Simon Willnauer <si...@apache.org>
AuthorDate: Thu Jun 25 10:55:39 2020 +0200

    harden closing merge readers
---
 .../java/org/apache/lucene/index/IndexWriter.java  | 185 +++++++++++----------
 .../java/org/apache/lucene/index/MergePolicy.java  |  30 ++--
 .../src/java/org/apache/lucene/util/IOUtils.java   |  10 ++
 3 files changed, 125 insertions(+), 100 deletions(-)

diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 1698f5f..d5b25dd 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -3243,71 +3243,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
               // removed the files we are now syncing.
               deleter.incRef(toCommit.files(false));
               if (anyChanges && maxCommitMergeWaitMillis > 0) {
-                SegmentInfos committingSegmentInfos = toCommit;
-                onCommitMerges = updatePendingMerges(new OneMergeWrappingMergePolicy(config.getMergePolicy(), toWrap ->
-                    new MergePolicy.OneMerge(toWrap.segments) {
-                      SegmentCommitInfo origInfo;
-                      AtomicBoolean onlyOnce = new AtomicBoolean(false);
-                      @Override
-                      public void mergeFinished(boolean committed, boolean segmentDropped) throws IOException {
-                        assert Thread.holdsLock(IndexWriter.this);
-                        if (segmentDropped == false
-                            && committed
-                            && includeInCommit.get()) {
-                          deleter.incRef(origInfo.files());
-                          Set<String> mergedSegmentNames = new HashSet<>();
-                          for (SegmentCommitInfo sci : segments) {
-                            mergedSegmentNames.add(sci.info.name);
-                          }
-                          List<SegmentCommitInfo> toCommitMergedAwaySegments = new ArrayList<>();
-                          for (SegmentCommitInfo sci : committingSegmentInfos) {
-                            if (mergedSegmentNames.contains(sci.info.name)) {
-                              toCommitMergedAwaySegments.add(sci);
-                              deleter.decRef(sci.files());
-                            }
-                          }
-                          // Construct a OneMerge that applies to toCommit
-                          MergePolicy.OneMerge applicableMerge = new MergePolicy.OneMerge(toCommitMergedAwaySegments);
-                          applicableMerge.info = origInfo;
-                          long segmentCounter = Long.parseLong(origInfo.info.name.substring(1), Character.MAX_RADIX);
-                          committingSegmentInfos.counter = Math.max(committingSegmentInfos.counter, segmentCounter + 1);
-                          committingSegmentInfos.applyMergeChanges(applicableMerge, false);
-                        }
-                        toWrap.mergeFinished(committed, false);
-                        super.mergeFinished(committed, segmentDropped);
-                      }
-
-                      @Override
-                      void onMergeCommit() {
-                        origInfo = this.info.clone();
-                      }
-
-                      @Override
-                      void initMergeReaders(IOContext mergeContext, Function<SegmentCommitInfo, ReadersAndUpdates> readerFactory) throws IOException {
-                        if (onlyOnce.compareAndSet(false, true)) {
-                          super.initMergeReaders(mergeContext, readerFactory);
-                        }
-                      }
-
-                      @Override
-                      public CodecReader wrapForMerge(CodecReader reader) throws IOException {
-                        return toWrap.wrapForMerge(reader);
-                      }
-                    }
-                ), MergeTrigger.COMMIT, UNBOUNDED_MAX_MERGE_SEGMENTS);
-                if (onCommitMerges != null) {
-                  boolean closeReaders = true;
-                  try {
-                    for (MergePolicy.OneMerge merge : onCommitMerges.merges) {
-                      merge.initMergeReaders(IOContext.DEFAULT, sci -> getPooledInstance(sci, true));
-                    }
-                    closeReaders = false;
-                  } finally {
-                    if (closeReaders) {
-                      IOUtils.applyToAll(onCommitMerges.merges, merge -> closeMergeReaders(merge, true, false));
-                    }
-                  }
-                }
+                onCommitMerges = prepareOnCommitMerge(toCommit, includeInCommit);
               }
             }
             success = true;
@@ -3366,6 +3302,81 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     }
   }
 
+  private MergePolicy.MergeSpecification prepareOnCommitMerge(SegmentInfos committingSegmentInfos, AtomicBoolean includeInCommit) throws IOException {
+    assert Thread.holdsLock(this);
+    MergePolicy.MergeSpecification onCommitMerges = updatePendingMerges(new OneMergeWrappingMergePolicy(config.getMergePolicy(), toWrap ->
+        new MergePolicy.OneMerge(toWrap.segments) {
+          SegmentCommitInfo origInfo;
+          AtomicBoolean onlyOnce = new AtomicBoolean(false);
+          @Override
+          public void mergeFinished(boolean committed, boolean segmentDropped) throws IOException {
+            assert Thread.holdsLock(IndexWriter.this);
+            if (segmentDropped == false
+                && committed
+                && includeInCommit.get()) {
+              deleter.incRef(origInfo.files());
+              Set<String> mergedSegmentNames = new HashSet<>();
+              for (SegmentCommitInfo sci : segments) {
+                mergedSegmentNames.add(sci.info.name);
+              }
+              List<SegmentCommitInfo> toCommitMergedAwaySegments = new ArrayList<>();
+              for (SegmentCommitInfo sci : committingSegmentInfos) {
+                if (mergedSegmentNames.contains(sci.info.name)) {
+                  toCommitMergedAwaySegments.add(sci);
+                  deleter.decRef(sci.files());
+                }
+              }
+              // Construct a OneMerge that applies to toCommit
+              MergePolicy.OneMerge applicableMerge = new MergePolicy.OneMerge(toCommitMergedAwaySegments);
+              applicableMerge.info = origInfo;
+              long segmentCounter = Long.parseLong(origInfo.info.name.substring(1), Character.MAX_RADIX);
+              committingSegmentInfos.counter = Math.max(committingSegmentInfos.counter, segmentCounter + 1);
+              committingSegmentInfos.applyMergeChanges(applicableMerge, false);
+            }
+            toWrap.mergeFinished(committed, false);
+            super.mergeFinished(committed, segmentDropped);
+          }
+
+          @Override
+          void onMergeCommit() {
+            origInfo = this.info.clone();
+          }
+
+          @Override
+          void initMergeReaders(IOUtils.IOFunction<SegmentCommitInfo, MergePolicy.MergeReader> readerFactory) throws IOException {
+            if (onlyOnce.compareAndSet(false, true)) {
+              super.initMergeReaders(readerFactory);
+            }
+          }
+
+          @Override
+          public CodecReader wrapForMerge(CodecReader reader) throws IOException {
+            return toWrap.wrapForMerge(reader);
+          }
+        }
+    ), MergeTrigger.COMMIT, UNBOUNDED_MAX_MERGE_SEGMENTS);
+    if (onCommitMerges != null) {
+      boolean closeReaders = true;
+      try {
+        for (MergePolicy.OneMerge merge : onCommitMerges.merges) {
+          IOContext context = new IOContext(merge.getStoreMergeInfo());
+          merge.initMergeReaders(
+              sci -> {
+                final ReadersAndUpdates rld = getPooledInstance(sci, true);
+                rld.setIsMerging();
+                return rld.getReaderForMerge(context);
+              });
+        }
+        closeReaders = false;
+      } finally {
+        if (closeReaders) {
+          IOUtils.applyToAll(onCommitMerges.merges, merge -> closeMergeReaders(merge, true, false));
+        }
+      }
+    }
+    return onCommitMerges;
+  }
+
   /**
    * Ensures that all changes in the reader-pool are written to disk.
    * @param writeDeletes if <code>true</code> if deletes should be written to disk too.
@@ -4378,27 +4389,23 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   private synchronized void closeMergeReaders(MergePolicy.OneMerge merge, boolean suppressExceptions, boolean droppedSegment) throws IOException {
     if (merge.hasFinished() == false) {
       final boolean drop = suppressExceptions == false;
-      try {
-        // first call mergeFinished before we potentially drop the reader and the last reference.
-        merge.mergeFinished(suppressExceptions == false, droppedSegment);
-      } finally {
-        IOUtils.applyToAll(merge.clearMergeReader(), mr -> {
-          final SegmentReader sr = mr.reader;
-          final ReadersAndUpdates rld = getPooledInstance(sr.getOriginalSegmentInfo(), false);
-          // We still hold a ref so it should not have been removed:
-          assert rld != null;
-          if (drop) {
-            rld.dropChanges();
-          } else {
-            rld.dropMergingUpdates();
-          }
-          rld.release(sr);
-          release(rld);
-          if (drop) {
-            readerPool.drop(rld.info);
-          }
-        });
-      }
+      // first call mergeFinished before we potentially drop the reader and the last reference.
+      merge.close(suppressExceptions == false, droppedSegment, mr -> {
+        final SegmentReader sr = mr.reader;
+        final ReadersAndUpdates rld = getPooledInstance(sr.getOriginalSegmentInfo(), false);
+        // We still hold a ref so it should not have been removed:
+        assert rld != null;
+        if (drop) {
+          rld.dropChanges();
+        } else {
+          rld.dropMergingUpdates();
+        }
+        rld.release(sr);
+        release(rld);
+        if (drop) {
+          readerPool.drop(rld.info);
+        }
+      });
     } else {
       assert merge.getMergeReader().isEmpty() : "we are done but still have readers: " + merge.getMergeReader();
       assert suppressExceptions : "can't be done and not suppressing exceptions";
@@ -4456,7 +4463,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     // closed:
     boolean success = false;
     try {
-      merge.initMergeReaders(context, sci -> getPooledInstance(sci, true));
+      merge.initMergeReaders(sci -> {
+        final ReadersAndUpdates rld = getPooledInstance(sci, true);
+        rld.setIsMerging();
+        return rld.getReaderForMerge(context);
+      });
       // Let the merge wrap readers
       List<CodecReader> mergeReaders = new ArrayList<>();
       Counter softDeleteCount = Counter.newCounter(false);
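
The IndexWriter changes above build each merge reader through a factory (getPooledInstance,
setIsMerging, getReaderForMerge) and guard the loop so that a failure part way through closes
whatever was already opened. A generic sketch of that guard shape, assuming a hypothetical
openAll helper over plain Closeable resources rather than the package-private merge machinery:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.lucene.util.IOUtils;

    // Hypothetical sketch (not from this patch): open one resource per input and,
    // if anything throws before all of them are open, close what was already
    // opened instead of leaking it, the same shape as the closeReaders guard.
    final class OpenAllOrCloseAll {
      static <R extends Closeable> List<R> openAll(List<String> inputs,
                                                   IOUtils.IOFunction<String, R> factory) throws IOException {
        List<R> opened = new ArrayList<>();
        boolean success = false;
        try {
          for (String input : inputs) {
            opened.add(factory.apply(input));
          }
          success = true;
        } finally {
          if (success == false) {
            IOUtils.applyToAll(opened, Closeable::close);
          }
        }
        return opened;
      }
    }
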
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
index c474a37..06f967d 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
@@ -43,6 +43,7 @@ import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.MergeInfo;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.IOSupplier;
+import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.InfoStream;
 import org.apache.lucene.util.ThreadInterruptedException;
 
@@ -260,9 +261,23 @@ public abstract class MergePolicy {
      * @param segmentDropped true iff the merged segment was dropped since it was fully deleted
      */
     public void mergeFinished(boolean success, boolean segmentDropped) throws IOException {
+    }
+
+    /**
+     * Closes this merge and releases all merge readers
+     */
+    final void close(boolean success, boolean segmentDropped, IOUtils.IOConsumer<MergeReader> readerConsumer) throws IOException {
+      // this method is final to ensure we never miss a super call to cleanup and finish the merge
       if (mergeCompleted.complete(success) == false) {
         throw new IllegalStateException("merge has already finished");
       }
+      try {
+        mergeFinished(success, segmentDropped);
+      } finally {
+        List<MergeReader> readers = mergeReaders;
+        IOUtils.applyToAll(readers, readerConsumer);
+        mergeReaders = List.of();
+      }
     }
 
     /** Wrap the reader in order to add/remove information to the merged segment. */
@@ -414,7 +429,7 @@ public abstract class MergePolicy {
     /**
      * Sets the merge readers for this merge.
      */
-    void initMergeReaders(IOContext mergeContext, Function<SegmentCommitInfo, ReadersAndUpdates> readerFactory) throws IOException {
+    void initMergeReaders(IOUtils.IOFunction<SegmentCommitInfo, MergeReader> readerFactory) throws IOException {
       assert mergeReaders.isEmpty() : "merge readers must be empty";
       assert mergeCompleted.isDone() == false : "merge is already done";
       ArrayList<MergeReader> readers = new ArrayList<>(segments.size());
@@ -422,9 +437,7 @@ public abstract class MergePolicy {
         for (final SegmentCommitInfo info : segments) {
           // Hold onto the "live" reader; we will use this to
           // commit merged deletes
-          final ReadersAndUpdates rld = readerFactory.apply(info);
-          rld.setIsMerging();
-          readers.add(rld.getReaderForMerge(mergeContext));
+          readers.add(readerFactory.apply(info));
         }
       } finally {
         // ensure we assign this to close them in the case of an exception
@@ -439,15 +452,6 @@ public abstract class MergePolicy {
       return mergeReaders;
     }
 
-    /**
-     * Clears the list of merge readers;
-     */
-    List<MergeReader> clearMergeReader() {
-      assert mergeCompleted.isDone() : this.getClass().getName();
-      List<MergeReader> readers = mergeReaders;
-      mergeReaders = List.of();
-      return readers;
-    }
   }
 
   /**
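
The new OneMerge.close(...) above is final so the teardown order cannot be bypassed: mark the
merge completed, run the mergeFinished hook, then always hand every held MergeReader to the
caller's consumer and drop the references, even if the hook throws. A simplified sketch of that
shape, with a hypothetical ReleaseOnClose holder standing in for OneMerge:

    import java.io.IOException;
    import java.util.List;
    import org.apache.lucene.util.IOUtils;

    // Hypothetical sketch (not from this patch): refuse a second close, run the
    // completion hook, then always hand every held resource to the releaser and
    // drop the references, even if the hook threw.
    final class ReleaseOnClose<R> {
      private List<R> held;
      private boolean closed;

      ReleaseOnClose(List<R> held) {
        this.held = List.copyOf(held);
      }

      void close(boolean success, IOUtils.IOConsumer<Boolean> completionHook,
                 IOUtils.IOConsumer<R> releaser) throws IOException {
        if (closed) {
          throw new IllegalStateException("already closed"); // like mergeCompleted.complete(...)
        }
        closed = true;
        try {
          completionHook.accept(success);          // corresponds to mergeFinished(...)
        } finally {
          IOUtils.applyToAll(held, releaser);      // release resources even on failure
          held = List.of();
        }
      }
    }
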
diff --git a/lucene/core/src/java/org/apache/lucene/util/IOUtils.java b/lucene/core/src/java/org/apache/lucene/util/IOUtils.java
index 80182bf..32c9c74 100644
--- a/lucene/core/src/java/org/apache/lucene/util/IOUtils.java
+++ b/lucene/core/src/java/org/apache/lucene/util/IOUtils.java
@@ -656,4 +656,14 @@ public final class IOUtils {
      */
     void accept(T input) throws IOException;
   }
+
+  /**
+   * A Function that may throw an IOException
+   * @see java.util.function.Function
+   */
+  @FunctionalInterface
+  public interface IOFunction<T, R> {
+    R apply(T t) throws IOException;
+  }
+
 }
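
The IOFunction interface added above mirrors java.util.function.Function but lets the lambda
throw IOException, which is what allows initMergeReaders to accept a throwing reader factory.
A small usage sketch; the IOFunctionExample class and its names are hypothetical:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import org.apache.lucene.util.IOUtils;

    // Hypothetical usage (not from this patch): an IOFunction lambda may throw
    // IOException, which java.util.function.Function cannot declare.
    final class IOFunctionExample {
      static final IOUtils.IOFunction<Path, Long> FILE_SIZE = path -> Files.size(path);

      static long sizeOf(Path path) throws IOException {
        return FILE_SIZE.apply(path);
      }
    }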