You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by so...@apache.org on 2020/06/03 19:14:15 UTC

[lucene-solr] 46/47: Revert "Revert "LUCENE-8962""

This is an automated email from the ASF dual-hosted git repository.

sokolov pushed a commit to branch jira/lucene-8962
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 4e7c17e328edb7856f8007698c22ad233be9d6f2
Author: Michael Sokolov <so...@amazon.com>
AuthorDate: Mon Jun 1 14:45:00 2020 -0400

    Revert "Revert "LUCENE-8962""
    
    This reverts commit 4501b3d3fdbc35af99bde6abe7432cfc5e8b5547.
    
    This reverts commit 075adac59865b3277adcf86052f2fae3e6d11135.
---
 lucene/CHANGES.txt                                 |   2 +
 .../org/apache/lucene/index/FilterMergePolicy.java |   5 +
 .../java/org/apache/lucene/index/IndexWriter.java  | 114 ++++++++++++++++++++-
 .../org/apache/lucene/index/IndexWriterConfig.java |  29 ++++++
 .../org/apache/lucene/index/IndexWriterEvents.java |  57 +++++++++++
 .../apache/lucene/index/LiveIndexWriterConfig.java |  26 +++++
 .../java/org/apache/lucene/index/MergePolicy.java  |  28 ++++-
 .../java/org/apache/lucene/index/MergeTrigger.java |   7 +-
 .../org/apache/lucene/index/NoMergePolicy.java     |   3 +
 .../lucene/index/OneMergeWrappingMergePolicy.java  |   5 +
 .../lucene/index/TestIndexWriterMergePolicy.java   |  70 ++++++++++++-
 .../apache/lucene/index/MockRandomMergePolicy.java |  32 ++++++
 .../org/apache/lucene/util/LuceneTestCase.java     |   1 +
 13 files changed, 373 insertions(+), 6 deletions(-)

diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 50b7f7b..6e63ad7 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -376,6 +376,8 @@ Improvements
 
 * LUCENE-9253: KoreanTokenizer now supports custom dictionaries(system, unknown). (Namgyu Kim)
 
+* LUCENE-8962: Add ability to selectively merge on commit (Michael Froh)
+
 * LUCENE-9171: QueryBuilder can now use BoostAttributes on input token streams to selectively
   boost particular terms or synonyms in parsed queries. (Alessandro Benedetti, Alan Woodward)
 
diff --git a/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java
index eb634b4..b4e33f8 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java
@@ -58,6 +58,11 @@ public class FilterMergePolicy extends MergePolicy {
   }
 
   @Override
+  public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
+    return in.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext); // forward unchanged to the wrapped policy
+  }
+
+  @Override
   public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext)
       throws IOException {
     return in.useCompoundFile(infos, mergedInfo, mergeContext);
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 52adbef..88fdb90 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -33,6 +33,8 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -3152,6 +3154,59 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     }
   }
 
+  /**
+   * Wraps {@code merge} so that, when it finishes, a committed result is also applied to {@code toCommit}, the
+   * SegmentInfos snapshot being committed.
+   * <p>
+   * Merged-away segments are matched to their {@code toCommit} clones by segment name. The merge takes a reference
+   * on the merged segment's files and releases the reference this commit took on each replaced clone's files, so
+   * the files referenced by the final {@code toCommit} remain protected from deletion. Once the latch held by
+   * {@code mergeLatchRef} has been cleared (the committing thread stopped waiting), the merge leaves {@code toCommit}
+   * and the file reference counts untouched.
+   * <p>
+   * NOTE(review): the returned OneMerge is rebuilt from {@code merge.segments} only, so behavior overridden by a
+   * custom OneMerge subclass (e.g. one produced by a wrapping MergePolicy) is not carried over; confirm this is OK.
+   */
+  private MergePolicy.OneMerge updateSegmentInfosOnMergeFinish(MergePolicy.OneMerge merge, final SegmentInfos toCommit,
+                                                                AtomicReference<CountDownLatch> mergeLatchRef) {
+    return new MergePolicy.OneMerge(merge.segments) {
+      @Override
+      public void mergeFinished() throws IOException {
+        super.mergeFinished();
+        CountDownLatch mergeAwaitLatch = mergeLatchRef.get();
+        if (mergeAwaitLatch == null) {
+          // Commit thread timed out waiting for this merge and moved on. No need to manipulate toCommit.
+          return;
+        }
+        if (committed) {
+          deleter.incRef(this.info.files());
+          // Resolve "live" SegmentInfos segments to their toCommit cloned equivalents, based on segment name.
+          Set<String> mergedSegmentNames = new HashSet<>();
+          for (SegmentCommitInfo sci : this.segments) {
+            mergedSegmentNames.add(sci.info.name);
+          }
+          List<SegmentCommitInfo> toCommitMergedAwaySegments = new ArrayList<>();
+          for (SegmentCommitInfo sci : toCommit) {
+            if (mergedSegmentNames.contains(sci.info.name)) {
+              toCommitMergedAwaySegments.add(sci);
+              // Release the reference this commit holds on its own clone's files. The live segment's files may
+              // differ from the clone's (for instance if its deletes were updated after toCommit was cloned), so
+              // decRef'ing the live segment's files instead could unbalance the reference counts.
+              deleter.decRef(sci.files());
+            }
+          }
+          // Construct a OneMerge that applies to toCommit
+          MergePolicy.OneMerge applicableMerge = new MergePolicy.OneMerge(toCommitMergedAwaySegments);
+          applicableMerge.info = this.info.clone();
+          long segmentCounter = Long.parseLong(this.info.info.name.substring(1), Character.MAX_RADIX);
+          toCommit.counter = Math.max(toCommit.counter, segmentCounter + 1);
+          toCommit.applyMergeChanges(applicableMerge, false);
+        }
+        mergeAwaitLatch.countDown();
+      }
+    };
+  }
+
   private long prepareCommitInternal() throws IOException {
     startCommitTime = System.nanoTime();
     synchronized(commitLock) {
@@ -3174,6 +3212,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
       SegmentInfos toCommit = null;
       boolean anyChanges = false;
       long seqNo;
+      List<MergePolicy.OneMerge> commitMerges = null; // merges registered for this commit, if any
+      AtomicReference<CountDownLatch> mergeAwaitLatchRef = null; // its latch is cleared to detach commit merges from toCommit
 
       // This is copied from doFlush, except it's modified to
       // clone & incRef the flushed SegmentInfos inside the
@@ -3228,6 +3268,41 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
               // sneak into the commit point:
               toCommit = segmentInfos.clone();
 
+              if (anyChanges) {
+                // Find any merges that can execute on commit (per MergePolicy).
+                MergePolicy.MergeSpecification mergeSpec =
+                    config.getMergePolicy().findFullFlushMerges(MergeTrigger.COMMIT, segmentInfos, this);
+                if (mergeSpec != null && mergeSpec.merges.size() > 0) {
+                  int mergeCount = mergeSpec.merges.size();
+                  commitMerges = new ArrayList<>(mergeCount);
+                  mergeAwaitLatchRef = new AtomicReference<>(new CountDownLatch(mergeCount));
+                  boolean allMergesRegistered = false;
+                  try {
+                    for (MergePolicy.OneMerge oneMerge : mergeSpec.merges) {
+                      MergePolicy.OneMerge trackedMerge =
+                          updateSegmentInfosOnMergeFinish(oneMerge, toCommit, mergeAwaitLatchRef);
+                      if (registerMerge(trackedMerge) == false) {
+                        throw new IllegalStateException("MergePolicy " + config.getMergePolicy().getClass() +
+                            " returned merging segments from findFullFlushMerges");
+                      }
+                      commitMerges.add(trackedMerge);
+                    }
+                    allMergesRegistered = true;
+                  } finally {
+                    if (allMergesRegistered == false) {
+                      // Registration failed and the exception aborts this commit. Merges registered so far stay
+                      // registered and may still run, so clear the latch to stop their mergeFinished() from
+                      // updating toCommit or file reference counts on behalf of the abandoned commit.
+                      mergeAwaitLatchRef.set(null);
+                    }
+                  }
+                  if (infoStream.isEnabled("IW")) {
+                    infoStream.message("IW", "Registered " + mergeCount + " commit merges");
+                    infoStream.message("IW", "Before executing commit merges, had " + toCommit.size() + " segments");
+                  }
+                }
+              }
+
               pendingCommitChangeCount = changeCount.get();
 
               // This protects the segmentInfos we are now going
@@ -3235,8 +3299,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
               // we are trying to sync all referenced files, a
               // merge completes which would otherwise have
               // removed the files we are now syncing.    
-              filesToCommit = toCommit.files(false); 
-              deleter.incRef(filesToCommit);
+              deleter.incRef(toCommit.files(false)); // filesToCommit is assigned only after commit merges are done
             }
             success = true;
           } finally {
@@ -3257,6 +3320,65 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
       } finally {
         maybeCloseOnTragicEvent();
       }
+
+      if (mergeAwaitLatchRef != null) {
+        CountDownLatch mergeAwaitLatch = mergeAwaitLatchRef.get();
+        // If we found and registered any merges above, within the flushLock, then we want to ensure that they
+        // complete execution. Note that since we released the lock, other merges may have been scheduled. We will
+        // block until the merges that we registered complete. As they complete, they will update toCommit to
+        // replace merged segments with the result of each merge.
+        config.getIndexWriterEvents().beginMergeOnCommit();
+        mergeScheduler.merge(mergeSource, MergeTrigger.COMMIT);
+        long mergeWaitStart = System.nanoTime();
+        int abandonedCount = 0;
+        long waitTimeMillis = (long) (config.getMaxCommitMergeWaitSeconds() * 1000.0);
+        try {
+          if (mergeAwaitLatch.await(waitTimeMillis, TimeUnit.MILLISECONDS) == false) {
+            synchronized (this) {
+              // Need to do this in a synchronized block, to make sure none of our commit merges are currently
+              // executing mergeFinished (since mergeFinished itself is called from within the IndexWriter lock).
+              // After we clear the value from mergeAwaitLatchRef, the merges we schedule will still execute as
+              // usual, but when they finish, they won't attempt to update toCommit or modify segment reference
+              // counts.
+              mergeAwaitLatchRef.set(null);
+              for (MergePolicy.OneMerge commitMerge : commitMerges) {
+                if (runningMerges.contains(commitMerge) || pendingMerges.contains(commitMerge)) {
+                  abandonedCount++;
+                }
+              }
+            }
+          }
+        } catch (InterruptedException ie) {
+          ThreadInterruptedException interrupted = new ThreadInterruptedException(ie);
+          synchronized (this) {
+            // The interrupt aborts this commit. As on timeout, clear the latch so our merges stop updating
+            // toCommit and file reference counts when they finish, then release the references taken on
+            // toCommit's files. NOTE(review): filesToCommit is only assigned after this wait, so presumably no
+            // other failure path releases these references; confirm against the rest of prepareCommitInternal.
+            mergeAwaitLatchRef.set(null);
+            try {
+              deleter.decRef(toCommit.files(false));
+            } catch (Throwable t) {
+              interrupted.addSuppressed(t);
+            }
+          }
+          throw interrupted;
+        } finally {
+          if (infoStream.isEnabled("IW")) {
+            infoStream.message("IW", String.format(Locale.ROOT, "Waited %.1f ms for commit merges",
+                (System.nanoTime() - mergeWaitStart)/1_000_000.0));
+            infoStream.message("IW", "After executing commit merges, had " + toCommit.size() + " segments");
+            if (abandonedCount > 0) {
+              infoStream.message("IW", "Abandoned " + abandonedCount + " commit merges after " + waitTimeMillis + " ms");
+            }
+          }
+          if (abandonedCount > 0) {
+            config.getIndexWriterEvents().abandonedMergesOnCommit(abandonedCount);
+          }
+          config.getIndexWriterEvents().finishMergeOnCommit();
+        }
+      }
+      filesToCommit = toCommit.files(false);
      
       try {
         if (anyChanges) {
@@ -3962,6 +4071,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     }
 
     try (Closeable finalizer = this::checkpoint) {
+      merge.committed = true; // read by commit merges' mergeFinished(); NOTE(review): set before the work below completes
       // Must close before checkpoint, otherwise IFD won't be
       // able to delete the held-open files from the merge
       // readers:
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
index 26e7e3d..629b1e8 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
@@ -19,6 +19,7 @@ package org.apache.lucene.index;
 
 import java.io.PrintStream;
 import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.stream.Collectors;
 
 import org.apache.lucene.analysis.Analyzer;
@@ -109,6 +110,9 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
  
   /** Default value for whether calls to {@link IndexWriter#close()} include a commit. */
   public final static boolean DEFAULT_COMMIT_ON_CLOSE = true;
+
+  /** Default value for time to wait for merges on commit (when using a {@link MergePolicy} that implements findFullFlushMerges). */
+  public static final double DEFAULT_MAX_COMMIT_MERGE_WAIT_SECONDS = 30.0;
  
   // indicates whether this config instance is already attached to a writer.
   // not final so that it can be cloned properly.
@@ -460,6 +464,38 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
   }
 
   /**
+   * Expert: sets the amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...).
+   * If this time is reached, we proceed with the commit based on segments merged up to that point.
+   * The merges are not cancelled, and may still run to completion independent of the commit.
+   */
+  public IndexWriterConfig setMaxCommitMergeWaitSeconds(double maxCommitMergeWaitSeconds) {
+    this.maxCommitMergeWaitSeconds = maxCommitMergeWaitSeconds;
+    return this;
+  }
+
+  /**
+   * Set the callback that gets invoked when IndexWriter performs various actions.
+   *
+   * @throws IllegalArgumentException if {@code indexWriterEvents} is null
+   */
+  public IndexWriterConfig setIndexWriterEvents(IndexWriterEvents indexWriterEvents) {
+    if (indexWriterEvents == null) {
+      // IndexWriter calls these callbacks without a null check while committing, so fail fast here instead.
+      throw new IllegalArgumentException("indexWriterEvents must not be null");
+    }
+    this.indexWriterEvents = indexWriterEvents;
+    return this;
+  }
+
+  // NOTE(review): no code added by this change reads this set; confirm setIndexSort (or other code) still uses it.
+  /** We only allow sorting on these types */
+  private static final EnumSet<SortField.Type> ALLOWED_INDEX_SORT_TYPES = EnumSet.of(SortField.Type.STRING,
+                                                                                     SortField.Type.LONG,
+                                                                                     SortField.Type.INT,
+                                                                                     SortField.Type.DOUBLE,
+                                                                                     SortField.Type.FLOAT);
+
+  /**
    * Set the {@link Sort} order to use for all (flushed and merged) segments.
    */
   public IndexWriterConfig setIndexSort(Sort sort) {
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriterEvents.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriterEvents.java
new file mode 100644
index 0000000..d36fb25
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriterEvents.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.index;
+
+/**
+ * Callback interface to signal various actions taken by {@link IndexWriter}.
+ *
+ * @lucene.experimental
+ */
+public interface IndexWriterEvents {
+  /**
+   * A default implementation that ignores all events.
+   */
+  IndexWriterEvents NULL_EVENTS = new IndexWriterEvents() {
+    @Override
+    public void beginMergeOnCommit() { }
+
+    @Override
+    public void finishMergeOnCommit() { }
+
+    @Override
+    public void abandonedMergesOnCommit(int abandonedCount) { }
+  };
+
+  /**
+   * Signals the start of waiting for merges on commit, as returned by
+   * {@link MergePolicy#findFullFlushMerges(MergeTrigger, SegmentInfos, MergePolicy.MergeContext)}.
+   */
+  void beginMergeOnCommit();
+
+  /**
+   * Signals the end of waiting for merges on commit. This may be either because the merges completed, or because we timed out according
+   * to the limit set in {@link IndexWriterConfig#setMaxCommitMergeWaitSeconds(double)}.
+   */
+  void finishMergeOnCommit();
+
+  /**
+   * Called with the number of merges abandoned on commit upon reaching the timeout specified in
+   * {@link IndexWriterConfig#setMaxCommitMergeWaitSeconds(double)}.
+   */
+  void abandonedMergesOnCommit(int abandonedCount);
+}
diff --git a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
index 1f48acc..59a54c7 100644
--- a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
@@ -109,6 +109,12 @@ public class LiveIndexWriterConfig {
   /** soft deletes field */
   protected String softDeletesField = null;
 
+  /** Amount of time, in seconds, to wait for merges returned by MergePolicy.findFullFlushMerges(...) */
+  protected volatile double maxCommitMergeWaitSeconds;
+
+  /** Callback interface called on index writer actions. */
+  protected IndexWriterEvents indexWriterEvents;
+
 
   // used by IndexWriterConfig
   LiveIndexWriterConfig(Analyzer analyzer) {
@@ -132,6 +138,8 @@ public class LiveIndexWriterConfig {
     flushPolicy = new FlushByRamOrCountsPolicy();
     readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
     perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
+    maxCommitMergeWaitSeconds = IndexWriterConfig.DEFAULT_MAX_COMMIT_MERGE_WAIT_SECONDS;
+    indexWriterEvents = IndexWriterEvents.NULL_EVENTS;
   }
   
   /** Returns the default analyzer to use for indexing documents. */
@@ -461,6 +469,22 @@ public class LiveIndexWriterConfig {
     return softDeletesField;
   }
 
+  /**
+   * Expert: return the amount of time, in seconds, to wait for merges returned by MergePolicy.findFullFlushMerges(...).
+   * If this time is reached, we proceed with the commit based on segments merged up to that point.
+   * The merges are not cancelled, and may still run to completion independent of the commit.
+   */
+  public double getMaxCommitMergeWaitSeconds() {
+    return maxCommitMergeWaitSeconds;
+  }
+
+  /**
+   * Returns a callback used to signal actions taken by the {@link IndexWriter}.
+   */
+  public IndexWriterEvents getIndexWriterEvents() {
+    return indexWriterEvents;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
@@ -484,6 +508,8 @@ public class LiveIndexWriterConfig {
     sb.append("indexSort=").append(getIndexSort()).append("\n");
     sb.append("checkPendingFlushOnUpdate=").append(isCheckPendingFlushOnUpdate()).append("\n");
     sb.append("softDeletesField=").append(getSoftDeletesField()).append("\n");
+    sb.append("maxCommitMergeWaitSeconds=").append(getMaxCommitMergeWaitSeconds()).append("\n");
+    sb.append("indexWriterEvents=").append(getIndexWriterEvents().getClass().getName()).append("\n");
     return sb.toString();
   }
 }
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
index 3ac3914..13fb2db 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
@@ -225,6 +225,8 @@ public abstract class MergePolicy {
     public final int totalMaxDoc;
     Throwable error;
 
+    boolean committed; // Set by IndexWriter once the merge has been committed to disk
+
     /** Sole constructor.
      * @param segments List of {@link SegmentCommitInfo}s
      *        to be merged. */
@@ -500,7 +502,7 @@ public abstract class MergePolicy {
  *          an original segment present in the
  *          to-be-merged index; else, it was a segment
  *          produced by a cascaded merge.
-   * @param mergeContext the IndexWriter to find the merges on
+   * @param mergeContext the MergeContext to find the merges on
    */
   public abstract MergeSpecification findForcedMerges(
       SegmentInfos segmentInfos, int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext)
@@ -511,12 +513,34 @@ public abstract class MergePolicy {
    * deletes from the index.
    *  @param segmentInfos
    *          the total set of segments in the index
-   * @param mergeContext the IndexWriter to find the merges on
+   * @param mergeContext the MergeContext to find the merges on
    */
   public abstract MergeSpecification findForcedDeletesMerges(
       SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException;
 
   /**
+   * Identifies merges that we want to execute (synchronously) on commit. By default, do not synchronously merge on commit.
+   * <p>
+   * Any merges returned here will make {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} block until
+   * the merges complete or until {@link IndexWriterConfig#getMaxCommitMergeWaitSeconds()} have elapsed. This may be
+   * used to merge small segments that have just been flushed as part of the commit, reducing the number of segments in
+   * the commit. If a merge does not complete in the allotted time, it will continue to execute, but will not be reflected
+   * in the commit.
+   * <p>
+   * If a {@link OneMerge} in the returned {@link MergeSpecification} includes a segment already included in a registered
+   * merge, then {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} will throw an {@link IllegalStateException}.
+   * Use {@link MergeContext#getMergingSegments()} to determine which segments are currently registered to merge.
+   *
+   * @param mergeTrigger the event that triggered the merge (COMMIT or FULL_FLUSH).
+   * @param segmentInfos the total set of segments in the index (while preparing the commit)
+   * @param mergeContext the MergeContext to find the merges on, which should be used to determine which segments are
+   *                     already in a registered merge (see {@link MergeContext#getMergingSegments()}).
+   */
+  public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
+    return null;
+  }
+
+  /**
    * Returns true if a new segment (regardless of its origin) should use the
    * compound file format. The default implementation returns <code>true</code>
    * iff the size of the given mergedInfo is less or equal to
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java b/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java
index d165a27..01a6b15 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java
@@ -47,5 +47,10 @@ public enum MergeTrigger {
   /**
    * Merge was triggered by a closing IndexWriter.
    */
-  CLOSING
+  CLOSING,
+
+  /**
+   * Merge was triggered on commit ({@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()}).
+   */
+  COMMIT,
 }
diff --git a/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java
index 1480ce4..b209e8ae 100644
--- a/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java
@@ -46,6 +46,9 @@ public final class NoMergePolicy extends MergePolicy {
   public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, MergeContext mergeContext) { return null; }
 
   @Override
+  public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) { return null; } // no merges on commit either
+
+  @Override
   public boolean useCompoundFile(SegmentInfos segments, SegmentCommitInfo newSegment, MergeContext mergeContext) {
     return newSegment.info.getUseCompoundFile();
   }
diff --git a/lucene/core/src/java/org/apache/lucene/index/OneMergeWrappingMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/OneMergeWrappingMergePolicy.java
index d08711e..a5fd66a 100644
--- a/lucene/core/src/java/org/apache/lucene/index/OneMergeWrappingMergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/OneMergeWrappingMergePolicy.java
@@ -59,6 +59,11 @@ public class OneMergeWrappingMergePolicy extends FilterMergePolicy {
     return wrapSpec(in.findForcedDeletesMerges(segmentInfos, mergeContext));
   }
 
+  @Override
+  public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
+    return wrapSpec(in.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext)); // NOTE(review): IndexWriter rebuilds commit merges from merge.segments, which may drop these wrappers
+  }
+
   private MergeSpecification wrapSpec(MergeSpecification spec) {
     MergeSpecification wrapped = spec == null ? null : new MergeSpecification();
     if (wrapped != null) {
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
index ce591a2..8a463ef 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
@@ -18,17 +18,42 @@ package org.apache.lucene.index;
 
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.index.IndexWriterConfig.OpenMode;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.store.Directory;
 
 import org.apache.lucene.util.LuceneTestCase;
 
 public class TestIndexWriterMergePolicy extends LuceneTestCase {
-  
+
+  private static final MergePolicy MERGE_ON_COMMIT_POLICY = new LogDocMergePolicy() {
+    @Override
+    public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) {
+      // On commit, merge every segment that is not already merging into a single segment
+      if (mergeTrigger == MergeTrigger.COMMIT && segmentInfos.size() > 1) {
+        List<SegmentCommitInfo> nonMergingSegments = new ArrayList<>();
+        for (SegmentCommitInfo sci : segmentInfos) {
+          if (mergeContext.getMergingSegments().contains(sci) == false) {
+            nonMergingSegments.add(sci);
+          }
+        }
+        if (nonMergingSegments.size() > 1) {
+          MergeSpecification mergeSpecification = new MergeSpecification();
+          mergeSpecification.add(new OneMerge(nonMergingSegments));
+          return mergeSpecification;
+        }
+      }
+      return null;
+    }
+  };
+
   // Test the normal case
   public void testNormalCase() throws IOException {
     Directory dir = newDirectory();
@@ -278,6 +303,52 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
     assertSetters(new LogDocMergePolicy());
   }
 
+  // Test basic semantics of merge on commit
+  public void testMergeOnCommit() throws IOException {
+    Directory dir = newDirectory();
+
+    IndexWriter firstWriter = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random()))
+        .setMergePolicy(NoMergePolicy.INSTANCE));
+    for (int i = 0; i < 5; i++) {
+      TestIndexWriter.addDoc(firstWriter);
+      firstWriter.flush();
+    }
+    DirectoryReader firstReader = DirectoryReader.open(firstWriter);
+    assertEquals(5, firstReader.leaves().size());
+    firstReader.close();
+    firstWriter.close(); // When this writer closes, it does not merge on commit.
+
+    IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()))
+        .setMergePolicy(MERGE_ON_COMMIT_POLICY)
+        // newIndexWriterConfig randomizes this wait to atLeast(r, 1) seconds; pin a generous value so a slow test
+        // machine cannot time out the commit merge and break the segment-count assertions below.
+        .setMaxCommitMergeWaitSeconds(300);
+
+    IndexWriter writerWithMergePolicy = new IndexWriter(dir, iwc);
+    writerWithMergePolicy.commit(); // No changes. Commit doesn't trigger a merge.
+
+    DirectoryReader unmergedReader = DirectoryReader.open(writerWithMergePolicy);
+    assertEquals(5, unmergedReader.leaves().size());
+    unmergedReader.close();
+
+    TestIndexWriter.addDoc(writerWithMergePolicy);
+    writerWithMergePolicy.commit(); // Doc added, do merge on commit.
+    assertEquals(1, writerWithMergePolicy.getSegmentCount()); // all six segments were merged into one
+
+    DirectoryReader mergedReader = DirectoryReader.open(writerWithMergePolicy);
+    assertEquals(1, mergedReader.leaves().size());
+    mergedReader.close();
+
+    try (IndexReader reader = writerWithMergePolicy.getReader()) {
+      IndexSearcher searcher = new IndexSearcher(reader);
+      assertEquals(6, reader.numDocs());
+      assertEquals(6, searcher.count(new MatchAllDocsQuery()));
+    }
+
+    writerWithMergePolicy.close();
+    dir.close();
+  }
+
   private void assertSetters(MergePolicy lmp) {
     lmp.setMaxCFSSegmentSizeMB(2.0);
     assertEquals(2.0, lmp.getMaxCFSSegmentSizeMB(), EPSILON);
diff --git a/lucene/test-framework/src/java/org/apache/lucene/index/MockRandomMergePolicy.java b/lucene/test-framework/src/java/org/apache/lucene/index/MockRandomMergePolicy.java
index beb4dad..92ffc73 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/index/MockRandomMergePolicy.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/index/MockRandomMergePolicy.java
@@ -129,6 +129,38 @@ public class MockRandomMergePolicy extends MergePolicy {
   }
 
   @Override
+  public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
+    MergeSpecification mergeSpecification = findMerges(null, segmentInfos, mergeContext); // NOTE(review): passes a null trigger, not mergeTrigger
+    if (mergeSpecification == null) {
+      return null;
+    }
+    // Never include already-merging segments: strip them from each merge and drop merges left with none.
+    MergeSpecification filteredMergeSpecification = new MergeSpecification();
+    for (OneMerge oneMerge : mergeSpecification.merges) {
+      boolean filtered = false;
+      List<SegmentCommitInfo> nonMergingSegments = new ArrayList<>();
+      for (SegmentCommitInfo sci : oneMerge.segments) {
+        if (mergeContext.getMergingSegments().contains(sci) == false) {
+          nonMergingSegments.add(sci);
+        } else {
+          filtered = true;
+        }
+      }
+      if (filtered == true) {
+        if (nonMergingSegments.size() > 0) {
+          filteredMergeSpecification.add(new OneMerge(nonMergingSegments));
+        }
+      } else {
+        filteredMergeSpecification.add(oneMerge);
+      }
+    }
+    if (filteredMergeSpecification.merges.size() > 0) {
+      return filteredMergeSpecification;
+    }
+    return null;
+  }
+
+  @Override
   public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext) throws IOException {
     // 80% of the time we create CFS:
     return random.nextInt(5) != 1;
diff --git a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
index 9f2cd27..cc779a0 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
@@ -1003,6 +1003,7 @@ public abstract class LuceneTestCase extends Assert {
     if (rarely(r)) {
       c.setCheckPendingFlushUpdate(false);
     }
+    c.setMaxCommitMergeWaitSeconds(atLeast(r, 1)); // randomized; tests that need commit merges to finish should set their own wait
     return c;
   }