Posted to commits@lucene.apache.org by si...@apache.org on 2020/06/27 20:39:09 UTC
[lucene-solr] branch branch_8x updated: LUCENE-8962: Merge small segments on commit (#1617)
This is an automated email from the ASF dual-hosted git repository.
simonw pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/branch_8x by this push:
new 9b1c928 LUCENE-8962: Merge small segments on commit (#1617)
9b1c928 is described below
commit 9b1c92809e730a70c1d9bf9e5b65a4b521bddc8b
Author: Simon Willnauer <si...@apache.org>
AuthorDate: Sat Jun 27 22:25:45 2020 +0200
LUCENE-8962: Merge small segments on commit (#1617)
Add IndexWriter merge-on-commit feature to selectively merge small segments on commit,
subject to a configurable timeout, to improve search performance by reducing the number of small
segments for searching.
Co-authored-by: Michael Froh <ms...@apache.org>
Co-authored-by: Michael Sokolov <so...@falutin.net>
Co-authored-by: Mike McCandless <mi...@apache.org>
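For context, a minimal sketch of how a user enables this feature, assuming only the APIs added in this commit (IndexWriterConfig.setMaxCommitMergeWaitMillis and MergePolicy.findFullFlushMerges); the directory, analyzer, and the trivial merge-everything policy below are illustrative placeholders, not part of the patch:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.index.*;
    import org.apache.lucene.store.ByteBuffersDirectory;
    import org.apache.lucene.store.Directory;

    public class MergeOnCommitExample {
      public static void main(String[] args) throws IOException {
        Directory dir = new ByteBuffersDirectory();
        IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
        // Opt in: with the default of 0, findFullFlushMerges is never consulted.
        iwc.setMaxCommitMergeWaitMillis(500);
        iwc.setMergePolicy(new FilterMergePolicy(iwc.getMergePolicy()) {
          @Override
          public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger,
              SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
            // Merge all segments not already registered in another merge.
            List<SegmentCommitInfo> candidates = new ArrayList<>();
            for (SegmentCommitInfo sci : segmentInfos) {
              if (mergeContext.getMergingSegments().contains(sci) == false) {
                candidates.add(sci);
              }
            }
            if (candidates.size() < 2) {
              return null;
            }
            MergeSpecification spec = new MergeSpecification();
            spec.add(new OneMerge(candidates));
            return spec;
          }
        });
        try (IndexWriter writer = new IndexWriter(dir, iwc)) {
          writer.addDocument(new Document());
          writer.commit(); // blocks up to 500 ms while the commit-time merge runs
        }
      }
    }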
---
lucene/CHANGES.txt | 6 +-
.../org/apache/lucene/index/FilterMergePolicy.java | 5 +
.../java/org/apache/lucene/index/IndexWriter.java | 280 +++++++++++++++------
.../org/apache/lucene/index/IndexWriterConfig.java | 18 ++
.../apache/lucene/index/LiveIndexWriterConfig.java | 13 +
.../java/org/apache/lucene/index/MergePolicy.java | 98 +++++++-
.../java/org/apache/lucene/index/MergeTrigger.java | 7 +-
.../org/apache/lucene/index/NoMergePolicy.java | 3 +
.../lucene/index/OneMergeWrappingMergePolicy.java | 5 +
.../org/apache/lucene/index/ReadersAndUpdates.java | 14 +-
.../src/java/org/apache/lucene/util/IOUtils.java | 10 +
.../lucene/index/TestDemoParallelLeafReader.java | 5 +-
.../org/apache/lucene/index/TestIndexWriter.java | 169 ++++++++-----
.../lucene/index/TestIndexWriterMergePolicy.java | 240 +++++++++++++++++-
.../org/apache/lucene/index/TestMergePolicy.java | 10 +-
.../lucene/search/TestPhraseWildcardQuery.java | 7 +-
.../apache/lucene/index/MockRandomMergePolicy.java | 32 +++
.../org/apache/lucene/util/LuceneTestCase.java | 1 +
18 files changed, 750 insertions(+), 173 deletions(-)
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index f78e0fe..02ea4ba 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -44,6 +44,10 @@ New Features
* LUCENE-7889: Grouping by range based on values from DoubleValuesSource and LongValuesSource
(Alan Woodward)
+* LUCENE-8962: Add IndexWriter merge-on-commit feature to selectively merge small segments on commit,
+ subject to a configurable timeout, to improve search performance by reducing the number of small
+ segments for searching (Michael Froh, Mike Sokolov, Mike McCandless, Simon Willnauer)
+
Improvements
---------------------
* LUCENE-9276: Use same code-path for updateDocuments and updateDocument in IndexWriter and
@@ -270,8 +274,6 @@ Improvements
* LUCENE-9253: KoreanTokenizer now supports custom dictionaries (system, unknown). (Namgyu Kim)
-* LUCENE-8962: Add ability to selectively merge on commit (Michael Froh)
-
* LUCENE-9171: QueryBuilder can now use BoostAttributes on input token streams to selectively
boost particular terms or synonyms in parsed queries. (Alessandro Benedetti, Alan Woodward)
diff --git a/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java
index eb634b4..b4e33f8 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java
@@ -58,6 +58,11 @@ public class FilterMergePolicy extends MergePolicy {
}
@Override
+ public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
+ return in.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext);
+ }
+
+ @Override
public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext)
throws IOException {
return in.useCompoundFile(infos, mergedInfo, mergeContext);
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 990f4ce..cb3cd22 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -29,11 +29,11 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -2184,7 +2184,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
}
}
} else {
- spec = mergePolicy.findMerges(trigger, segmentInfos, this);
+ switch (trigger) {
+ case COMMIT:
+ spec = mergePolicy.findFullFlushMerges(trigger, segmentInfos, this);
+ break;
+ default:
+ spec = mergePolicy.findMerges(trigger, segmentInfos, this);
+ }
}
if (spec != null) {
final int numMerges = spec.merges.size();
@@ -2479,15 +2485,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
/** Aborts running merges. Be careful when using this
* method: when you abort a long-running merge, you lose
* a lot of work that must later be redone. */
- private synchronized void abortMerges() {
+ private synchronized void abortMerges() throws IOException {
// Abort all pending & running merges:
- for (final MergePolicy.OneMerge merge : pendingMerges) {
+ IOUtils.applyToAll(pendingMerges, merge -> {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now abort pending merge " + segString(merge.segments));
}
- merge.setAborted();
+ abortOneMerge(merge);
mergeFinish(merge);
- }
+ });
pendingMerges.clear();
for (final MergePolicy.OneMerge merge : runningMerges) {
@@ -3190,7 +3196,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
SegmentInfos toCommit = null;
boolean anyChanges = false;
long seqNo;
-
+ MergePolicy.MergeSpecification onCommitMerges = null;
+ AtomicBoolean includeInCommit = new AtomicBoolean(true);
+ final long maxCommitMergeWaitMillis = config.getMaxCommitMergeWaitMillis();
// This is copied from doFlush, except it's modified to
// clone & incRef the flushed SegmentInfos inside the
// sync block:
@@ -3243,16 +3251,18 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// corresponding add from an updateDocument) can
// sneak into the commit point:
toCommit = segmentInfos.clone();
-
pendingCommitChangeCount = changeCount.get();
-
// This protects the segmentInfos we are now going
// to commit. This is important in case, eg, while
// we are trying to sync all referenced files, a
// merge completes which would otherwise have
- // removed the files we are now syncing.
- filesToCommit = toCommit.files(false);
- deleter.incRef(filesToCommit);
+ // removed the files we are now syncing.
+ deleter.incRef(toCommit.files(false));
+ if (anyChanges && maxCommitMergeWaitMillis > 0) {
+ // we can safely call prepareOnCommitMerge since writeReaderPool(true) above wrote all
+ // necessary files to disk and checkpointed them.
+ onCommitMerges = prepareOnCommitMerge(toCommit, includeInCommit);
+ }
}
success = true;
} finally {
@@ -3273,7 +3283,24 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
} finally {
maybeCloseOnTragicEvent();
}
-
+
+ if (onCommitMerges != null) {
+ if (infoStream.isEnabled("IW")) {
+ infoStream.message("IW", "now run merges during commit: " + onCommitMerges.segString(directory));
+ }
+ mergeScheduler.merge(mergeSource, MergeTrigger.COMMIT);
+ onCommitMerges.await(maxCommitMergeWaitMillis, TimeUnit.MILLISECONDS);
+ if (infoStream.isEnabled("IW")) {
+ infoStream.message("IW", "done waiting for merges during commit");
+ }
+ synchronized (this) {
+ // we need to call this under lock since mergeFinished above is also called under the IW lock
+ includeInCommit.set(false);
+ }
+ }
+ // do this after handling any onCommitMerges since the files will have changed if any merges
+ // did complete
+ filesToCommit = toCommit.files(false);
try {
if (anyChanges) {
maybeMerge.set(true);
@@ -3302,6 +3329,120 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
}
/**
+ * This optimization allows a commit to wait for merges on smallish segments to
+ * reduce the eventual number of tiny segments in the commit point. We wrap a {@code OneMerge} to
+ * update the {@code committingSegmentInfos} once the merge has finished. We replace the source segments
+ * in the SIS that we are going to commit with the freshly merged segment, but ignore all deletions and updates
+ * that are made to documents in the merged segment while it was merging. The updates that are made do not belong to
+ * the point-in-time commit point and should therefore not be included. See the clone call in {@code onMergeComplete}
+ * below. We also ensure that we pull the merge readers while holding {@code IndexWriter}'s lock. Otherwise
+ * we could see concurrent deletions/updates applied that do not belong to the segment.
+ */
+ private MergePolicy.MergeSpecification prepareOnCommitMerge(SegmentInfos committingSegmentInfos, AtomicBoolean includeInCommit) throws IOException {
+ assert Thread.holdsLock(this);
+ MergePolicy.MergeSpecification onCommitMerges = updatePendingMerges(new OneMergeWrappingMergePolicy(config.getMergePolicy(), toWrap ->
+ new MergePolicy.OneMerge(toWrap.segments) {
+ SegmentCommitInfo origInfo;
+ AtomicBoolean onlyOnce = new AtomicBoolean(false);
+
+ @Override
+ public void mergeFinished(boolean committed, boolean segmentDropped) throws IOException {
+ assert Thread.holdsLock(IndexWriter.this);
+
+ // includeInCommit will be set (above, by our caller) to false if the allowed max wall clock
+ // time (IWC.getMaxCommitMergeWaitMillis()) has elapsed, which means the merge did not finish
+ // in time and will not be committed to the to-be-committed SegmentInfos
+
+ if (segmentDropped == false
+ && committed
+ && includeInCommit.get()) {
+
+ if (infoStream.isEnabled("IW")) {
+ infoStream.message("IW", "now apply merge during commit: " + toWrap.segString());
+ }
+
+ // make sure onMergeComplete really was called:
+ assert origInfo != null;
+
+ deleter.incRef(origInfo.files());
+ Set<String> mergedSegmentNames = new HashSet<>();
+ for (SegmentCommitInfo sci : segments) {
+ mergedSegmentNames.add(sci.info.name);
+ }
+ List<SegmentCommitInfo> toCommitMergedAwaySegments = new ArrayList<>();
+ for (SegmentCommitInfo sci : committingSegmentInfos) {
+ if (mergedSegmentNames.contains(sci.info.name)) {
+ toCommitMergedAwaySegments.add(sci);
+ deleter.decRef(sci.files());
+ }
+ }
+ // Construct a OneMerge that applies to toCommit
+ MergePolicy.OneMerge applicableMerge = new MergePolicy.OneMerge(toCommitMergedAwaySegments);
+ applicableMerge.info = origInfo;
+ long segmentCounter = Long.parseLong(origInfo.info.name.substring(1), Character.MAX_RADIX);
+ committingSegmentInfos.counter = Math.max(committingSegmentInfos.counter, segmentCounter + 1);
+ committingSegmentInfos.applyMergeChanges(applicableMerge, false);
+ } else {
+ if (infoStream.isEnabled("IW")) {
+ infoStream.message("IW", "skip apply merge during commit: " + toWrap.segString());
+ }
+ }
+ toWrap.mergeFinished(committed, false);
+ super.mergeFinished(committed, segmentDropped);
+ }
+
+ @Override
+ void onMergeComplete() {
+ // clone the target info to make sure we have the original info without the updated del and update gens
+ origInfo = info.clone();
+ }
+
+ @Override
+ void initMergeReaders(IOUtils.IOFunction<SegmentCommitInfo, MergePolicy.MergeReader> readerFactory) throws IOException {
+ if (onlyOnce.compareAndSet(false, true)) {
+ // we do this only once, to pull the readers as point-in-time readers with respect to the
+ // commit point we are trying to update
+ super.initMergeReaders(readerFactory);
+ }
+ }
+
+ @Override
+ public CodecReader wrapForMerge(CodecReader reader) throws IOException {
+ return toWrap.wrapForMerge(reader); // must delegate
+ }
+ }
+ ), MergeTrigger.COMMIT, UNBOUNDED_MAX_MERGE_SEGMENTS);
+ if (onCommitMerges != null) {
+ boolean closeReaders = true;
+ try {
+ for (MergePolicy.OneMerge merge : onCommitMerges.merges) {
+ IOContext context = new IOContext(merge.getStoreMergeInfo());
+ merge.initMergeReaders(
+ sci -> {
+ final ReadersAndUpdates rld = getPooledInstance(sci, true);
+ // calling setIsMerging is important since it causes the RaU to record all DV updates
+ // in a separate map in order to be applied to the merged segment after it's done
+ rld.setIsMerging();
+ return rld.getReaderForMerge(context);
+ });
+ }
+ closeReaders = false;
+ } finally {
+ if (closeReaders) {
+ IOUtils.applyToAll(onCommitMerges.merges, merge -> {
+ // this merge is broken; we need to clean up after it. That's fine: we still hold the IW lock to do so
+ boolean removed = pendingMerges.remove(merge);
+ assert removed: "merge should be pending but isn't: " + merge.segString();
+ abortOneMerge(merge);
+ mergeFinish(merge);
+ });
+ }
+ }
+ }
+ return onCommitMerges;
+ }
+
+ /**
* Ensures that all changes in the reader-pool are written to disk.
* @param writeDeletes if <code>true</code>, deletes are written to disk too.
*/
@@ -3714,7 +3855,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
MergeState.DocMap segDocMap = mergeState.docMaps[i];
MergeState.DocMap segLeafDocMap = mergeState.leafDocMaps[i];
- carryOverHardDeletes(mergedDeletesAndUpdates, maxDoc, mergeState.liveDocs[i], merge.hardLiveDocs.get(i), rld.getHardLiveDocs(),
+ carryOverHardDeletes(mergedDeletesAndUpdates, maxDoc, mergeState.liveDocs[i], merge.getMergeReader().get(i).hardLiveDocs, rld.getHardLiveDocs(),
segDocMap, segLeafDocMap);
// Now carry over all doc values updates that were resolved while we were merging, remapping the docIDs to the newly merged docIDs.
@@ -3867,7 +4008,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
@SuppressWarnings("try")
private synchronized boolean commitMerge(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException {
-
+ merge.onMergeComplete();
testPoint("startCommitMerge");
if (tragedy.get() != null) {
@@ -3981,7 +4122,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// Must close before checkpoint, otherwise IFD won't be
// able to delete the held-open files from the merge
// readers:
- closeMergeReaders(merge, false);
+ closeMergeReaders(merge, false, dropSegment);
}
if (infoStream.isEnabled("IW")) {
@@ -4043,11 +4184,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
try {
try {
mergeInit(merge);
-
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now merge\n merge=" + segString(merge.segments) + "\n index=" + segString());
}
-
mergeMiddle(merge, mergePolicy);
mergeSuccess(merge);
success = true;
@@ -4056,7 +4195,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
}
} finally {
synchronized(this) {
-
+ // Readers are already closed in commitMerge if we didn't hit
+ // an exc:
+ if (success == false) {
+ closeMergeReaders(merge, true, false);
+ }
mergeFinish(merge);
if (success == false) {
@@ -4088,6 +4231,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
/** Hook that's called when the specified merge is complete. */
protected void mergeSuccess(MergePolicy.OneMerge merge) {}
+ private void abortOneMerge(MergePolicy.OneMerge merge) throws IOException {
+ merge.setAborted();
+ closeMergeReaders(merge, true, false);
+ }
+
/** Checks whether this merge involves any segments
* already participating in a merge. If not, this merge
* is "registered", meaning we record that its segments
@@ -4102,7 +4250,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
assert merge.segments.size() > 0;
if (stopMerges) {
- merge.setAborted();
+ abortOneMerge(merge);
throw new MergePolicy.MergeAbortedException("merge is aborted: " + segString(merge.segments));
}
@@ -4303,30 +4451,28 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
}
@SuppressWarnings("try")
- private synchronized void closeMergeReaders(MergePolicy.OneMerge merge, boolean suppressExceptions) throws IOException {
+ private synchronized void closeMergeReaders(MergePolicy.OneMerge merge, boolean suppressExceptions, boolean droppedSegment) throws IOException {
if (merge.hasFinished() == false) {
final boolean drop = suppressExceptions == false;
- try (Closeable finalizer = () -> merge.mergeFinished(suppressExceptions == false)) {
- IOUtils.applyToAll(merge.readers, sr -> {
- final ReadersAndUpdates rld = getPooledInstance(sr.getOriginalSegmentInfo(), false);
- // We still hold a ref so it should not have been removed:
- assert rld != null;
- if (drop) {
- rld.dropChanges();
- } else {
- rld.dropMergingUpdates();
- }
- rld.release(sr);
- release(rld);
- if (drop) {
- readerPool.drop(rld.info);
- }
- });
- } finally {
- Collections.fill(merge.readers, null);
- }
+ // first call mergeFinished before we potentially drop the reader and the last reference.
+ merge.close(suppressExceptions == false, droppedSegment, mr -> {
+ final SegmentReader sr = mr.reader;
+ final ReadersAndUpdates rld = getPooledInstance(sr.getOriginalSegmentInfo(), false);
+ // We still hold a ref so it should not have been removed:
+ assert rld != null;
+ if (drop) {
+ rld.dropChanges();
+ } else {
+ rld.dropMergingUpdates();
+ }
+ rld.release(sr);
+ release(rld);
+ if (drop) {
+ readerPool.drop(rld.info);
+ }
+ });
} else {
- assert merge.readers.stream().filter(Objects::nonNull).count() == 0 : "we are done but still have readers: " + merge.readers;
+ assert merge.getMergeReader().isEmpty() : "we are done but still have readers: " + merge.getMergeReader();
assert suppressExceptions : "can't be done and not suppressing exceptions";
}
}
@@ -4369,8 +4515,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
merge.checkAborted();
Directory mergeDirectory = mergeScheduler.wrapForMerge(merge, directory);
- List<SegmentCommitInfo> sourceSegments = merge.segments;
-
IOContext context = new IOContext(merge.getStoreMergeInfo());
final TrackingDirectoryWrapper dirWrapper = new TrackingDirectoryWrapper(mergeDirectory);
@@ -4379,45 +4523,25 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
infoStream.message("IW", "merging " + segString(merge.segments));
}
- merge.readers = new ArrayList<>(sourceSegments.size());
- merge.hardLiveDocs = new ArrayList<>(sourceSegments.size());
-
// This is try/finally to make sure merger's readers are
// closed:
boolean success = false;
try {
- int segUpto = 0;
- while(segUpto < sourceSegments.size()) {
-
- final SegmentCommitInfo info = sourceSegments.get(segUpto);
-
- // Hold onto the "live" reader; we will use this to
- // commit merged deletes
- final ReadersAndUpdates rld = getPooledInstance(info, true);
+ merge.initMergeReaders(sci -> {
+ final ReadersAndUpdates rld = getPooledInstance(sci, true);
rld.setIsMerging();
-
- ReadersAndUpdates.MergeReader mr = rld.getReaderForMerge(context);
- SegmentReader reader = mr.reader;
-
- if (infoStream.isEnabled("IW")) {
- infoStream.message("IW", "seg=" + segString(info) + " reader=" + reader);
- }
-
- merge.hardLiveDocs.add(mr.hardLiveDocs);
- merge.readers.add(reader);
- segUpto++;
- }
-
+ return rld.getReaderForMerge(context);
+ });
// Let the merge wrap readers
List<CodecReader> mergeReaders = new ArrayList<>();
Counter softDeleteCount = Counter.newCounter(false);
- for (int r = 0; r < merge.readers.size(); r++) {
- SegmentReader reader = merge.readers.get(r);
+ for (MergePolicy.MergeReader mergeReader : merge.getMergeReader()) {
+ SegmentReader reader = mergeReader.reader;
CodecReader wrappedReader = merge.wrapForMerge(reader);
validateMergeReader(wrappedReader);
if (softDeletesEnabled) {
if (reader != wrappedReader) { // if we don't have a wrapped reader we won't preserve any soft-deletes
- Bits hardLiveDocs = merge.hardLiveDocs.get(r);
+ Bits hardLiveDocs = mergeReader.hardLiveDocs;
if (hardLiveDocs != null) { // we only need to do this accounting if we have mixed deletes
Bits wrappedLiveDocs = wrappedReader.getLiveDocs();
Counter hardDeleteCounter = Counter.newCounter(false);
@@ -4450,7 +4574,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
}
final SegmentMerger merger = new SegmentMerger(mergeReaders,
merge.info.info, infoStream, dirWrapper,
- globalFieldNumberMap,
+ globalFieldNumberMap,
context);
merge.info.setSoftDelCount(Math.toIntExact(softDeleteCount.get()));
merge.checkAborted();
@@ -4471,8 +4595,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
String pauseInfo = merge.getMergeProgress().getPauseTimes().entrySet()
.stream()
.filter((e) -> e.getValue() > 0)
- .map((e) -> String.format(Locale.ROOT, "%.1f sec %s",
- e.getValue() / 1000000000.,
+ .map((e) -> String.format(Locale.ROOT, "%.1f sec %s",
+ e.getValue() / 1000000000.,
e.getKey().name().toLowerCase(Locale.ROOT)))
.collect(Collectors.joining(", "));
if (!pauseInfo.isEmpty()) {
@@ -4484,9 +4608,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
double segmentMB = (merge.info.sizeInBytes()/1024./1024.);
infoStream.message("IW", "merge codec=" + codec + " maxDoc=" + merge.info.info.maxDoc() + "; merged segment has " +
(mergeState.mergeFieldInfos.hasVectors() ? "vectors" : "no vectors") + "; " +
- (mergeState.mergeFieldInfos.hasNorms() ? "norms" : "no norms") + "; " +
- (mergeState.mergeFieldInfos.hasDocValues() ? "docValues" : "no docValues") + "; " +
- (mergeState.mergeFieldInfos.hasProx() ? "prox" : "no prox") + "; " +
+ (mergeState.mergeFieldInfos.hasNorms() ? "norms" : "no norms") + "; " +
+ (mergeState.mergeFieldInfos.hasDocValues() ? "docValues" : "no docValues") + "; " +
+ (mergeState.mergeFieldInfos.hasProx() ? "prox" : "no prox") + "; " +
(mergeState.mergeFieldInfos.hasProx() ? "freqs" : "no freqs") + "; " +
(mergeState.mergeFieldInfos.hasPointValues() ? "points" : "no points") + "; " +
String.format(Locale.ROOT,
@@ -4596,7 +4720,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// TODO: ideally we would freeze merge.info here!!
// because any changes after writing the .si will be
- // lost...
+ // lost...
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", String.format(Locale.ROOT, "merged segment size=%.3f MB vs estimate=%.3f MB", merge.info.sizeInBytes()/1024./1024., merge.estimatedMergeBytes/1024/1024.));
@@ -4628,7 +4752,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// Readers are already closed in commitMerge if we didn't hit
// an exc:
if (success == false) {
- closeMergeReaders(merge, true);
+ closeMergeReaders(merge, true, false);
}
}
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
index 26e7e3d..6dcdf83 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
@@ -109,6 +109,9 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
/** Default value for whether calls to {@link IndexWriter#close()} include a commit. */
public final static boolean DEFAULT_COMMIT_ON_CLOSE = true;
+
+ /** Default value for time to wait for merges on commit (when using a {@link MergePolicy} that implements {@link MergePolicy#findFullFlushMerges}). */
+ public static final long DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS = 0;
// indicates whether this config instance is already attached to a writer.
// not final so that it can be cloned properly.
@@ -460,6 +463,21 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
}
/**
+ * Expert: sets the amount of time to wait for merges (during {@link IndexWriter#commit}) returned by
+ * MergePolicy.findFullFlushMerges(...).
+ * If this time is reached, we proceed with the commit based on segments merged up to that point.
+ * The merges are not cancelled, and will still run to completion independent of the commit,
+ * like natural segment merges. The default is <code>{@value IndexWriterConfig#DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS}</code>.
+ *
+ * Note: This setting has no effect unless {@link MergePolicy#findFullFlushMerges(MergeTrigger, SegmentInfos, MergePolicy.MergeContext)}
+ * is overridden to actually return merges; the default implementation returns none.
+ */
+ public IndexWriterConfig setMaxCommitMergeWaitMillis(long maxCommitMergeWaitMillis) {
+ this.maxCommitMergeWaitMillis = maxCommitMergeWaitMillis;
+ return this;
+ }
+
+ /**
* Set the {@link Sort} order to use for all (flushed and merged) segments.
*/
public IndexWriterConfig setIndexSort(Sort sort) {
diff --git a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
index 1f48acc..1450331 100644
--- a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
@@ -109,6 +109,8 @@ public class LiveIndexWriterConfig {
/** soft deletes field */
protected String softDeletesField = null;
+ /** Amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...) */
+ protected volatile long maxCommitMergeWaitMillis;
// used by IndexWriterConfig
LiveIndexWriterConfig(Analyzer analyzer) {
@@ -132,6 +134,7 @@ public class LiveIndexWriterConfig {
flushPolicy = new FlushByRamOrCountsPolicy();
readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
+ maxCommitMergeWaitMillis = IndexWriterConfig.DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS;
}
/** Returns the default analyzer to use for indexing documents. */
@@ -461,6 +464,15 @@ public class LiveIndexWriterConfig {
return softDeletesField;
}
+ /**
+ * Expert: return the amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...).
+ * If this time is reached, we proceed with the commit based on segments merged up to that point.
+ * The merges are not cancelled, and may still run to completion independent of the commit.
+ */
+ public long getMaxCommitMergeWaitMillis() {
+ return maxCommitMergeWaitMillis;
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -484,6 +496,7 @@ public class LiveIndexWriterConfig {
sb.append("indexSort=").append(getIndexSort()).append("\n");
sb.append("checkPendingFlushOnUpdate=").append(isCheckPendingFlushOnUpdate()).append("\n");
sb.append("softDeletesField=").append(getSoftDeletesField()).append("\n");
+ sb.append("maxCommitMergeWaitMillis=").append(getMaxCommitMergeWaitMillis()).append("\n");
return sb.toString();
}
}
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
index 0943c4b..b66c73a 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
@@ -19,6 +19,7 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
@@ -41,6 +42,7 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MergeInfo;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.IOSupplier;
+import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.ThreadInterruptedException;
@@ -215,8 +217,7 @@ public abstract class MergePolicy {
// Sum of sizeInBytes of all SegmentInfos; set by IW.mergeInit
volatile long totalMergeBytes;
- List<SegmentReader> readers; // used by IndexWriter
- List<Bits> hardLiveDocs; // used by IndexWriter
+ private List<MergeReader> mergeReaders; // used by IndexWriter
/** Segments to be merged. */
public final List<SegmentCommitInfo> segments;
@@ -243,6 +244,7 @@ public abstract class MergePolicy {
this.segments = new ArrayList<>(segments);
totalMaxDoc = segments.stream().mapToInt(i -> i.info.maxDoc()).sum();
mergeProgress = new OneMergeProgress();
+ mergeReaders = Collections.emptyList();
}
/**
@@ -254,11 +256,27 @@ public abstract class MergePolicy {
}
/** Called by {@link IndexWriter} after the merge is done and all readers have been closed.
- * @param success true iff the merge finished successfully ie. was committed */
- public void mergeFinished(boolean success) throws IOException {
+ * @param success true iff the merge finished successfully, i.e. was committed
+ * @param segmentDropped true iff the merged segment was dropped since it was fully deleted
+ */
+ public void mergeFinished(boolean success, boolean segmentDropped) throws IOException {
+ }
+
+ /**
+ * Closes this merge and releases all merge readers
+ */
+ final void close(boolean success, boolean segmentDropped, IOUtils.IOConsumer<MergeReader> readerConsumer) throws IOException {
+ // this method is final to ensure we never miss a super call to clean up and finish the merge
if (mergeCompleted.complete(success) == false) {
throw new IllegalStateException("merge has already finished");
}
+ try {
+ mergeFinished(success, segmentDropped);
+ } finally {
+ final List<MergeReader> readers = mergeReaders;
+ mergeReaders = Collections.emptyList();
+ IOUtils.applyToAll(readers, readerConsumer);
+ }
}
/** Wrap the reader in order to add/remove information to the merged segment. */
@@ -399,6 +417,40 @@ public abstract class MergePolicy {
Optional<Boolean> hasCompletedSuccessfully() {
return Optional.ofNullable(mergeCompleted.getNow(null));
}
+
+
+ /**
+ * Called just before the merge is applied to IndexWriter's SegmentInfos
+ */
+ void onMergeComplete() {
+ }
+
+ /**
+ * Sets the merge readers for this merge.
+ */
+ void initMergeReaders(IOUtils.IOFunction<SegmentCommitInfo, MergeReader> readerFactory) throws IOException {
+ assert mergeReaders.isEmpty() : "merge readers must be empty";
+ assert mergeCompleted.isDone() == false : "merge is already done";
+ final ArrayList<MergeReader> readers = new ArrayList<>(segments.size());
+ try {
+ for (final SegmentCommitInfo info : segments) {
+ // Hold onto the "live" reader; we will use this to
+ // commit merged deletes
+ readers.add(readerFactory.apply(info));
+ }
+ } finally {
+ // always assign this, so the readers can still be closed if an exception was thrown
+ this.mergeReaders = Collections.unmodifiableList(readers);
+ }
+ }
+
+ /**
+ * Returns the merge readers or an empty list if the readers were not initialized yet.
+ */
+ List<MergeReader> getMergeReader() {
+ return mergeReaders;
+ }
+
}
/**
@@ -553,7 +605,7 @@ public abstract class MergePolicy {
* an original segment present in the
* to-be-merged index; else, it was a segment
* produced by a cascaded merge.
- * @param mergeContext the IndexWriter to find the merges on
+ * @param mergeContext the MergeContext to find the merges on
*/
public abstract MergeSpecification findForcedMerges(
SegmentInfos segmentInfos, int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext)
@@ -564,12 +616,36 @@ public abstract class MergePolicy {
* deletes from the index.
* @param segmentInfos
* the total set of segments in the index
- * @param mergeContext the IndexWriter to find the merges on
+ * @param mergeContext the MergeContext to find the merges on
*/
public abstract MergeSpecification findForcedDeletesMerges(
SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException;
/**
+ * Identifies merges that we want to execute (synchronously) on commit. By default, this will do no merging on commit.
+ * If you implement this method in your {@code MergePolicy} you must also set a non-zero timeout using
+ * {@link IndexWriterConfig#setMaxCommitMergeWaitMillis}.
+ *
+ * Any merges returned here will make {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} block until
+ * the merges complete or until {@link IndexWriterConfig#getMaxCommitMergeWaitMillis()} has elapsed. This may be
+ * used to merge small segments that have just been flushed as part of the commit, reducing the number of segments in
+ * the commit. If a merge does not complete in the allotted time, it will continue to execute, and eventually finish and
+ * apply to future commits, but will not be reflected in the current commit.
+ *
+ * If a {@link OneMerge} in the returned {@link MergeSpecification} includes a segment already included in a registered
+ * merge, then {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} will throw an {@link IllegalStateException}.
+ * Use {@link MergeContext#getMergingSegments()} to determine which segments are currently registered to merge.
+ *
+ * @param mergeTrigger the event that triggered the merge (COMMIT or FULL_FLUSH).
+ * @param segmentInfos the total set of segments in the index (while preparing the commit)
+ * @param mergeContext the MergeContext to find the merges on, which should be used to determine which segments are
+ * already in a registered merge (see {@link MergeContext#getMergingSegments()}).
+ */
+ public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
+ return null;
+ }
+
+ /**
* Returns true if a new segment (regardless of its origin) should use the
* compound file format. The default implementation returns <code>true</code>
* iff the size of the given mergedInfo is less or equal to
@@ -745,4 +821,14 @@ public abstract class MergePolicy {
*/
Set<SegmentCommitInfo> getMergingSegments();
}
+
+ final static class MergeReader {
+ final SegmentReader reader;
+ final Bits hardLiveDocs;
+
+ MergeReader(SegmentReader reader, Bits hardLiveDocs) {
+ this.reader = reader;
+ this.hardLiveDocs = hardLiveDocs;
+ }
+ }
}
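To make the findFullFlushMerges contract above concrete, here is a hedged sketch of an override, to be dropped into a MergePolicy subclass, that targets only small segments; the 1 MB cutoff is an arbitrary illustration, and skipping segments in getMergingSegments() follows the javadoc's requirement (java.util imports assumed):

    @Override
    public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger,
        SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
      if (mergeTrigger != MergeTrigger.COMMIT) {
        return null;
      }
      List<SegmentCommitInfo> small = new ArrayList<>();
      for (SegmentCommitInfo sci : segmentInfos) {
        // never propose a segment that is already registered in another merge
        if (mergeContext.getMergingSegments().contains(sci) == false
            && sci.sizeInBytes() < 1024 * 1024) { // "small" means under 1 MB here
          small.add(sci);
        }
      }
      if (small.size() < 2) {
        return null; // nothing worth merging at commit time
      }
      MergeSpecification spec = new MergeSpecification();
      spec.add(new OneMerge(small));
      return spec;
    }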
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java b/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java
index d165a27..01a6b15 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java
@@ -47,5 +47,10 @@ public enum MergeTrigger {
/**
* Merge was triggered by a closing IndexWriter.
*/
- CLOSING
+ CLOSING,
+
+ /**
+ * Merge was triggered on commit.
+ */
+ COMMIT,
}
diff --git a/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java
index 1480ce4..b209e8ae 100644
--- a/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java
@@ -46,6 +46,9 @@ public final class NoMergePolicy extends MergePolicy {
public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, MergeContext mergeContext) { return null; }
@Override
+ public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) { return null; }
+
+ @Override
public boolean useCompoundFile(SegmentInfos segments, SegmentCommitInfo newSegment, MergeContext mergeContext) {
return newSegment.info.getUseCompoundFile();
}
diff --git a/lucene/core/src/java/org/apache/lucene/index/OneMergeWrappingMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/OneMergeWrappingMergePolicy.java
index d08711e..a5fd66a 100644
--- a/lucene/core/src/java/org/apache/lucene/index/OneMergeWrappingMergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/OneMergeWrappingMergePolicy.java
@@ -59,6 +59,11 @@ public class OneMergeWrappingMergePolicy extends FilterMergePolicy {
return wrapSpec(in.findForcedDeletesMerges(segmentInfos, mergeContext));
}
+ @Override
+ public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
+ return wrapSpec(in.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext));
+ }
+
private MergeSpecification wrapSpec(MergeSpecification spec) {
MergeSpecification wrapped = spec == null ? null : new MergeSpecification();
if (wrapped != null) {
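OneMergeWrappingMergePolicy is the same hook that prepareOnCommitMerge uses above; a hedged fragment showing how it can wrap each OneMerge to observe completion (the println stands in for real bookkeeping, and the base TieredMergePolicy is just an example):

    MergePolicy observing = new OneMergeWrappingMergePolicy(new TieredMergePolicy(),
        toWrap -> new MergePolicy.OneMerge(toWrap.segments) {
          @Override
          public void mergeFinished(boolean committed, boolean segmentDropped) throws IOException {
            toWrap.mergeFinished(committed, segmentDropped); // delegate, as IndexWriter does above
            super.mergeFinished(committed, segmentDropped);
            System.out.println("merge finished: committed=" + committed
                + " segmentDropped=" + segmentDropped);
          }

          @Override
          public CodecReader wrapForMerge(CodecReader reader) throws IOException {
            return toWrap.wrapForMerge(reader); // must delegate
          }
        });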
diff --git a/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java b/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java
index b0ee8d68..505f08f 100644
--- a/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java
@@ -695,18 +695,8 @@ final class ReadersAndUpdates {
return isMerging;
}
- final static class MergeReader {
- final SegmentReader reader;
- final Bits hardLiveDocs;
-
- MergeReader(SegmentReader reader, Bits hardLiveDocs) {
- this.reader = reader;
- this.hardLiveDocs = hardLiveDocs;
- }
- }
-
/** Returns a reader for merge, with the latest doc values updates and deletions. */
- synchronized MergeReader getReaderForMerge(IOContext context) throws IOException {
+ synchronized MergePolicy.MergeReader getReaderForMerge(IOContext context) throws IOException {
// We must carry over any still-pending DV updates because they were not
// successfully written, e.g. because there was a hole in the delGens,
@@ -728,7 +718,7 @@ final class ReadersAndUpdates {
reader = createNewReaderWithLatestLiveDocs(reader);
}
assert pendingDeletes.verifyDocCounts(reader);
- return new MergeReader(reader, pendingDeletes.getHardLiveDocs());
+ return new MergePolicy.MergeReader(reader, pendingDeletes.getHardLiveDocs());
}
/**
diff --git a/lucene/core/src/java/org/apache/lucene/util/IOUtils.java b/lucene/core/src/java/org/apache/lucene/util/IOUtils.java
index b4142df..b34b829 100644
--- a/lucene/core/src/java/org/apache/lucene/util/IOUtils.java
+++ b/lucene/core/src/java/org/apache/lucene/util/IOUtils.java
@@ -656,4 +656,14 @@ public final class IOUtils {
*/
void accept(T input) throws IOException;
}
+
+ /**
+ * A Function that may throw an IOException
+ * @see java.util.function.Function
+ */
+ @FunctionalInterface
+ public interface IOFunction<T, R> {
+ R apply(T t) throws IOException;
+ }
+
}
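The new IOFunction complements the existing IOConsumer: both admit lambdas and method references that throw IOException, which the java.util.function types cannot. A small hedged fragment (Files/Paths from java.nio.file; the file name is illustrative):

    // Files::size throws IOException, so it fits IOFunction but not java.util.function.Function.
    IOUtils.IOFunction<Path, Long> sizeOf = Files::size;
    long bytes = sizeOf.apply(Paths.get("segments_1"));

    // IOConsumer is what IOUtils.applyToAll (used throughout this patch) accepts:
    List<Closeable> resources = Collections.emptyList();
    IOUtils.applyToAll(resources, Closeable::close);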
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java b/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java
index 7fdad3b..a222bb7 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java
@@ -530,7 +530,7 @@ public class TestDemoParallelLeafReader extends LuceneTestCase {
@Override
public CodecReader wrapForMerge(CodecReader reader) throws IOException {
- LeafReader wrapped = getCurrentReader((SegmentReader)reader, schemaGen);
+ LeafReader wrapped = getCurrentReader(reader, schemaGen);
if (wrapped instanceof ParallelLeafReader) {
parallelReaders.add((ParallelLeafReader) wrapped);
}
@@ -538,7 +538,8 @@ public class TestDemoParallelLeafReader extends LuceneTestCase {
}
@Override
- public void mergeFinished(boolean success) throws IOException {
+ public void mergeFinished(boolean success, boolean segmentDropped) throws IOException {
+ super.mergeFinished(success, segmentDropped);
Throwable th = null;
for (ParallelLeafReader r : parallelReaders) {
try {
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
index c4331c7..75ad3e3 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
@@ -345,7 +345,7 @@ public class TestIndexWriter extends LuceneTestCase {
// Make sure it's OK to change RAM buffer size and
// maxBufferedDocs in a write session
public void testChangingRAMBuffer() throws IOException {
- Directory dir = newDirectory();
+ Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())));
writer.getConfig().setMaxBufferedDocs(10);
writer.getConfig().setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH);
@@ -608,7 +608,7 @@ public class TestIndexWriter extends LuceneTestCase {
doc.add(newField("content4", contents, customType));
type = customType;
} else
- type = TextField.TYPE_NOT_STORED;
+ type = TextField.TYPE_NOT_STORED;
doc.add(newTextField("content1", contents, Field.Store.NO));
doc.add(newField("content3", "", customType));
doc.add(newField("content5", "", type));
@@ -664,13 +664,13 @@ public class TestIndexWriter extends LuceneTestCase {
writer.close();
dir.close();
}
-
+
public void testEmptyFieldNameTerms() throws IOException {
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())));
Document doc = new Document();
doc.add(newTextField("", "a b c", Field.Store.NO));
- writer.addDocument(doc);
+ writer.addDocument(doc);
writer.close();
DirectoryReader reader = DirectoryReader.open(dir);
LeafReader subreader = getOnlyLeafReader(reader);
@@ -682,7 +682,7 @@ public class TestIndexWriter extends LuceneTestCase {
reader.close();
dir.close();
}
-
+
public void testEmptyFieldNameWithEmptyTerm() throws IOException {
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())));
@@ -691,7 +691,7 @@ public class TestIndexWriter extends LuceneTestCase {
doc.add(newStringField("", "a", Field.Store.NO));
doc.add(newStringField("", "b", Field.Store.NO));
doc.add(newStringField("", "c", Field.Store.NO));
- writer.addDocument(doc);
+ writer.addDocument(doc);
writer.close();
DirectoryReader reader = DirectoryReader.open(dir);
LeafReader subreader = getOnlyLeafReader(reader);
@@ -835,7 +835,7 @@ public class TestIndexWriter extends LuceneTestCase {
customType.setStoreTermVectors(true);
customType.setStoreTermVectorPositions(true);
customType.setStoreTermVectorOffsets(true);
-
+
doc.add(newField("content", "aaa bbb ccc ddd eee fff ggg hhh iii", customType));
writer.addDocument(doc);
writer.addDocument(doc);
@@ -923,7 +923,7 @@ public class TestIndexWriter extends LuceneTestCase {
// open/close slowly sometimes
dir.setUseSlowOpenClosers(true);
-
+
// throttle a little
dir.setThrottling(MockDirectoryWrapper.Throttling.SOMETIMES);
@@ -1149,7 +1149,7 @@ public class TestIndexWriter extends LuceneTestCase {
FieldType customType = new FieldType(StoredField.TYPE);
customType.setTokenized(true);
-
+
Field f = new Field("binary", b, 10, 17, customType);
// TODO: this is evil, changing the type after creating the field:
customType.setIndexOptions(IndexOptions.DOCS);
@@ -1158,7 +1158,7 @@ public class TestIndexWriter extends LuceneTestCase {
f.setTokenStream(doc1field1);
FieldType customType2 = new FieldType(TextField.TYPE_STORED);
-
+
Field f2 = newField("string", "value", customType2);
final MockTokenizer doc1field2 = new MockTokenizer(MockTokenizer.WHITESPACE, false);
doc1field2.setReader(new StringReader("doc1field2"));
@@ -1234,7 +1234,7 @@ public class TestIndexWriter extends LuceneTestCase {
public void testDeleteUnusedFiles() throws Exception {
assumeFalse("test relies on exact filenames", Codec.getDefault() instanceof SimpleTextCodec);
assumeWorkingMMapOnWindows();
-
+
for(int iter=0;iter<2;iter++) {
// relies on windows semantics
Path path = createTempDir();
@@ -1251,7 +1251,7 @@ public class TestIndexWriter extends LuceneTestCase {
}
MergePolicy mergePolicy = newLogMergePolicy(true);
-
+
// This test expects all of its segments to be in CFS
mergePolicy.setNoCFSRatio(1.0);
mergePolicy.setMaxCFSSegmentSizeMB(Double.POSITIVE_INFINITY);
@@ -1339,7 +1339,7 @@ public class TestIndexWriter extends LuceneTestCase {
customType.setStoreTermVectors(true);
customType.setStoreTermVectorPositions(true);
customType.setStoreTermVectorOffsets(true);
-
+
doc.add(newField("c", "val", customType));
writer.addDocument(doc);
writer.commit();
@@ -1380,7 +1380,7 @@ public class TestIndexWriter extends LuceneTestCase {
// indexed, flushed (but not committed) and then IW rolls back, then no
// files are left in the Directory.
Directory dir = newDirectory();
-
+
String[] origFiles = dir.listAll();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random()))
.setMaxBufferedDocs(2)
@@ -1410,8 +1410,8 @@ public class TestIndexWriter extends LuceneTestCase {
// Adding just one document does not call flush yet.
int computedExtraFileCount = 0;
for (String file : dir.listAll()) {
- if (IndexWriter.WRITE_LOCK_NAME.equals(file) ||
- file.startsWith(IndexFileNames.SEGMENTS) ||
+ if (IndexWriter.WRITE_LOCK_NAME.equals(file) ||
+ file.startsWith(IndexFileNames.SEGMENTS) ||
IndexFileNames.CODEC_FILE_PATTERN.matcher(file).matches()) {
if (file.lastIndexOf('.') < 0
// don't count stored fields and term vectors in, or any temporary files they might
@@ -1459,7 +1459,7 @@ public class TestIndexWriter extends LuceneTestCase {
FieldType customType3 = new FieldType(TextField.TYPE_STORED);
customType3.setTokenized(false);
customType3.setOmitNorms(true);
-
+
for (int i=0; i<2; i++) {
Document doc = new Document();
doc.add(new Field("id", Integer.toString(i)+BIG, customType3));
@@ -1479,7 +1479,7 @@ public class TestIndexWriter extends LuceneTestCase {
SegmentReader sr = (SegmentReader) ctx.reader();
assertFalse(sr.getFieldInfos().hasVectors());
}
-
+
r0.close();
dir.close();
}
@@ -1502,7 +1502,7 @@ public class TestIndexWriter extends LuceneTestCase {
@Override
public final boolean incrementToken() {
- clearAttributes();
+ clearAttributes();
if (upto < tokens.length) {
termAtt.setEmpty();
termAtt.append(tokens[upto]);
@@ -1725,7 +1725,7 @@ public class TestIndexWriter extends LuceneTestCase {
r.close();
dir.close();
}
-
+
public void testDontInvokeAnalyzerForUnAnalyzedFields() throws Exception {
Analyzer analyzer = new Analyzer() {
@Override
@@ -1760,13 +1760,13 @@ public class TestIndexWriter extends LuceneTestCase {
w.close();
dir.close();
}
-
+
//LUCENE-1468 -- make sure opening an IndexWriter with
// create=true does not remove non-index files
-
+
public void testOtherFiles() throws Throwable {
Directory dir = newDirectory();
- IndexWriter iw = new IndexWriter(dir,
+ IndexWriter iw = new IndexWriter(dir,
newIndexWriterConfig(new MockAnalyzer(random())));
iw.addDocument(new Document());
iw.close();
@@ -1775,15 +1775,15 @@ public class TestIndexWriter extends LuceneTestCase {
IndexOutput out = dir.createOutput("myrandomfile", newIOContext(random()));
out.writeByte((byte) 42);
out.close();
-
+
new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random()))).close();
-
+
assertTrue(slowFileExists(dir, "myrandomfile"));
} finally {
dir.close();
}
}
-
+
// LUCENE-3849
public void testStopwordsPosIncHole() throws Exception {
Directory dir = newDirectory();
@@ -1812,7 +1812,7 @@ public class TestIndexWriter extends LuceneTestCase {
ir.close();
dir.close();
}
-
+
// LUCENE-3849
public void testStopwordsPosIncHole2() throws Exception {
// use two stopfilters for testing here
@@ -1844,23 +1844,23 @@ public class TestIndexWriter extends LuceneTestCase {
ir.close();
dir.close();
}
-
+
// LUCENE-4575
public void testCommitWithUserDataOnly() throws Exception {
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(null));
writer.commit(); // first commit to complete IW create transaction.
-
+
// this should store the commit data, even though no other changes were made
writer.setLiveCommitData(new HashMap<String,String>() {{
put("key", "value");
}}.entrySet());
writer.commit();
-
+
DirectoryReader r = DirectoryReader.open(dir);
assertEquals("value", r.getIndexCommit().getUserData().get("key"));
r.close();
-
+
// now check setCommitData and prepareCommit/commit sequence
writer.setLiveCommitData(new HashMap<String,String>() {{
put("key", "value1");
@@ -1874,7 +1874,7 @@ public class TestIndexWriter extends LuceneTestCase {
r = DirectoryReader.open(dir);
assertEquals("value1", r.getIndexCommit().getUserData().get("key"));
r.close();
-
+
// now should commit the second commitData - there was a bug where
// IndexWriter.finishCommit overrode the second commitData
writer.commit();
@@ -1882,7 +1882,7 @@ public class TestIndexWriter extends LuceneTestCase {
assertEquals("IndexWriter.finishCommit may have overridden the second commitData",
"value2", r.getIndexCommit().getUserData().get("key"));
r.close();
-
+
writer.close();
dir.close();
}
@@ -1897,7 +1897,7 @@ public class TestIndexWriter extends LuceneTestCase {
}
return data;
}
-
+
@Test
public void testGetCommitData() throws Exception {
Directory dir = newDirectory();
@@ -1907,16 +1907,16 @@ public class TestIndexWriter extends LuceneTestCase {
}}.entrySet());
assertEquals("value", getLiveCommitData(writer).get("key"));
writer.close();
-
+
// validate that it's also visible when opening a new IndexWriter
writer = new IndexWriter(dir, newIndexWriterConfig(null)
.setOpenMode(OpenMode.APPEND));
assertEquals("value", getLiveCommitData(writer).get("key"));
writer.close();
-
+
dir.close();
}
-
+
public void testNullAnalyzer() throws IOException {
Directory dir = newDirectory();
IndexWriterConfig iwConf = newIndexWriterConfig(null);
@@ -1943,7 +1943,7 @@ public class TestIndexWriter extends LuceneTestCase {
iw.close();
dir.close();
}
-
+
public void testNullDocument() throws IOException {
Directory dir = newDirectory();
RandomIndexWriter iw = new RandomIndexWriter(random(), dir);
@@ -1968,7 +1968,7 @@ public class TestIndexWriter extends LuceneTestCase {
iw.close();
dir.close();
}
-
+
public void testNullDocuments() throws IOException {
Directory dir = newDirectory();
RandomIndexWriter iw = new RandomIndexWriter(random(), dir);
@@ -1993,7 +1993,7 @@ public class TestIndexWriter extends LuceneTestCase {
iw.close();
dir.close();
}
-
+
public void testIterableFieldThrowsException() throws IOException {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())));
@@ -2001,7 +2001,7 @@ public class TestIndexWriter extends LuceneTestCase {
int docCount = 0;
int docId = 0;
Set<String> liveIds = new HashSet<>();
- for (int i = 0; i < iters; i++) {
+ for (int i = 0; i < iters; i++) {
int numDocs = atLeast(4);
for (int j = 0; j < numDocs; j++) {
String id = Integer.toString(docId++);
@@ -2009,7 +2009,7 @@ public class TestIndexWriter extends LuceneTestCase {
fields.add(new StringField("id", id, Field.Store.YES));
fields.add(new StringField("foo", TestUtil.randomSimpleString(random()), Field.Store.NO));
docId++;
-
+
boolean success = false;
try {
w.addDocument(new RandomFailingIterable<IndexableField>(fields, random()));
@@ -2041,7 +2041,7 @@ public class TestIndexWriter extends LuceneTestCase {
w.close();
IOUtils.close(reader, dir);
}
-
+
public void testIterableThrowsException() throws IOException {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())));
@@ -2089,7 +2089,7 @@ public class TestIndexWriter extends LuceneTestCase {
w.close();
IOUtils.close(reader, dir);
}
-
+
public void testIterableThrowsException2() throws IOException {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())));
@@ -2129,7 +2129,7 @@ public class TestIndexWriter extends LuceneTestCase {
this.list = list;
this.failOn = random.nextInt(5);
}
-
+
@Override
public Iterator<T> iterator() {
final Iterator<? extends T> docIter = list.iterator();
@@ -2255,7 +2255,7 @@ public class TestIndexWriter extends LuceneTestCase {
writer.close();
dir.close();
}
-
+
public void testMergeAllDeleted() throws IOException {
Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()));
@@ -2478,12 +2478,12 @@ public class TestIndexWriter extends LuceneTestCase {
IndexWriter w = new IndexWriter(d, newIndexWriterConfig(new MockAnalyzer(random())));
w.addDocument(new Document());
w.close();
-
+
SegmentInfos sis = SegmentInfos.readLatestCommit(d);
byte[] id1 = sis.getId();
assertNotNull(id1);
assertEquals(StringHelper.ID_LENGTH, id1.length);
-
+
byte[] id2 = sis.info(0).info.getId();
byte[] sciId2 = sis.info(0).getId();
assertNotNull(id2);
@@ -2515,7 +2515,7 @@ public class TestIndexWriter extends LuceneTestCase {
ids.add(id);
}
}
-
+
public void testEmptyNorm() throws Exception {
Directory d = newDirectory();
IndexWriter w = new IndexWriter(d, newIndexWriterConfig(new MockAnalyzer(random())));
@@ -2580,7 +2580,7 @@ public class TestIndexWriter extends LuceneTestCase {
assertEquals(1, r2.getIndexCommit().getGeneration());
assertEquals("segments_1", r2.getIndexCommit().getSegmentsFileName());
r2.close();
-
+
// make a change and another commit
w.addDocument(new Document());
w.commit();
@@ -2867,7 +2867,7 @@ public class TestIndexWriter extends LuceneTestCase {
IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
IndexWriter w = new IndexWriter(dir, iwc);
w.close();
-
+
IndexOutput out = dir.createTempOutput("_0", "bkd", IOContext.DEFAULT);
String tempName = out.getName();
out.close();
@@ -3152,7 +3152,7 @@ public class TestIndexWriter extends LuceneTestCase {
expectThrows(IllegalArgumentException.class, () -> {
writer.softUpdateDocument(null, new Document(), new NumericDocValuesField("soft_delete", 1));
});
-
+
expectThrows(IllegalArgumentException.class, () -> {
writer.softUpdateDocument(new Term("id", "1"), new Document());
});
@@ -4182,24 +4182,63 @@ public class TestIndexWriter extends LuceneTestCase {
SetOnce<Boolean> onlyFinishOnce = new SetOnce<>();
return new MergePolicy.OneMerge(merge.segments) {
@Override
- public void mergeFinished(boolean success) {
+ public void mergeFinished(boolean success, boolean segmentDropped) throws IOException {
+ super.mergeFinished(success, segmentDropped);
onlyFinishOnce.set(true);
}
};
})))) {
- Document doc = new Document();
- doc.add(new StringField("id", "1", Field.Store.NO));
- writer.addDocument(doc);
- writer.flush();
- writer.addDocument(doc);
- writer.flush();
- writer.deleteDocuments(new Term("id", "1"));
- writer.flush();
- assertEquals(2, writer.getSegmentCount());
- assertEquals(0, writer.getDocStats().numDocs);
- assertEquals(2, writer.getDocStats().maxDoc);
- writer.forceMerge(1);
+ Document doc = new Document();
+ doc.add(new StringField("id", "1", Field.Store.NO));
+ writer.addDocument(doc);
+ writer.flush();
+ writer.addDocument(doc);
+ writer.flush();
+ writer.deleteDocuments(new Term("id", "1"));
+ writer.flush();
+ assertEquals(2, writer.getSegmentCount());
+ assertEquals(0, writer.getDocStats().numDocs);
+ assertEquals(2, writer.getDocStats().maxDoc);
+ writer.forceMerge(1);
}
}
}
+
+ public void testMergeOnCommitKeepFullyDeletedSegments() throws Exception {
+ Directory dir = newDirectory();
+ IndexWriterConfig iwc = newIndexWriterConfig();
+ iwc.setMaxCommitMergeWaitMillis(30 * 1000);
+ iwc.mergePolicy = new FilterMergePolicy(newMergePolicy()) {
+ @Override
+ public boolean keepFullyDeletedSegment(IOSupplier<CodecReader> readerIOSupplier) {
+ return true;
+ }
+
+ @Override
+ public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger,
+ SegmentInfos segmentInfos,
+ MergeContext mergeContext) {
+ List<SegmentCommitInfo> fullyDeletedSegments = segmentInfos.asList().stream()
+ .filter(s -> s.info.maxDoc() - s.getDelCount() == 0)
+ .collect(Collectors.toList());
+ if (fullyDeletedSegments.isEmpty()) {
+ return null;
+ }
+ MergeSpecification spec = new MergeSpecification();
+ spec.add(new OneMerge(fullyDeletedSegments));
+ return spec;
+ }
+ };
+ IndexWriter w = new IndexWriter(dir, iwc);
+ Document d = new Document();
+ d.add(new StringField("id", "1", Field.Store.YES));
+ w.addDocument(d);
+ w.commit();
+ w.updateDocument(new Term("id", "1"), d);
+ w.commit();
+ try (DirectoryReader reader = w.getReader()) {
+ assertEquals(1, reader.numDocs());
+ }
+ IOUtils.close(w, dir);
+ }
}
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
index ce591a2..12eeb9d 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
@@ -18,17 +18,47 @@ package org.apache.lucene.index;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
+import org.apache.lucene.document.NumericDocValuesField;
+import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.LuceneTestCase;
public class TestIndexWriterMergePolicy extends LuceneTestCase {
-
+
+ private static final MergePolicy MERGE_ON_COMMIT_POLICY = new LogDocMergePolicy() {
+ @Override
+ public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) {
+ // Optimize down to a single segment on commit
+ if (mergeTrigger == MergeTrigger.COMMIT && segmentInfos.size() > 1) {
+ List<SegmentCommitInfo> nonMergingSegments = new ArrayList<>();
+ for (SegmentCommitInfo sci : segmentInfos) {
+ if (mergeContext.getMergingSegments().contains(sci) == false) {
+ nonMergingSegments.add(sci);
+ }
+ }
+ if (nonMergingSegments.size() > 1) {
+ MergeSpecification mergeSpecification = new MergeSpecification();
+ mergeSpecification.add(new OneMerge(nonMergingSegments));
+ return mergeSpecification;
+ }
+ }
+ return null;
+ }
+ };
+
// Test the normal case
public void testNormalCase() throws IOException {
Directory dir = newDirectory();
@@ -278,6 +308,50 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
assertSetters(new LogDocMergePolicy());
}
+ // Test basic semantics of merge on commit
+ public void testMergeOnCommit() throws IOException {
+ Directory dir = newDirectory();
+
+ IndexWriter firstWriter = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random()))
+ .setMergePolicy(NoMergePolicy.INSTANCE));
+ for (int i = 0; i < 5; i++) {
+ TestIndexWriter.addDoc(firstWriter);
+ firstWriter.flush();
+ }
+ DirectoryReader firstReader = DirectoryReader.open(firstWriter);
+ assertEquals(5, firstReader.leaves().size());
+ firstReader.close();
+ firstWriter.close(); // When this writer closes, it does not merge on commit.
+
+ IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()))
+ .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(Integer.MAX_VALUE);
+
+
+ IndexWriter writerWithMergePolicy = new IndexWriter(dir, iwc);
+ writerWithMergePolicy.commit(); // No changes. Commit doesn't trigger a merge.
+
+ DirectoryReader unmergedReader = DirectoryReader.open(writerWithMergePolicy);
+ assertEquals(5, unmergedReader.leaves().size());
+ unmergedReader.close();
+
+ TestIndexWriter.addDoc(writerWithMergePolicy);
+ writerWithMergePolicy.commit(); // Doc added, do merge on commit.
+ assertEquals(1, writerWithMergePolicy.getSegmentCount());
+
+ DirectoryReader mergedReader = DirectoryReader.open(writerWithMergePolicy);
+ assertEquals(1, mergedReader.leaves().size());
+ mergedReader.close();
+
+ try (IndexReader reader = writerWithMergePolicy.getReader()) {
+ IndexSearcher searcher = new IndexSearcher(reader);
+ assertEquals(6, reader.numDocs());
+ assertEquals(6, searcher.count(new MatchAllDocsQuery()));
+ }
+
+ writerWithMergePolicy.close();
+ dir.close();
+ }
+
private void assertSetters(MergePolicy lmp) {
lmp.setMaxCFSSegmentSizeMB(2.0);
assertEquals(2.0, lmp.getMaxCFSSegmentSizeMB(), EPSILON);
@@ -294,4 +368,168 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
// TODO: Add more checks for other non-double setters!
}
+
+ public void testCarryOverNewDeletes() throws IOException, InterruptedException {
+ try (Directory directory = newDirectory()) {
+ boolean useSoftDeletes = random().nextBoolean();
+ CountDownLatch waitForMerge = new CountDownLatch(1);
+ CountDownLatch waitForUpdate = new CountDownLatch(1);
+ try (IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig()
+ .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(30 * 1000)
+ .setSoftDeletesField("soft_delete")
+ .setMergeScheduler(new ConcurrentMergeScheduler())) {
+ @Override
+ protected void merge(MergePolicy.OneMerge merge) throws IOException {
+ waitForMerge.countDown();
+ try {
+ waitForUpdate.await();
+ } catch (InterruptedException e) {
+ throw new AssertionError(e);
+ }
+ super.merge(merge);
+ }
+ }) {
+
+ Document d1 = new Document();
+ d1.add(new StringField("id", "1", Field.Store.NO));
+ Document d2 = new Document();
+ d2.add(new StringField("id", "2", Field.Store.NO));
+ Document d3 = new Document();
+ d3.add(new StringField("id", "3", Field.Store.NO));
+ writer.addDocument(d1);
+ writer.flush();
+ writer.addDocument(d2);
+ boolean addThreeDocs = random().nextBoolean();
+ int expectedNumDocs = 2;
+ if (addThreeDocs) { // sometimes add another doc to ensure we don't have a fully deleted segment
+ expectedNumDocs = 3;
+ writer.addDocument(d3);
+ }
+ Thread t = new Thread(() -> {
+ try {
+ waitForMerge.await();
+ if (useSoftDeletes) {
+ writer.softUpdateDocument(new Term("id", "2"), d2, new NumericDocValuesField("soft_delete", 1));
+ } else {
+ writer.updateDocument(new Term("id", "2"), d2);
+ }
+ writer.flush();
+ } catch (Exception e) {
+ throw new AssertionError(e);
+ } finally {
+ waitForUpdate.countDown();
+ }
+
+ });
+ t.start();
+ writer.commit();
+ t.join();
+ try (DirectoryReader open = new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(directory), "soft_delete")) {
+ assertEquals(expectedNumDocs, open.numDocs());
+ assertEquals("we should not have any deletes", expectedNumDocs, open.maxDoc());
+ }
+
+ try (DirectoryReader open = DirectoryReader.open(writer)) {
+ assertEquals(expectedNumDocs, open.numDocs());
+ assertEquals("we should not have one delete", expectedNumDocs+1, open.maxDoc());
+ }
+ }
+ }
+ }
+
+ /**
+ * This test makes sure we release the merge readers on abort. MockDirectoryWrapper (MDW)
+ * will fail the test if it cannot close all files.
+ */
+ public void testAbortCommitMerge() throws IOException, InterruptedException {
+ try (Directory directory = newDirectory()) {
+ CountDownLatch waitForMerge = new CountDownLatch(1);
+ CountDownLatch waitForDeleteAll = new CountDownLatch(1);
+ try (IndexWriter writer = new IndexWriter(directory, newIndexWriterConfig()
+ .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(30 * 1000)
+ .setMergeScheduler(new SerialMergeScheduler() {
+ @Override
+ public synchronized void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
+ waitForMerge.countDown();
+ try {
+ waitForDeleteAll.await();
+ } catch (InterruptedException e) {
+ throw new AssertionError(e);
+ }
+ super.merge(mergeSource, trigger);
+ }
+ }))) {
+
+ Document d1 = new Document();
+ d1.add(new StringField("id", "1", Field.Store.NO));
+ Document d2 = new Document();
+ d2.add(new StringField("id", "2", Field.Store.NO));
+ Document d3 = new Document();
+ d3.add(new StringField("id", "3", Field.Store.NO));
+ writer.addDocument(d1);
+ writer.flush();
+ writer.addDocument(d2);
+ Thread t = new Thread(() -> {
+ try {
+ writer.commit();
+ } catch (IOException e) {
+ throw new AssertionError(e);
+ }
+ });
+ t.start();
+ waitForMerge.await();
+ writer.deleteAll();
+ waitForDeleteAll.countDown();
+ t.join();
+ }
+ }
+ }
+
+ public void testStressUpdateSameDocumentWithMergeOnCommit() throws IOException, InterruptedException {
+ try (Directory directory = newDirectory()) {
+ try (RandomIndexWriter writer = new RandomIndexWriter(random(), directory, newIndexWriterConfig()
+ .setMergePolicy(MERGE_ON_COMMIT_POLICY).setMaxCommitMergeWaitMillis(10 + random().nextInt(2000))
+ .setSoftDeletesField("soft_delete")
+ .setMergeScheduler(new ConcurrentMergeScheduler()))) {
+ Document d1 = new Document();
+ d1.add(new StringField("id", "1", Field.Store.NO));
+ writer.updateDocument(new Term("id", "1"), d1);
+ writer.commit();
+
+ AtomicInteger iters = new AtomicInteger(100 + random().nextInt(TEST_NIGHTLY ? 5000 : 1000));
+ AtomicBoolean done = new AtomicBoolean(false);
+ Thread[] threads = new Thread[1 + random().nextInt(4)];
+ for (int i = 0; i < threads.length; i++) {
+ Thread t = new Thread(() -> {
+ try {
+ while (iters.decrementAndGet() > 0) {
+ writer.updateDocument(new Term("id", "1"), d1);
+ }
+ } catch (Exception e) {
+ throw new AssertionError(e);
+ } finally {
+ done.set(true);
+ }
+
+ });
+ t.start();
+ threads[i] = t;
+ }
+ try {
+ while (done.get() == false) {
+ if (random().nextBoolean()) {
+ writer.commit();
+ }
+ try (DirectoryReader open = new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(directory), "soft_delete")) {
+ assertEquals(1, open.numDocs());
+ }
+ }
+ } finally {
+ for (Thread t : threads) {
+ t.join();
+ }
+ }
+ }
+ }
+ }
}
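Several of the tests above lean on the soft-deletes machinery; for reference, a minimal sketch of the pattern they use (dir and analyzer are assumed; imports from org.apache.lucene.document, org.apache.lucene.index, and org.apache.lucene.store are assumed):

    // softUpdateDocument() marks the previous version of the document with a
    // docvalues update on the configured field instead of a hard delete.
    try (IndexWriter writer = new IndexWriter(dir,
        new IndexWriterConfig(analyzer).setSoftDeletesField("soft_delete"))) {
      Document doc = new Document();
      doc.add(new StringField("id", "1", Field.Store.NO));
      writer.addDocument(doc);
      writer.softUpdateDocument(new Term("id", "1"), doc,
          new NumericDocValuesField("soft_delete", 1));
      writer.commit();
    }
    // The wrapper hides soft-deleted documents from numDocs(); assuming no merge
    // has reclaimed the old version yet, maxDoc() still counts it:
    try (DirectoryReader reader = new SoftDeletesDirectoryReaderWrapper(
        DirectoryReader.open(dir), "soft_delete")) {
      // reader.numDocs() == 1, reader.maxDoc() == 2
    }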
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestMergePolicy.java
index c252da0..93a9b17 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestMergePolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestMergePolicy.java
@@ -43,7 +43,7 @@ public class TestMergePolicy extends LuceneTestCase {
Thread t = new Thread(() -> {
try {
for (MergePolicy.OneMerge m : ms.merges) {
- m.mergeFinished(true);
+ m.close(true, false, mr -> {});
}
} catch (IOException e) {
throw new AssertionError(e);
@@ -66,7 +66,7 @@ public class TestMergePolicy extends LuceneTestCase {
}
Thread t = new Thread(() -> {
try {
- ms.merges.get(0).mergeFinished(true);
+ ms.merges.get(0).close(true, false, mr -> {});
} catch (IOException e) {
throw new AssertionError(e);
}
@@ -89,7 +89,7 @@ public class TestMergePolicy extends LuceneTestCase {
Thread t = new Thread(() -> {
while (stop.get() == false) {
try {
- ms.merges.get(i.getAndIncrement()).mergeFinished(true);
+ ms.merges.get(i.getAndIncrement()).close(true, false, mr -> {});
Thread.sleep(1);
} catch (IOException | InterruptedException e) {
throw new AssertionError(e);
@@ -114,8 +114,8 @@ public class TestMergePolicy extends LuceneTestCase {
try (Directory dir = newDirectory()) {
MergePolicy.MergeSpecification spec = createRandomMergeSpecification(dir, 1);
MergePolicy.OneMerge oneMerge = spec.merges.get(0);
- oneMerge.mergeFinished(true);
- expectThrows(IllegalStateException.class, () -> oneMerge.mergeFinished(false));
+ oneMerge.close(true, false, mr -> {});
+ expectThrows(IllegalStateException.class, () -> oneMerge.close(false, false, mr -> {}));
}
}
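These changes track the new OneMerge lifecycle: mergeFinished(boolean) becomes mergeFinished(boolean success, boolean segmentDropped), and a merge is now driven to completion through OneMerge.close(success, segmentDropped, readerConsumer), whose last argument releases any merge readers still held. A sketch of hooking the new callback (segments is an assumed List<SegmentCommitInfo>; the comment on segmentDropped is an interpretation of the flag):

    MergePolicy.OneMerge merge = new MergePolicy.OneMerge(segments) {
      @Override
      public void mergeFinished(boolean success, boolean segmentDropped) throws IOException {
        super.mergeFinished(success, segmentDropped); // call super first, as the tests do
        if (segmentDropped) {
          // the merged segment ended up fully deleted and was dropped from the index
        }
      }
    };
    // Completing the merge by hand, as the updated tests do; the lambda receives
    // each merge reader to release (a no-op here, since none were opened).
    merge.close(true, false, mr -> {});
    // A OneMerge may only be closed once; a second close(...) throws IllegalStateException.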
diff --git a/lucene/sandbox/src/test/org/apache/lucene/search/TestPhraseWildcardQuery.java b/lucene/sandbox/src/test/org/apache/lucene/search/TestPhraseWildcardQuery.java
index c8d9d51..f3cc088 100644
--- a/lucene/sandbox/src/test/org/apache/lucene/search/TestPhraseWildcardQuery.java
+++ b/lucene/sandbox/src/test/org/apache/lucene/search/TestPhraseWildcardQuery.java
@@ -27,6 +27,7 @@ import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;
@@ -66,12 +67,16 @@ public class TestPhraseWildcardQuery extends LuceneTestCase {
public void setUp() throws Exception {
super.setUp();
directory = newDirectory();
- RandomIndexWriter iw = new RandomIndexWriter(random(), directory);
+ RandomIndexWriter iw = new RandomIndexWriter(random(), directory,
+ newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE));
+ // Do not accidentally merge the two segments created here;
+ // this test relies on them staying separate.
iw.setDoRandomForceMerge(false); // Keep the segments separated.
addSegments(iw);
reader = iw.getReader();
iw.close();
searcher = newSearcher(reader);
+ assertEquals("test test relies on 2 segments", 2, searcher.getIndexReader().leaves().size());
}
@Override
diff --git a/lucene/test-framework/src/java/org/apache/lucene/index/MockRandomMergePolicy.java b/lucene/test-framework/src/java/org/apache/lucene/index/MockRandomMergePolicy.java
index beb4dad..92ffc73 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/index/MockRandomMergePolicy.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/index/MockRandomMergePolicy.java
@@ -129,6 +129,38 @@ public class MockRandomMergePolicy extends MergePolicy {
}
@Override
+ public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
+ MergeSpecification mergeSpecification = findMerges(null, segmentInfos, mergeContext);
+ if (mergeSpecification == null) {
+ return null;
+ }
+ // Do not return any merges involving already-merging segments.
+ MergeSpecification filteredMergeSpecification = new MergeSpecification();
+ for (OneMerge oneMerge : mergeSpecification.merges) {
+ boolean filtered = false;
+ List<SegmentCommitInfo> nonMergingSegments = new ArrayList<>();
+ for (SegmentCommitInfo sci : oneMerge.segments) {
+ if (mergeContext.getMergingSegments().contains(sci) == false) {
+ nonMergingSegments.add(sci);
+ } else {
+ filtered = true;
+ }
+ }
+ if (filtered) {
+ if (nonMergingSegments.size() > 0) {
+ filteredMergeSpecification.add(new OneMerge(nonMergingSegments));
+ }
+ } else {
+ filteredMergeSpecification.add(oneMerge);
+ }
+ }
+ if (filteredMergeSpecification.merges.size() > 0) {
+ return filteredMergeSpecification;
+ }
+ return null;
+ }
+
+ @Override
public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext) throws IOException {
// 80% of the time we create CFS:
return random.nextInt(5) != 1;
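The filtering added above reflects a contract of findFullFlushMerges: the returned specification must not include segments that are already being merged. The same check, extracted as a standalone helper for illustration (the method name is an assumption, not part of this commit):

    // Drops already-merging segments from a proposed merge; returns null when no
    // segment remains, mirroring the MockRandomMergePolicy logic above.
    static MergePolicy.OneMerge withoutMergingSegments(MergePolicy.OneMerge proposed,
                                                       MergePolicy.MergeContext mergeContext) {
      List<SegmentCommitInfo> nonMerging = new ArrayList<>();
      for (SegmentCommitInfo sci : proposed.segments) {
        if (mergeContext.getMergingSegments().contains(sci) == false) {
          nonMerging.add(sci);
        }
      }
      if (nonMerging.isEmpty()) {
        return null;
      }
      return new MergePolicy.OneMerge(nonMerging);
    }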
diff --git a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
index 4a94fe8..5ed031a 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
@@ -1006,6 +1006,7 @@ public abstract class LuceneTestCase extends Assert {
if (rarely(r)) {
c.setCheckPendingFlushUpdate(false);
}
+ c.setMaxCommitMergeWaitMillis(rarely() ? atLeast(r, 1000) : atLeast(r, 200));
return c;
}