You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text version.
Posted to commits@lucene.apache.org by si...@apache.org on 2020/06/24 16:50:34 UTC

[lucene-solr] 05/07: add prototype to preserve atomicity

This is an automated email from the ASF dual-hosted git repository.

simonw pushed a commit to branch jira/lucene-8962
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit ff17e876c61ba5ecfa4a791527c8a73e1d3f8d5c
Author: Simon Willnauer <si...@apache.org>
AuthorDate: Wed Jun 24 16:34:47 2020 +0200

    add prototype to preserve atomicity
---
 .../java/org/apache/lucene/index/IndexWriter.java  | 57 ++++++++++------------
 .../java/org/apache/lucene/index/MergePolicy.java  | 26 +++++++++-
 .../lucene/index/TestIndexWriterMergePolicy.java   |  4 +-
 3 files changed, 54 insertions(+), 33 deletions(-)

diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 88dc399..ff4817b 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -18,6 +18,7 @@ package org.apache.lucene.index;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -39,6 +40,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
 import java.util.function.IntPredicate;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -3245,13 +3247,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
                 SegmentInfos committingSegmentInfos = toCommit;
                 onCommitMerges = updatePendingMerges(new OneMergeWrappingMergePolicy(config.getMergePolicy(), toWrap ->
                     new MergePolicy.OneMerge(toWrap.segments) {
+                      SegmentCommitInfo origInfo;
+                      AtomicBoolean onlyOnce = new AtomicBoolean(false);
                       @Override
                       public void mergeFinished(boolean committed, boolean segmentDropped) throws IOException {
                         assert Thread.holdsLock(IndexWriter.this);
                         if (segmentDropped == false
                             && committed
                             && includeInCommit.get()) {
-                          deleter.incRef(info.files());
+                          deleter.incRef(origInfo.files());
                           Set<String> mergedSegmentNames = new HashSet<>();
                           for (SegmentCommitInfo sci : segments) {
                             mergedSegmentNames.add(sci.info.name);
@@ -3265,8 +3269,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
                           }
                           // Construct a OneMerge that applies to toCommit
                           MergePolicy.OneMerge applicableMerge = new MergePolicy.OneMerge(toCommitMergedAwaySegments);
-                          applicableMerge.info = info.clone();
-                          long segmentCounter = Long.parseLong(info.info.name.substring(1), Character.MAX_RADIX);
+                          applicableMerge.info = origInfo;
+                          long segmentCounter = Long.parseLong(origInfo.info.name.substring(1), Character.MAX_RADIX);
                           committingSegmentInfos.counter = Math.max(committingSegmentInfos.counter, segmentCounter + 1);
                           committingSegmentInfos.applyMergeChanges(applicableMerge, false);
                         }
@@ -3275,14 +3279,30 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
                       }
 
                       @Override
+                      void onMergeCommit() {
+                        origInfo = this.info.clone();
+                      }
+
+                      @Override
+                      void setMergeReaders(IOContext mergeContext, ReaderPool readerPool) throws IOException {
+                        if (onlyOnce.compareAndSet(false, true)) {
+                          super.setMergeReaders(mergeContext, readerPool);
+                        }
+                      }
+
+                      @Override
                       public CodecReader wrapForMerge(CodecReader reader) throws IOException {
                         return toWrap.wrapForMerge(reader);
                       }
                     }
                 ), MergeTrigger.COMMIT, UNBOUNDED_MAX_MERGE_SEGMENTS);
+                if (onCommitMerges != null) {
+                  for (MergePolicy.OneMerge merge : onCommitMerges.merges) {
+                    // TODO we need to release these readers in the case of an aborted merge
+                    merge.setMergeReaders(IOContext.DEFAULT, readerPool);
+                  }
+                }
               }
-
-
             }
             success = true;
           } finally {
@@ -3907,6 +3927,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   @SuppressWarnings("try")
   private synchronized boolean commitMerge(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException {
 
+    merge.onMergeCommit();
     testPoint("startCommitMerge");
 
     if (tragedy.get() != null) {
@@ -4419,35 +4440,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
       infoStream.message("IW", "merging " + segString(merge.segments));
     }
 
-    merge.readers = new ArrayList<>(sourceSegments.size());
-    merge.hardLiveDocs = new ArrayList<>(sourceSegments.size());
-
     // This is try/finally to make sure merger's readers are
     // closed:
     boolean success = false;
     try {
-      int segUpto = 0;
-      while(segUpto < sourceSegments.size()) {
-
-        final SegmentCommitInfo info = sourceSegments.get(segUpto);
-
-        // Hold onto the "live" reader; we will use this to
-        // commit merged deletes
-        final ReadersAndUpdates rld = getPooledInstance(info, true);
-        rld.setIsMerging();
-
-        ReadersAndUpdates.MergeReader mr = rld.getReaderForMerge(context);
-        SegmentReader reader = mr.reader;
-
-        if (infoStream.isEnabled("IW")) {
-          infoStream.message("IW", "seg=" + segString(info) + " reader=" + reader);
-        }
-
-        merge.hardLiveDocs.add(mr.hardLiveDocs);
-        merge.readers.add(reader);
-        segUpto++;
-      }
-
+      merge.setMergeReaders(context, readerPool);
       // Let the merge wrap readers
       List<CodecReader> mergeReaders = new ArrayList<>();
       Counter softDeleteCount = Counter.newCounter(false);
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
index 80c7b0d..8d5c456 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
@@ -33,11 +33,14 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BooleanSupplier;
+import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
 import org.apache.lucene.document.Field;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.MergeInfo;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.IOSupplier;
@@ -215,8 +218,8 @@ public abstract class MergePolicy {
     // Sum of sizeInBytes of all SegmentInfos; set by IW.mergeInit
     volatile long totalMergeBytes;
 
-    List<SegmentReader> readers;        // used by IndexWriter
-    List<Bits> hardLiveDocs;        // used by IndexWriter
+    final List<SegmentReader> readers;        // used by IndexWriter
+    final List<Bits> hardLiveDocs;        // used by IndexWriter
 
     /** Segments to be merged. */
     public final List<SegmentCommitInfo> segments;
@@ -243,6 +246,9 @@ public abstract class MergePolicy {
       this.segments = List.copyOf(segments);
       totalMaxDoc = segments.stream().mapToInt(i -> i.info.maxDoc()).sum();
       mergeProgress = new OneMergeProgress();
+      hardLiveDocs = new ArrayList<>(segments.size());
+      readers = new ArrayList<>(segments.size());
+
     }
 
     /** 
@@ -403,6 +409,22 @@ public abstract class MergePolicy {
     Optional<Boolean> hasCompletedSuccessfully() {
       return Optional.ofNullable(mergeCompleted.getNow(null));
     }
+
+    void onMergeCommit() {
+    }
+
+    void setMergeReaders(IOContext mergeContext, ReaderPool readerPool) throws IOException {
+      for (final SegmentCommitInfo info : segments) {
+        // Hold onto the "live" reader; we will use this to
+        // commit merged deletes
+        final ReadersAndUpdates rld = readerPool.get(info, true);
+        rld.setIsMerging();
+        ReadersAndUpdates.MergeReader mr = rld.getReaderForMerge(mergeContext);
+        SegmentReader reader = mr.reader;
+        hardLiveDocs.add(mr.hardLiveDocs);
+        readers.add(reader);
+      }
+    }
   }
 
   /**
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
index 654c6a9..1d78497 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
@@ -404,10 +404,12 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
               writer.updateDocument(new Term("id", "2"), d2);
             }
             writer.flush();
-            waitForUpdate.countDown();
           } catch (Exception e) {
             throw new AssertionError(e);
+          } finally {
+            waitForUpdate.countDown();
           }
+
         });
         t.start();
         writer.commit();