You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by so...@apache.org on 2020/06/03 19:14:15 UTC
[lucene-solr] 46/47: Revert "Revert "LUCENE-8962""
This is an automated email from the ASF dual-hosted git repository.
sokolov pushed a commit to branch jira/lucene-8962
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 4e7c17e328edb7856f8007698c22ad233be9d6f2
Author: Michael Sokolov <so...@amazon.com>
AuthorDate: Mon Jun 1 14:45:00 2020 -0400
Revert "Revert "LUCENE-8962""
This reverts commit 4501b3d3fdbc35af99bde6abe7432cfc5e8b5547.
This reverts commit 075adac59865b3277adcf86052f2fae3e6d11135.
---
lucene/CHANGES.txt | 2 +
.../org/apache/lucene/index/FilterMergePolicy.java | 5 +
.../java/org/apache/lucene/index/IndexWriter.java | 114 ++++++++++++++++++++-
.../org/apache/lucene/index/IndexWriterConfig.java | 29 ++++++
.../org/apache/lucene/index/IndexWriterEvents.java | 57 +++++++++++
.../apache/lucene/index/LiveIndexWriterConfig.java | 26 +++++
.../java/org/apache/lucene/index/MergePolicy.java | 28 ++++-
.../java/org/apache/lucene/index/MergeTrigger.java | 7 +-
.../org/apache/lucene/index/NoMergePolicy.java | 3 +
.../lucene/index/OneMergeWrappingMergePolicy.java | 5 +
.../lucene/index/TestIndexWriterMergePolicy.java | 70 ++++++++++++-
.../apache/lucene/index/MockRandomMergePolicy.java | 32 ++++++
.../org/apache/lucene/util/LuceneTestCase.java | 1 +
13 files changed, 373 insertions(+), 6 deletions(-)
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 50b7f7b..6e63ad7 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -376,6 +376,8 @@ Improvements
* LUCENE-9253: KoreanTokenizer now supports custom dictionaries(system, unknown). (Namgyu Kim)
+* LUCENE-8962: Add ability to selectively merge on commit (Michael Froh)
+
* LUCENE-9171: QueryBuilder can now use BoostAttributes on input token streams to selectively
boost particular terms or synonyms in parsed queries. (Alessandro Benedetti, Alan Woodward)
diff --git a/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java
index eb634b4..b4e33f8 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java
@@ -58,6 +58,11 @@ public class FilterMergePolicy extends MergePolicy {
}
@Override
+ public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
+ return in.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext);
+ }
+
+ @Override
public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext)
throws IOException {
return in.useCompoundFile(infos, mergedInfo, mergeContext);
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 52adbef..88fdb90 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -33,6 +33,8 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -3152,6 +3154,42 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
}
}
+ private MergePolicy.OneMerge updateSegmentInfosOnMergeFinish(MergePolicy.OneMerge merge, final SegmentInfos toCommit,
+ AtomicReference<CountDownLatch> mergeLatchRef) {
+ return new MergePolicy.OneMerge(merge.segments) { // wrapper whose mergeFinished also applies this merge to toCommit
+ public void mergeFinished() throws IOException { // NOTE(review): add @Override so a signature drift in OneMerge is caught at compile time
+ super.mergeFinished();
+ CountDownLatch mergeAwaitLatch = mergeLatchRef.get();
+ if (mergeAwaitLatch == null) {
+ // Commit thread timed out waiting for this merge and cleared mergeLatchRef; leave toCommit and ref counts alone.
+ return;
+ }
+ if (committed) { // set by IndexWriter (merge.committed = true) once it has committed this merge's result
+ deleter.incRef(this.info.files()); // toCommit will reference the merged segment's files
+ // Resolve "live" SegmentInfos segments to their toCommit clones by segment name (toCommit was cloned before this merge ran).
+ Set<String> mergedSegmentNames = new HashSet<>();
+ for (SegmentCommitInfo sci : this.segments) {
+ deleter.decRef(sci.files()); // toCommit will no longer reference the merged-away segments' files
+ mergedSegmentNames.add(sci.info.name);
+ }
+ List<SegmentCommitInfo> toCommitMergedAwaySegments = new ArrayList<>();
+ for (SegmentCommitInfo sci : toCommit) {
+ if (mergedSegmentNames.contains(sci.info.name)) {
+ toCommitMergedAwaySegments.add(sci);
+ }
+ }
+ // Build a OneMerge over the toCommit clones so applyMergeChanges can apply this merge's result to toCommit.
+ MergePolicy.OneMerge applicableMerge = new MergePolicy.OneMerge(toCommitMergedAwaySegments);
+ applicableMerge.info = this.info.clone();
+ long segmentCounter = Long.parseLong(this.info.info.name.substring(1), Character.MAX_RADIX); // name minus first char (presumably '_'), base 36
+ toCommit.counter = Math.max(toCommit.counter, segmentCounter + 1); // presumably so toCommit never reissues this segment's name
+ toCommit.applyMergeChanges(applicableMerge, false);
+ }
+ mergeAwaitLatch.countDown(); // NOTE(review): skipped if anything above throws, blocking the commit until the timeout; consider try/finally
+ }
+ };
+ }
+
private long prepareCommitInternal() throws IOException {
startCommitTime = System.nanoTime();
synchronized(commitLock) {
@@ -3174,6 +3212,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
SegmentInfos toCommit = null;
boolean anyChanges = false;
long seqNo;
+ List<MergePolicy.OneMerge> commitMerges = null;
+ AtomicReference<CountDownLatch> mergeAwaitLatchRef = null;
// This is copied from doFlush, except it's modified to
// clone & incRef the flushed SegmentInfos inside the
@@ -3228,6 +3268,30 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// sneak into the commit point:
toCommit = segmentInfos.clone();
+ if (anyChanges) {
+ // Find any merges that can execute on commit (per MergePolicy).
+ MergePolicy.MergeSpecification mergeSpec =
+ config.getMergePolicy().findFullFlushMerges(MergeTrigger.COMMIT, segmentInfos, this);
+ if (mergeSpec != null && mergeSpec.merges.size() > 0) {
+ int mergeCount = mergeSpec.merges.size();
+ commitMerges = new ArrayList<>(mergeCount);
+ mergeAwaitLatchRef = new AtomicReference<>(new CountDownLatch(mergeCount));
+ for (MergePolicy.OneMerge oneMerge : mergeSpec.merges) {
+ MergePolicy.OneMerge trackedMerge =
+ updateSegmentInfosOnMergeFinish(oneMerge, toCommit, mergeAwaitLatchRef);
+ if (registerMerge(trackedMerge) == false) {
+ throw new IllegalStateException("MergePolicy " + config.getMergePolicy().getClass() +
+ " returned merging segments from findFullFlushMerges");
+ }
+ commitMerges.add(trackedMerge);
+ }
+ if (infoStream.isEnabled("IW")) {
+ infoStream.message("IW", "Registered " + mergeCount + " commit merges");
+ infoStream.message("IW", "Before executing commit merges, had " + toCommit.size() + " segments");
+ }
+ }
+ }
+
pendingCommitChangeCount = changeCount.get();
// This protects the segmentInfos we are now going
@@ -3235,8 +3299,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// we are trying to sync all referenced files, a
// merge completes which would otherwise have
// removed the files we are now syncing.
- filesToCommit = toCommit.files(false);
- deleter.incRef(filesToCommit);
+ deleter.incRef(toCommit.files(false));
}
success = true;
} finally {
@@ -3257,6 +3320,52 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
} finally {
maybeCloseOnTragicEvent();
}
+
+ if (mergeAwaitLatchRef != null) {
+ CountDownLatch mergeAwaitLatch = mergeAwaitLatchRef.get();
+ // If we found and registered any merges above, within the flushLock, then we want to ensure that they
+ // complete execution. Note that since we released the lock, other merges may have been scheduled. We will
+ // block until the merges that we registered complete. As they complete, they will update toCommit to
+ // replace merged segments with the result of each merge.
+ config.getIndexWriterEvents().beginMergeOnCommit();
+ mergeScheduler.merge(mergeSource, MergeTrigger.COMMIT);
+ long mergeWaitStart = System.nanoTime();
+ int abandonedCount = 0;
+ long waitTimeMillis = (long) (config.getMaxCommitMergeWaitSeconds() * 1000.0);
+ try {
+ if (mergeAwaitLatch.await(waitTimeMillis, TimeUnit.MILLISECONDS) == false) {
+ synchronized (this) {
+ // Need to do this in a synchronized block, to make sure none of our commit merges are currently
+ // executing mergeFinished (since mergeFinished itself is called from within the IndexWriter lock).
+ // After we clear the value from mergeAwaitLatchRef, the merges we schedule will still execute as
+ // usual, but when they finish, they won't attempt to update toCommit or modify segment reference
+ // counts.
+ mergeAwaitLatchRef.set(null);
+ for (MergePolicy.OneMerge commitMerge : commitMerges) {
+ if (runningMerges.contains(commitMerge) || pendingMerges.contains(commitMerge)) {
+ abandonedCount++;
+ }
+ }
+ }
+ }
+ } catch (InterruptedException ie) {
+ throw new ThreadInterruptedException(ie);
+ } finally {
+ if (infoStream.isEnabled("IW")) {
+ infoStream.message("IW", String.format(Locale.ROOT, "Waited %.1f ms for commit merges",
+ (System.nanoTime() - mergeWaitStart)/1_000_000.0));
+ infoStream.message("IW", "After executing commit merges, had " + toCommit.size() + " segments");
+ if (abandonedCount > 0) {
+ infoStream.message("IW", "Abandoned " + abandonedCount + " commit merges after " + waitTimeMillis + " ms");
+ }
+ }
+ if (abandonedCount > 0) {
+ config.getIndexWriterEvents().abandonedMergesOnCommit(abandonedCount);
+ }
+ config.getIndexWriterEvents().finishMergeOnCommit();
+ }
+ }
+ filesToCommit = toCommit.files(false);
try {
if (anyChanges) {
@@ -3962,6 +4071,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
}
try (Closeable finalizer = this::checkpoint) {
+ merge.committed = true;
// Must close before checkpoint, otherwise IFD won't be
// able to delete the held-open files from the merge
// readers:
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
index 26e7e3d..629b1e8 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
@@ -19,6 +19,7 @@ package org.apache.lucene.index;
import java.io.PrintStream;
import java.util.Arrays;
+import java.util.EnumSet;
import java.util.stream.Collectors;
import org.apache.lucene.analysis.Analyzer;
@@ -109,6 +110,9 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
/** Default value for whether calls to {@link IndexWriter#close()} include a commit. */
public final static boolean DEFAULT_COMMIT_ON_CLOSE = true;
+
+ /** Default value for time to wait for merges on commit (when using a {@link MergePolicy} that implements findFullFlushMerges). */
+ public static final double DEFAULT_MAX_COMMIT_MERGE_WAIT_SECONDS = 30.0;
// indicates whether this config instance is already attached to a writer.
// not final so that it can be cloned properly.
@@ -460,6 +464,31 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
}
/**
+ * Expert: sets the amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...).
+ * If this time is reached, we proceed with the commit based on segments merged up to that point.
+ * The merges are not cancelled, and may still run to completion independent of the commit.
+ */
+ public IndexWriterConfig setMaxCommitMergeWaitSeconds(double maxCommitMergeWaitSeconds) {
+ this.maxCommitMergeWaitSeconds = maxCommitMergeWaitSeconds;
+ return this;
+ }
+
+ /**
+ * Set the callback that gets invoked when IndexWriter performs various actions.
+ */
+ public IndexWriterConfig setIndexWriterEvents(IndexWriterEvents indexWriterEvents) {
+ this.indexWriterEvents = indexWriterEvents;
+ return this;
+ }
+
+ /** We only allow sorting on these types */
+ private static final EnumSet<SortField.Type> ALLOWED_INDEX_SORT_TYPES = EnumSet.of(SortField.Type.STRING,
+ SortField.Type.LONG,
+ SortField.Type.INT,
+ SortField.Type.DOUBLE,
+ SortField.Type.FLOAT);
+
+ /**
* Set the {@link Sort} order to use for all (flushed and merged) segments.
*/
public IndexWriterConfig setIndexSort(Sort sort) {
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriterEvents.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriterEvents.java
new file mode 100644
index 0000000..d36fb25
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriterEvents.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.index;
+
+/**
+ * Callback interface to signal various actions taken by IndexWriter.
+ *
+ * @lucene.experimental
+ */
+public interface IndexWriterEvents {
+ /**
+ * A default implementation that ignores all events.
+ */
+ IndexWriterEvents NULL_EVENTS = new IndexWriterEvents() {
+ @Override
+ public void beginMergeOnCommit() { }
+
+ @Override
+ public void finishMergeOnCommit() { }
+
+ @Override
+ public void abandonedMergesOnCommit(int abandonedCount) { }
+ };
+
+ /**
+ * Signals that the commit is about to wait for the merges returned from
+ * {@link MergePolicy#findFullFlushMerges(MergeTrigger, SegmentInfos, MergePolicy.MergeContext)}.
+ */
+ void beginMergeOnCommit();
+
+ /**
+ * Signals the end of waiting for merges on commit. This may be either because the merges completed, or because we timed out according
+ * to the limit set in {@link IndexWriterConfig#setMaxCommitMergeWaitSeconds(double)}; it is also signaled if the wait is interrupted.
+ */
+ void finishMergeOnCommit();
+
+ /**
+ * Called with the number ({@code abandonedCount}, always &gt; 0) of commit merges still pending or running when the timeout specified in
+ * {@link IndexWriterConfig#setMaxCommitMergeWaitSeconds(double)}.
+ */
+ void abandonedMergesOnCommit(int abandonedCount);
+}
diff --git a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
index 1f48acc..59a54c7 100644
--- a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
@@ -109,6 +109,12 @@ public class LiveIndexWriterConfig {
/** soft deletes field */
protected String softDeletesField = null;
+ /** Amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...) */
+ protected volatile double maxCommitMergeWaitSeconds;
+
+ /** Callback interface called on index writer actions. */
+ protected IndexWriterEvents indexWriterEvents;
+
// used by IndexWriterConfig
LiveIndexWriterConfig(Analyzer analyzer) {
@@ -132,6 +138,8 @@ public class LiveIndexWriterConfig {
flushPolicy = new FlushByRamOrCountsPolicy();
readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
+ maxCommitMergeWaitSeconds = IndexWriterConfig.DEFAULT_MAX_COMMIT_MERGE_WAIT_SECONDS;
+ indexWriterEvents = IndexWriterEvents.NULL_EVENTS;
}
/** Returns the default analyzer to use for indexing documents. */
@@ -461,6 +469,22 @@ public class LiveIndexWriterConfig {
return softDeletesField;
}
+ /**
+ * Expert: returns the maximum time, in seconds, to wait for merges returned by MergePolicy.findFullFlushMerges(...).
+ * If this time is reached, we proceed with the commit based on segments merged up to that point.
+ * The merges are not cancelled, and may still run to completion independent of the commit.
+ */
+ public double getMaxCommitMergeWaitSeconds() {
+ return maxCommitMergeWaitSeconds;
+ }
+
+ /**
+ * Returns a callback used to signal actions taken by the {@link IndexWriter}.
+ */
+ public IndexWriterEvents getIndexWriterEvents() {
+ return indexWriterEvents;
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -484,6 +508,8 @@ public class LiveIndexWriterConfig {
sb.append("indexSort=").append(getIndexSort()).append("\n");
sb.append("checkPendingFlushOnUpdate=").append(isCheckPendingFlushOnUpdate()).append("\n");
sb.append("softDeletesField=").append(getSoftDeletesField()).append("\n");
+ sb.append("maxCommitMergeWaitSeconds=").append(getMaxCommitMergeWaitSeconds()).append("\n");
+ sb.append("indexWriterEvents=").append(getIndexWriterEvents().getClass().getName()).append("\n");
return sb.toString();
}
}
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
index 3ac3914..13fb2db 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
@@ -225,6 +225,8 @@ public abstract class MergePolicy {
public final int totalMaxDoc;
Throwable error;
+ boolean committed; // Set by IndexWriter once the merge has been committed to disk
+
/** Sole constructor.
* @param segments List of {@link SegmentCommitInfo}s
* to be merged. */
@@ -500,7 +502,7 @@ public abstract class MergePolicy {
* an original segment present in the
* to-be-merged index; else, it was a segment
* produced by a cascaded merge.
- * @param mergeContext the IndexWriter to find the merges on
+ * @param mergeContext the MergeContext to find the merges on
*/
public abstract MergeSpecification findForcedMerges(
SegmentInfos segmentInfos, int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext)
@@ -511,12 +513,34 @@ public abstract class MergePolicy {
* deletes from the index.
* @param segmentInfos
* the total set of segments in the index
- * @param mergeContext the IndexWriter to find the merges on
+ * @param mergeContext the MergeContext to find the merges on
*/
public abstract MergeSpecification findForcedDeletesMerges(
SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException;
/**
+ * Identifies merges that we want to execute (synchronously) on commit. By default, returns {@code null}, so no merges run on commit.
+ *
+ * <p>Any merges returned here will make {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} block until
+ * the merges complete or until {@link IndexWriterConfig#getMaxCommitMergeWaitSeconds()} seconds have elapsed. This may be
+ * used to merge small segments that have just been flushed as part of the commit, reducing the number of segments in
+ * the commit. If a merge does not complete in the allotted time, it will continue to execute, but will not be reflected
+ * in the commit.
+ *
+ * <p>If a {@link OneMerge} in the returned {@link MergeSpecification} includes a segment already included in a registered
+ * merge, then {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} will throw an {@link IllegalStateException}.
+ * Use {@link MergeContext#getMergingSegments()} to determine which segments are currently registered to merge.
+ *
+ * @param mergeTrigger the event that triggered the merge (COMMIT or FULL_FLUSH).
+ * @param segmentInfos the total set of segments in the index (while preparing the commit)
+ * @param mergeContext the MergeContext to find the merges on, which should be used to determine which segments are
+ * already in a registered merge (see {@link MergeContext#getMergingSegments()}).
+ */
+ public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
+ return null;
+ }
+
+ /**
* Returns true if a new segment (regardless of its origin) should use the
* compound file format. The default implementation returns <code>true</code>
* iff the size of the given mergedInfo is less or equal to
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java b/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java
index d165a27..01a6b15 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java
@@ -47,5 +47,10 @@ public enum MergeTrigger {
/**
* Merge was triggered by a closing IndexWriter.
*/
- CLOSING
+ CLOSING,
+
+ /**
+ * Merge was triggered on commit.
+ */
+ COMMIT,
}
diff --git a/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java
index 1480ce4..b209e8ae 100644
--- a/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java
@@ -46,6 +46,9 @@ public final class NoMergePolicy extends MergePolicy {
public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, MergeContext mergeContext) { return null; }
@Override
+ public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) { return null; }
+
+ @Override
public boolean useCompoundFile(SegmentInfos segments, SegmentCommitInfo newSegment, MergeContext mergeContext) {
return newSegment.info.getUseCompoundFile();
}
diff --git a/lucene/core/src/java/org/apache/lucene/index/OneMergeWrappingMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/OneMergeWrappingMergePolicy.java
index d08711e..a5fd66a 100644
--- a/lucene/core/src/java/org/apache/lucene/index/OneMergeWrappingMergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/OneMergeWrappingMergePolicy.java
@@ -59,6 +59,11 @@ public class OneMergeWrappingMergePolicy extends FilterMergePolicy {
return wrapSpec(in.findForcedDeletesMerges(segmentInfos, mergeContext));
}
+ @Override
+ public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
+ return wrapSpec(in.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext));
+ }
+
private MergeSpecification wrapSpec(MergeSpecification spec) {
MergeSpecification wrapped = spec == null ? null : new MergeSpecification();
if (wrapped != null) {
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
index ce591a2..8a463ef 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
@@ -18,17 +18,42 @@ package org.apache.lucene.index;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.LuceneTestCase;
public class TestIndexWriterMergePolicy extends LuceneTestCase {
-
+
+ private static final MergePolicy MERGE_ON_COMMIT_POLICY = new LogDocMergePolicy() {
+ @Override
+ public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) {
+ // On commit, merge every segment not already in a registered merge into a single segment (needs at least two).
+ if (mergeTrigger == MergeTrigger.COMMIT && segmentInfos.size() > 1) {
+ List<SegmentCommitInfo> nonMergingSegments = new ArrayList<>();
+ for (SegmentCommitInfo sci : segmentInfos) {
+ if (mergeContext.getMergingSegments().contains(sci) == false) {
+ nonMergingSegments.add(sci);
+ }
+ }
+ if (nonMergingSegments.size() > 1) {
+ MergeSpecification mergeSpecification = new MergeSpecification();
+ mergeSpecification.add(new OneMerge(nonMergingSegments));
+ return mergeSpecification;
+ }
+ }
+ return null;
+ }
+ };
+
// Test the normal case
public void testNormalCase() throws IOException {
Directory dir = newDirectory();
@@ -278,6 +303,49 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
assertSetters(new LogDocMergePolicy());
}
+ // Test basic semantics of merge on commit
+ public void testMergeOnCommit() throws IOException {
+ Directory dir = newDirectory();
+
+ IndexWriter firstWriter = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random()))
+ .setMergePolicy(NoMergePolicy.INSTANCE));
+ for (int i = 0; i < 5; i++) {
+ TestIndexWriter.addDoc(firstWriter);
+ firstWriter.flush();
+ }
+ DirectoryReader firstReader = DirectoryReader.open(firstWriter);
+ assertEquals(5, firstReader.leaves().size());
+ firstReader.close();
+ firstWriter.close(); // When this writer closes, it does not merge on commit.
+
+ IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()))
+ .setMergePolicy(MERGE_ON_COMMIT_POLICY);
+
+ IndexWriter writerWithMergePolicy = new IndexWriter(dir, iwc);
+ writerWithMergePolicy.commit(); // No changes. Commit doesn't trigger a merge.
+
+ DirectoryReader unmergedReader = DirectoryReader.open(writerWithMergePolicy);
+ assertEquals(5, unmergedReader.leaves().size());
+ unmergedReader.close();
+
+ TestIndexWriter.addDoc(writerWithMergePolicy);
+ writerWithMergePolicy.commit(); // Doc added, do merge on commit.
+ assertEquals(1, writerWithMergePolicy.getSegmentCount()); // The commit merge collapsed all segments (old and newly flushed) into one.
+
+ DirectoryReader mergedReader = DirectoryReader.open(writerWithMergePolicy);
+ assertEquals(1, mergedReader.leaves().size());
+ mergedReader.close();
+
+ try (IndexReader reader = writerWithMergePolicy.getReader()) {
+ IndexSearcher searcher = new IndexSearcher(reader);
+ assertEquals(6, reader.numDocs());
+ assertEquals(6, searcher.count(new MatchAllDocsQuery()));
+ }
+
+ writerWithMergePolicy.close();
+ dir.close();
+ }
+
private void assertSetters(MergePolicy lmp) {
lmp.setMaxCFSSegmentSizeMB(2.0);
assertEquals(2.0, lmp.getMaxCFSSegmentSizeMB(), EPSILON);
diff --git a/lucene/test-framework/src/java/org/apache/lucene/index/MockRandomMergePolicy.java b/lucene/test-framework/src/java/org/apache/lucene/index/MockRandomMergePolicy.java
index beb4dad..92ffc73 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/index/MockRandomMergePolicy.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/index/MockRandomMergePolicy.java
@@ -129,6 +129,38 @@ public class MockRandomMergePolicy extends MergePolicy {
}
@Override
+ public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
+ MergeSpecification mergeSpecification = findMerges(null, segmentInfos, mergeContext);
+ if (mergeSpecification == null) {
+ return null;
+ }
+ // Strip already-merging segments from each candidate (IndexWriter rejects overlapping commit merges); drop merges left empty.
+ MergeSpecification filteredMergeSpecification = new MergeSpecification();
+ for (OneMerge oneMerge : mergeSpecification.merges) {
+ boolean filtered = false;
+ List<SegmentCommitInfo> nonMergingSegments = new ArrayList<>();
+ for (SegmentCommitInfo sci : oneMerge.segments) {
+ if (mergeContext.getMergingSegments().contains(sci) == false) {
+ nonMergingSegments.add(sci);
+ } else {
+ filtered = true;
+ }
+ }
+ if (filtered) {
+ if (nonMergingSegments.size() > 0) {
+ filteredMergeSpecification.add(new OneMerge(nonMergingSegments));
+ }
+ } else {
+ filteredMergeSpecification.add(oneMerge);
+ }
+ }
+ if (filteredMergeSpecification.merges.size() > 0) {
+ return filteredMergeSpecification;
+ }
+ return null;
+ }
+
+ @Override
public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext) throws IOException {
// 80% of the time we create CFS:
return random.nextInt(5) != 1;
diff --git a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
index 9f2cd27..cc779a0 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
@@ -1003,6 +1003,7 @@ public abstract class LuceneTestCase extends Assert {
if (rarely(r)) {
c.setCheckPendingFlushUpdate(false);
}
+ c.setMaxCommitMergeWaitSeconds(atLeast(r, 1));
return c;
}