You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2020/06/24 16:50:34 UTC
[lucene-solr] 05/07: add prototype to preserve atomicity
This is an automated email from the ASF dual-hosted git repository.
simonw pushed a commit to branch jira/lucene-8962
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit ff17e876c61ba5ecfa4a791527c8a73e1d3f8d5c
Author: Simon Willnauer <si...@apache.org>
AuthorDate: Wed Jun 24 16:34:47 2020 +0200
add prototype to preserve atomicity
---
.../java/org/apache/lucene/index/IndexWriter.java | 57 ++++++++++------------
.../java/org/apache/lucene/index/MergePolicy.java | 26 +++++++++-
.../lucene/index/TestIndexWriterMergePolicy.java | 4 +-
3 files changed, 54 insertions(+), 33 deletions(-)
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 88dc399..ff4817b 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -18,6 +18,7 @@ package org.apache.lucene.index;
import java.io.Closeable;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -39,6 +40,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -3245,13 +3247,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
SegmentInfos committingSegmentInfos = toCommit;
onCommitMerges = updatePendingMerges(new OneMergeWrappingMergePolicy(config.getMergePolicy(), toWrap ->
new MergePolicy.OneMerge(toWrap.segments) {
+ SegmentCommitInfo origInfo;
+ AtomicBoolean onlyOnce = new AtomicBoolean(false);
@Override
public void mergeFinished(boolean committed, boolean segmentDropped) throws IOException {
assert Thread.holdsLock(IndexWriter.this);
if (segmentDropped == false
&& committed
&& includeInCommit.get()) {
- deleter.incRef(info.files());
+ deleter.incRef(origInfo.files());
Set<String> mergedSegmentNames = new HashSet<>();
for (SegmentCommitInfo sci : segments) {
mergedSegmentNames.add(sci.info.name);
@@ -3265,8 +3269,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
}
// Construct a OneMerge that applies to toCommit
MergePolicy.OneMerge applicableMerge = new MergePolicy.OneMerge(toCommitMergedAwaySegments);
- applicableMerge.info = info.clone();
- long segmentCounter = Long.parseLong(info.info.name.substring(1), Character.MAX_RADIX);
+ applicableMerge.info = origInfo;
+ long segmentCounter = Long.parseLong(origInfo.info.name.substring(1), Character.MAX_RADIX);
committingSegmentInfos.counter = Math.max(committingSegmentInfos.counter, segmentCounter + 1);
committingSegmentInfos.applyMergeChanges(applicableMerge, false);
}
@@ -3275,14 +3279,30 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
}
@Override
+ void onMergeCommit() {
+ origInfo = this.info.clone();
+ }
+
+ @Override
+ void setMergeReaders(IOContext mergeContext, ReaderPool readerPool) throws IOException {
+ if (onlyOnce.compareAndSet(false, true)) {
+ super.setMergeReaders(mergeContext, readerPool);
+ }
+ }
+
+ @Override
public CodecReader wrapForMerge(CodecReader reader) throws IOException {
return toWrap.wrapForMerge(reader);
}
}
), MergeTrigger.COMMIT, UNBOUNDED_MAX_MERGE_SEGMENTS);
+ if (onCommitMerges != null) {
+ for (MergePolicy.OneMerge merge : onCommitMerges.merges) {
+ // TODO we need to release these readers in the case of an aborted merge
+ merge.setMergeReaders(IOContext.DEFAULT, readerPool);
+ }
+ }
}
-
-
}
success = true;
} finally {
@@ -3907,6 +3927,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
@SuppressWarnings("try")
private synchronized boolean commitMerge(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException {
+ merge.onMergeCommit();
testPoint("startCommitMerge");
if (tragedy.get() != null) {
@@ -4419,35 +4440,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
infoStream.message("IW", "merging " + segString(merge.segments));
}
- merge.readers = new ArrayList<>(sourceSegments.size());
- merge.hardLiveDocs = new ArrayList<>(sourceSegments.size());
-
// This is try/finally to make sure merger's readers are
// closed:
boolean success = false;
try {
- int segUpto = 0;
- while(segUpto < sourceSegments.size()) {
-
- final SegmentCommitInfo info = sourceSegments.get(segUpto);
-
- // Hold onto the "live" reader; we will use this to
- // commit merged deletes
- final ReadersAndUpdates rld = getPooledInstance(info, true);
- rld.setIsMerging();
-
- ReadersAndUpdates.MergeReader mr = rld.getReaderForMerge(context);
- SegmentReader reader = mr.reader;
-
- if (infoStream.isEnabled("IW")) {
- infoStream.message("IW", "seg=" + segString(info) + " reader=" + reader);
- }
-
- merge.hardLiveDocs.add(mr.hardLiveDocs);
- merge.readers.add(reader);
- segUpto++;
- }
-
+ merge.setMergeReaders(context, readerPool);
// Let the merge wrap readers
List<CodecReader> mergeReaders = new ArrayList<>();
Counter softDeleteCount = Counter.newCounter(false);
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
index 80c7b0d..8d5c456 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
@@ -33,11 +33,14 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;
+import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.lucene.document.Field;
import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.MergeInfo;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.IOSupplier;
@@ -215,8 +218,8 @@ public abstract class MergePolicy {
// Sum of sizeInBytes of all SegmentInfos; set by IW.mergeInit
volatile long totalMergeBytes;
- List<SegmentReader> readers; // used by IndexWriter
- List<Bits> hardLiveDocs; // used by IndexWriter
+ final List<SegmentReader> readers; // used by IndexWriter
+ final List<Bits> hardLiveDocs; // used by IndexWriter
/** Segments to be merged. */
public final List<SegmentCommitInfo> segments;
@@ -243,6 +246,9 @@ public abstract class MergePolicy {
this.segments = List.copyOf(segments);
totalMaxDoc = segments.stream().mapToInt(i -> i.info.maxDoc()).sum();
mergeProgress = new OneMergeProgress();
+ hardLiveDocs = new ArrayList<>(segments.size());
+ readers = new ArrayList<>(segments.size());
+
}
/**
@@ -403,6 +409,22 @@ public abstract class MergePolicy {
Optional<Boolean> hasCompletedSuccessfully() {
return Optional.ofNullable(mergeCompleted.getNow(null));
}
+
+ void onMergeCommit() {
+ }
+
+ void setMergeReaders(IOContext mergeContext, ReaderPool readerPool) throws IOException {
+ for (final SegmentCommitInfo info : segments) {
+ // Hold onto the "live" reader; we will use this to
+ // commit merged deletes
+ final ReadersAndUpdates rld = readerPool.get(info, true);
+ rld.setIsMerging();
+ ReadersAndUpdates.MergeReader mr = rld.getReaderForMerge(mergeContext);
+ SegmentReader reader = mr.reader;
+ hardLiveDocs.add(mr.hardLiveDocs);
+ readers.add(reader);
+ }
+ }
}
/**
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
index 654c6a9..1d78497 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
@@ -404,10 +404,12 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
writer.updateDocument(new Term("id", "2"), d2);
}
writer.flush();
- waitForUpdate.countDown();
} catch (Exception e) {
throw new AssertionError(e);
+ } finally {
+ waitForUpdate.countDown();
}
+
});
t.start();
writer.commit();