Posted to commits@lucene.apache.org by so...@apache.org on 2020/03/02 23:33:47 UTC

[lucene-solr] 01/03: LUCENE-8962: Add ability to selectively merge on commit (#1155)

This is an automated email from the ASF dual-hosted git repository.

sokolov pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit a1791e77143aa8087c0b5ee0e8eb57422e59a09a
Author: msfroh <ms...@gmail.com>
AuthorDate: Mon Mar 2 09:19:47 2020 -0800

    LUCENE-8962: Add ability to selectively merge on commit (#1155)
    
    * LUCENE-8962: Add ability to selectively merge on commit
    
    This adds a new "findCommitMerges" method to MergePolicy, which can
    specify merges to be executed before the
    IndexWriter.prepareCommitInternal method returns.
    
    If we have many index writer threads, they will flush their DWPT buffers
    on commit, resulting in many small segments, which can be merged before
    the commit returns (see the usage sketch after this commit message).
    
    * Add missing Javadoc
    
    * Fix incorrect comment
    
    * Refactoring and fix intermittent test failure
    
    1. Made some changes to the callback to update toCommit, leveraging
    SegmentInfos.applyMergeChanges.
    2. I realized that we'll never end up with 0 registered merges, because
    we throw an exception if we fail to register a merge.
    3. Moved the IndexWriterEvents.beginMergeOnCommit notification to before
    we call MergeScheduler.merge, since we may not be merging on another
    thread.
    4. There was an intermittent test failure due to randomness in the time
    it takes for merges to complete. Before doing the final commit, we wait
    for pending merges to finish. We may still end up abandoning the final
    merge, but we can detect that and assert that either the merge was
    abandoned (and we have > 1 segment) or we did merge down to 1 segment.
    
    * Fix typo
    
    * Fix/improve comments based on PR feedback
    
    * More comment improvements from PR feedback
    
    * Rename method and add new MergeTrigger
    
    1. Renamed findCommitMerges -> findFullFlushMerges.
    2. Added MergeTrigger.COMMIT, passed to findFullFlushMerges and to
       MergeScheduler when merging on commit.
    
    * Update renamed method name in strings and comments
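
A minimal sketch of how the new hook and configuration options fit together,
modeled on the test added in this patch and using the final findFullFlushMerges
name. The MergeOnCommitExample/MergeOnCommitPolicy class names, the
StandardAnalyzer/FSDirectory setup, the index path, and the 10-second wait are
illustrative assumptions, not part of the commit:

  import java.io.IOException;
  import java.nio.file.Paths;
  import java.util.ArrayList;
  import java.util.List;

  import org.apache.lucene.analysis.standard.StandardAnalyzer;
  import org.apache.lucene.document.Document;
  import org.apache.lucene.index.IndexWriter;
  import org.apache.lucene.index.IndexWriterConfig;
  import org.apache.lucene.index.IndexWriterEvents;
  import org.apache.lucene.index.LogDocMergePolicy;
  import org.apache.lucene.index.MergeTrigger;
  import org.apache.lucene.index.SegmentCommitInfo;
  import org.apache.lucene.index.SegmentInfos;
  import org.apache.lucene.store.Directory;
  import org.apache.lucene.store.FSDirectory;

  public class MergeOnCommitExample {

    /** Asks IndexWriter to merge all freshly flushed, non-merging segments into one on commit. */
    static class MergeOnCommitPolicy extends LogDocMergePolicy {
      @Override
      public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger,
                                                    SegmentInfos segmentInfos,
                                                    MergeContext mergeContext) throws IOException {
        if (mergeTrigger != MergeTrigger.COMMIT || segmentInfos.size() <= 1) {
          return null; // only act on commit, and only if there is something to merge
        }
        // Skip segments already registered in another merge; returning one of those
        // would make commit()/prepareCommit() throw IllegalStateException.
        List<SegmentCommitInfo> nonMergingSegments = new ArrayList<>();
        for (SegmentCommitInfo sci : segmentInfos) {
          if (mergeContext.getMergingSegments().contains(sci) == false) {
            nonMergingSegments.add(sci);
          }
        }
        if (nonMergingSegments.size() <= 1) {
          return null;
        }
        MergeSpecification spec = new MergeSpecification();
        spec.add(new OneMerge(nonMergingSegments));
        return spec;
      }
    }

    public static void main(String[] args) throws IOException {
      IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer())
          .setMergePolicy(new MergeOnCommitPolicy())
          // Wait up to 10 seconds for commit merges; after that the commit proceeds
          // with whatever has finished, and the rest keep running in the background.
          .setMaxCommitMergeWaitSeconds(10.0)
          .setIndexWriterEvents(new IndexWriterEvents() {
            @Override public void beginMergeOnCommit() { }
            @Override public void finishMergeOnCommit() { }
            @Override public void abandonedMergesOnCommit(int abandonedCount) {
              System.out.println("commit gave up waiting on " + abandonedCount + " merge(s)");
            }
          });
      try (Directory dir = FSDirectory.open(Paths.get("/tmp/merge-on-commit-demo"));
           IndexWriter writer = new IndexWriter(dir, iwc)) {
        writer.addDocument(new Document());
        writer.commit(); // blocks up to the configured wait while the registered commit merges run
      }
    }
  }

Like the test in this patch, the sketch only proposes segments that are not
already in a registered merge, since findFullFlushMerges runs while other
merges may be in flight.
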
---
 .../org/apache/lucene/index/FilterMergePolicy.java |   5 +
 .../java/org/apache/lucene/index/IndexWriter.java  | 114 +++++++++++++++++-
 .../org/apache/lucene/index/IndexWriterConfig.java |  21 ++++
 .../org/apache/lucene/index/IndexWriterEvents.java |  59 +++++++++
 .../apache/lucene/index/LiveIndexWriterConfig.java |  26 ++++
 .../java/org/apache/lucene/index/MergePolicy.java  |  26 +++-
 .../java/org/apache/lucene/index/MergeTrigger.java |   7 +-
 .../org/apache/lucene/index/NoMergePolicy.java     |   3 +
 .../lucene/index/OneMergeWrappingMergePolicy.java  |   5 +
 .../lucene/index/TestIndexWriterMergePolicy.java   | 132 +++++++++++++++++++++
 .../apache/lucene/index/MockRandomMergePolicy.java |  32 +++++
 .../org/apache/lucene/util/LuceneTestCase.java     |   1 +
 12 files changed, 426 insertions(+), 5 deletions(-)

diff --git a/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java
index eb634b4..b4e33f8 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java
@@ -58,6 +58,11 @@ public class FilterMergePolicy extends MergePolicy {
   }
 
   @Override
+  public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
+    return in.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext);
+  }
+
+  @Override
   public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext)
       throws IOException {
     return in.useCompoundFile(infos, mergedInfo, mergeContext);
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index bc7c00d..edebb8d 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -32,6 +32,8 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -3164,6 +3166,42 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     }
   }
 
+  private MergePolicy.OneMerge updateSegmentInfosOnMergeFinish(MergePolicy.OneMerge merge, final SegmentInfos toCommit,
+                                                                AtomicReference<CountDownLatch> mergeLatchRef) {
+    return new MergePolicy.OneMerge(merge.segments) {
+      public void mergeFinished() throws IOException {
+        super.mergeFinished();
+        CountDownLatch mergeAwaitLatch = mergeLatchRef.get();
+        if (mergeAwaitLatch == null) {
+          // Commit thread timed out waiting for this merge and moved on. No need to manipulate toCommit.
+          return;
+        }
+        if (isAborted() == false) {
+          deleter.incRef(this.info.files());
+          // Resolve "live" SegmentInfos segments to their toCommit cloned equivalents, based on segment name.
+          Set<String> mergedSegmentNames = new HashSet<>();
+          for (SegmentCommitInfo sci : this.segments) {
+            deleter.decRef(sci.files());
+            mergedSegmentNames.add(sci.info.name);
+          }
+          List<SegmentCommitInfo> toCommitMergedAwaySegments = new ArrayList<>();
+          for (SegmentCommitInfo sci : toCommit) {
+            if (mergedSegmentNames.contains(sci.info.name)) {
+              toCommitMergedAwaySegments.add(sci);
+            }
+          }
+          // Construct a OneMerge that applies to toCommit
+          MergePolicy.OneMerge applicableMerge = new MergePolicy.OneMerge(toCommitMergedAwaySegments);
+          applicableMerge.info = this.info.clone();
+          long segmentCounter = Long.parseLong(this.info.info.name.substring(1), Character.MAX_RADIX);
+          toCommit.counter = Math.max(toCommit.counter, segmentCounter + 1);
+          toCommit.applyMergeChanges(applicableMerge, false);
+        }
+        mergeAwaitLatch.countDown();
+      }
+    };
+  }
+
   private long prepareCommitInternal() throws IOException {
     startCommitTime = System.nanoTime();
     synchronized(commitLock) {
@@ -3186,6 +3224,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
       SegmentInfos toCommit = null;
       boolean anyChanges = false;
       long seqNo;
+      List<MergePolicy.OneMerge> commitMerges = null;
+      AtomicReference<CountDownLatch> mergeAwaitLatchRef = null;
 
       // This is copied from doFlush, except it's modified to
       // clone & incRef the flushed SegmentInfos inside the
@@ -3240,6 +3280,30 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
               // sneak into the commit point:
               toCommit = segmentInfos.clone();
 
+              if (anyChanges) {
+                // Find any merges that can execute on commit (per MergePolicy).
+                MergePolicy.MergeSpecification mergeSpec =
+                    config.getMergePolicy().findFullFlushMerges(MergeTrigger.COMMIT, segmentInfos, this);
+                if (mergeSpec != null && mergeSpec.merges.size() > 0) {
+                  int mergeCount = mergeSpec.merges.size();
+                  commitMerges = new ArrayList<>(mergeCount);
+                  mergeAwaitLatchRef = new AtomicReference<>(new CountDownLatch(mergeCount));
+                  for (MergePolicy.OneMerge oneMerge : mergeSpec.merges) {
+                    MergePolicy.OneMerge trackedMerge =
+                        updateSegmentInfosOnMergeFinish(oneMerge, toCommit, mergeAwaitLatchRef);
+                    if (registerMerge(trackedMerge) == false) {
+                      throw new IllegalStateException("MergePolicy " + config.getMergePolicy().getClass() +
+                          " returned merging segments from findFullFlushMerges");
+                    }
+                    commitMerges.add(trackedMerge);
+                  }
+                  if (infoStream.isEnabled("IW")) {
+                    infoStream.message("IW", "Registered " + mergeCount + " commit merges");
+                    infoStream.message("IW", "Before executing commit merges, had " + toCommit.size() + " segments");
+                  }
+                }
+              }
+
               pendingCommitChangeCount = changeCount.get();
 
               // This protects the segmentInfos we are now going
@@ -3247,8 +3311,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
               // we are trying to sync all referenced files, a
               // merge completes which would otherwise have
               // removed the files we are now syncing.    
-              filesToCommit = toCommit.files(false); 
-              deleter.incRef(filesToCommit);
+              deleter.incRef(toCommit.files(false));
             }
             success = true;
           } finally {
@@ -3269,6 +3332,53 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
       } finally {
         maybeCloseOnTragicEvent();
       }
+
+      if (mergeAwaitLatchRef != null) {
+        CountDownLatch mergeAwaitLatch = mergeAwaitLatchRef.get();
+        // If we found and registered any merges above, within the flushLock, then we want to ensure that they
+        // complete execution. Note that since we released the lock, other merges may have been scheduled. We will
+        // block until the merges that we registered complete. As they complete, they will update toCommit to
+        // replace merged segments with the result of each merge.
+        config.getIndexWriterEvents().beginMergeOnCommit();
+        mergeScheduler.merge(this, MergeTrigger.COMMIT, true);
+        long mergeWaitStart = System.nanoTime();
+        int abandonedCount = 0;
+        long waitTimeMillis = (long) (config.getMaxCommitMergeWaitSeconds() * 1000.0);
+        try {
+          if (mergeAwaitLatch.await(waitTimeMillis, TimeUnit.MILLISECONDS) == false) {
+            synchronized (this) {
+              // Need to do this in a synchronized block, to make sure none of our commit merges are currently
+              // executing mergeFinished (since mergeFinished itself is called from within the IndexWriter lock).
+              // After we clear the value from mergeAwaitLatchRef, the merges we schedule will still execute as
+              // usual, but when they finish, they won't attempt to update toCommit or modify segment reference
+              // counts.
+              mergeAwaitLatchRef.set(null);
+              for (MergePolicy.OneMerge commitMerge : commitMerges) {
+                if (runningMerges.contains(commitMerge) || pendingMerges.contains(commitMerge)) {
+                  abandonedCount++;
+                }
+              }
+            }
+          }
+        } catch (InterruptedException e) {
+          Thread.interrupted();
+          throw new IOException("Interrupted waiting for merges");
+        } finally {
+          if (infoStream.isEnabled("IW")) {
+            infoStream.message("IW", String.format(Locale.ROOT, "Waited %.1f ms for commit merges",
+                (System.nanoTime() - mergeWaitStart)/1_000_000.0));
+            infoStream.message("IW", "After executing commit merges, had " + toCommit.size() + " segments");
+            if (abandonedCount > 0) {
+              infoStream.message("IW", "Abandoned " + abandonedCount + " commit merges after " + waitTimeMillis + " ms");
+            }
+          }
+          if (abandonedCount > 0) {
+            config.getIndexWriterEvents().abandonedMergesOnCommit(abandonedCount);
+          }
+          config.getIndexWriterEvents().finishMergeOnCommit();
+        }
+      }
+      filesToCommit = toCommit.files(false);
      
       try {
         if (anyChanges) {
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
index 67d61a6..1e666ea 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
@@ -114,6 +114,9 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
   
   /** Default value for whether calls to {@link IndexWriter#close()} include a commit. */
   public final static boolean DEFAULT_COMMIT_ON_CLOSE = true;
+
+  /** Default value for time to wait for merges on commit (when using a {@link MergePolicy} that implements findFullFlushMerges). */
+  public static final double DEFAULT_MAX_COMMIT_MERGE_WAIT_SECONDS = 30.0;
   
   // indicates whether this config instance is already attached to a writer.
   // not final so that it can be cloned properly.
@@ -486,6 +489,24 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
     return this;
   }
 
+  /**
+   * Expert: sets the amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...).
+   * If this time is reached, we proceed with the commit based on segments merged up to that point.
+   * The merges are not cancelled, and may still run to completion independent of the commit.
+   */
+  public IndexWriterConfig setMaxCommitMergeWaitSeconds(double maxCommitMergeWaitSeconds) {
+    this.maxCommitMergeWaitSeconds = maxCommitMergeWaitSeconds;
+    return this;
+  }
+
+  /**
+   * Set the callback that gets invoked when IndexWriter performs various actions.
+   */
+  public IndexWriterConfig setIndexWriterEvents(IndexWriterEvents indexWriterEvents) {
+    this.indexWriterEvents = indexWriterEvents;
+    return this;
+  }
+
   /** We only allow sorting on these types */
   private static final EnumSet<SortField.Type> ALLOWED_INDEX_SORT_TYPES = EnumSet.of(SortField.Type.STRING,
                                                                                      SortField.Type.LONG,
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriterEvents.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriterEvents.java
new file mode 100644
index 0000000..dd4d7a2
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriterEvents.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+package org.apache.lucene.index;
+
+/**
+ * Callback interface to signal various actions taken by IndexWriter.
+ *
+ * @lucene.experimental
+ */
+public interface IndexWriterEvents {
+  /**
+   * A default implementation that ignores all events.
+   */
+  IndexWriterEvents NULL_EVENTS = new IndexWriterEvents() {
+    @Override
+    public void beginMergeOnCommit() { }
+
+    @Override
+    public void finishMergeOnCommit() { }
+
+    @Override
+    public void abandonedMergesOnCommit(int abandonedCount) { }
+  };
+
+  /**
+   * Signals the start of waiting for a merge on commit, returned from
+   * {@link MergePolicy#findFullFlushMerges(MergeTrigger, SegmentInfos, MergePolicy.MergeContext)}.
+   */
+  void beginMergeOnCommit();
+
+  /**
+   * Signals the end of waiting for merges on commit. This may be either because the merges completed, or because we timed out according
+   * to the limit set in {@link IndexWriterConfig#setMaxCommitMergeWaitSeconds(double)}.
+   */
+  void finishMergeOnCommit();
+
+  /**
+   * Called to signal that we abandoned some merges on commit upon reaching the timeout specified in
+   * {@link IndexWriterConfig#setMaxCommitMergeWaitSeconds(double)}.
+   */
+  void abandonedMergesOnCommit(int abandonedCount);
+}
diff --git a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
index fbd72ec..7236b33 100644
--- a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
@@ -117,6 +117,12 @@ public class LiveIndexWriterConfig {
   /** the attributes for the NRT readers */
   protected Map<String, String> readerAttributes = Collections.emptyMap();
 
+  /** Amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...) */
+  protected volatile double maxCommitMergeWaitSeconds;
+
+  /** Callback interface called on index writer actions. */
+  protected IndexWriterEvents indexWriterEvents;
+
 
   // used by IndexWriterConfig
   LiveIndexWriterConfig(Analyzer analyzer) {
@@ -141,6 +147,8 @@ public class LiveIndexWriterConfig {
     readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
     indexerThreadPool = new DocumentsWriterPerThreadPool();
     perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
+    maxCommitMergeWaitSeconds = IndexWriterConfig.DEFAULT_MAX_COMMIT_MERGE_WAIT_SECONDS;
+    indexWriterEvents = IndexWriterEvents.NULL_EVENTS;
   }
   
   /** Returns the default analyzer to use for indexing documents. */
@@ -480,6 +488,22 @@ public class LiveIndexWriterConfig {
     return softDeletesField;
   }
 
+  /**
+   * Expert: returns the amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...).
+   * If this time is reached, we proceed with the commit based on segments merged up to that point.
+   * The merges are not cancelled, and may still run to completion independent of the commit.
+   */
+  public double getMaxCommitMergeWaitSeconds() {
+    return maxCommitMergeWaitSeconds;
+  }
+
+  /**
+   * Returns a callback used to signal actions taken by the {@link IndexWriter}.
+   */
+  public IndexWriterEvents getIndexWriterEvents() {
+    return indexWriterEvents;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
@@ -505,6 +529,8 @@ public class LiveIndexWriterConfig {
     sb.append("checkPendingFlushOnUpdate=").append(isCheckPendingFlushOnUpdate()).append("\n");
     sb.append("softDeletesField=").append(getSoftDeletesField()).append("\n");
     sb.append("readerAttributes=").append(getReaderAttributes()).append("\n");
+    sb.append("maxCommitMergeWaitSeconds=").append(getMaxCommitMergeWaitSeconds()).append("\n");
+    sb.append("indexWriterEvents=").append(getIndexWriterEvents().getClass().getName()).append("\n");
     return sb.toString();
   }
 
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
index be535a7..13104fe 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
@@ -510,7 +510,7 @@ public abstract class MergePolicy {
  *          an original segment present in the
  *          to-be-merged index; else, it was a segment
  *          produced by a cascaded merge.
-   * @param mergeContext the IndexWriter to find the merges on
+   * @param mergeContext the MergeContext to find the merges on
    */
   public abstract MergeSpecification findForcedMerges(
       SegmentInfos segmentInfos, int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext)
@@ -521,12 +521,34 @@ public abstract class MergePolicy {
    * deletes from the index.
    *  @param segmentInfos
    *          the total set of segments in the index
-   * @param mergeContext the IndexWriter to find the merges on
+   * @param mergeContext the MergeContext to find the merges on
    */
   public abstract MergeSpecification findForcedDeletesMerges(
       SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException;
 
   /**
+   * Identifies merges that we want to execute (synchronously) on commit. By default, do not synchronously merge on commit.
+   *
+   * Any merges returned here will make {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} block until
+   * the merges complete or until {@link IndexWriterConfig#getMaxCommitMergeWaitSeconds()} have elapsed. This may be
+   * used to merge small segments that have just been flushed as part of the commit, reducing the number of segments in
+   * the commit. If a merge does not complete in the allotted time, it will continue to execute, but will not be reflected
+   * in the commit.
+   *
+   * If a {@link OneMerge} in the returned {@link MergeSpecification} includes a segment already included in a registered
+   * merge, then {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} will throw an {@link IllegalStateException}.
+   * Use {@link MergeContext#getMergingSegments()} to determine which segments are currently registered to merge.
+   *
+   * @param mergeTrigger the event that triggered the merge (COMMIT or FULL_FLUSH).
+   * @param segmentInfos the total set of segments in the index (while preparing the commit)
+   * @param mergeContext the MergeContext to find the merges on, which should be used to determine which segments are
+ *                     already in a registered merge (see {@link MergeContext#getMergingSegments()}).
+   */
+  public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
+    return null;
+  }
+
+  /**
    * Returns true if a new segment (regardless of its origin) should use the
    * compound file format. The default implementation returns <code>true</code>
    * iff the size of the given mergedInfo is less or equal to
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java b/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java
index d165a27..01a6b15 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java
@@ -47,5 +47,10 @@ public enum MergeTrigger {
   /**
    * Merge was triggered by a closing IndexWriter.
    */
-  CLOSING
+  CLOSING,
+
+  /**
+   * Merge was triggered on commit.
+   */
+  COMMIT,
 }
diff --git a/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java
index 1480ce4..b209e8ae 100644
--- a/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java
@@ -46,6 +46,9 @@ public final class NoMergePolicy extends MergePolicy {
   public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, MergeContext mergeContext) { return null; }
 
   @Override
+  public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) { return null; }
+
+  @Override
   public boolean useCompoundFile(SegmentInfos segments, SegmentCommitInfo newSegment, MergeContext mergeContext) {
     return newSegment.info.getUseCompoundFile();
   }
diff --git a/lucene/core/src/java/org/apache/lucene/index/OneMergeWrappingMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/OneMergeWrappingMergePolicy.java
index d08711e..a5fd66a 100644
--- a/lucene/core/src/java/org/apache/lucene/index/OneMergeWrappingMergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/OneMergeWrappingMergePolicy.java
@@ -59,6 +59,11 @@ public class OneMergeWrappingMergePolicy extends FilterMergePolicy {
     return wrapSpec(in.findForcedDeletesMerges(segmentInfos, mergeContext));
   }
 
+  @Override
+  public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
+    return wrapSpec(in.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext));
+  }
+
   private MergeSpecification wrapSpec(MergeSpecification spec) {
     MergeSpecification wrapped = spec == null ? null : new MergeSpecification();
     if (wrapped != null) {
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
index ce591a2..16d7a51 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
@@ -18,13 +18,21 @@ package org.apache.lucene.index;
 
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.index.IndexWriterConfig.OpenMode;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.store.Directory;
 
+import org.apache.lucene.util.LineFileDocs;
 import org.apache.lucene.util.LuceneTestCase;
 
 public class TestIndexWriterMergePolicy extends LuceneTestCase {
@@ -278,6 +286,130 @@ public class TestIndexWriterMergePolicy extends LuceneTestCase {
     assertSetters(new LogDocMergePolicy());
   }
 
+  public void testMergeOnCommit() throws IOException, InterruptedException {
+    Directory dir = newDirectory();
+
+    IndexWriter firstWriter = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random()))
+        .setMergePolicy(NoMergePolicy.INSTANCE));
+    for (int i = 0; i < 5; i++) {
+      TestIndexWriter.addDoc(firstWriter);
+      firstWriter.flush();
+    }
+    DirectoryReader firstReader = DirectoryReader.open(firstWriter);
+    assertEquals(5, firstReader.leaves().size());
+    firstReader.close();
+    firstWriter.close();
+
+    MergePolicy mergeOnCommitPolicy = new LogDocMergePolicy() {
+      @Override
+      public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) {
+        // Optimize down to a single segment on commit
+        if (mergeTrigger == MergeTrigger.COMMIT && segmentInfos.size() > 1) {
+          List<SegmentCommitInfo> nonMergingSegments = new ArrayList<>();
+          for (SegmentCommitInfo sci : segmentInfos) {
+            if (mergeContext.getMergingSegments().contains(sci) == false) {
+              nonMergingSegments.add(sci);
+            }
+          }
+          if (nonMergingSegments.size() > 1) {
+            MergeSpecification mergeSpecification = new MergeSpecification();
+            mergeSpecification.add(new OneMerge(nonMergingSegments));
+            return mergeSpecification;
+          }
+        }
+        return null;
+      }
+    };
+
+    AtomicInteger abandonedMerges = new AtomicInteger(0);
+    IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()))
+        .setMergePolicy(mergeOnCommitPolicy)
+        .setIndexWriterEvents(new IndexWriterEvents() {
+          @Override
+          public void beginMergeOnCommit() {
+
+          }
+
+          @Override
+          public void finishMergeOnCommit() {
+
+          }
+
+          @Override
+          public void abandonedMergesOnCommit(int abandonedCount) {
+            abandonedMerges.incrementAndGet();
+          }
+        });
+    IndexWriter writerWithMergePolicy = new IndexWriter(dir, iwc);
+
+    writerWithMergePolicy.commit();
+
+    DirectoryReader unmergedReader = DirectoryReader.open(writerWithMergePolicy);
+    assertEquals(5, unmergedReader.leaves().size()); // Don't merge unless there's a change
+    unmergedReader.close();
+
+    TestIndexWriter.addDoc(writerWithMergePolicy);
+    writerWithMergePolicy.commit();
+
+    DirectoryReader mergedReader = DirectoryReader.open(writerWithMergePolicy);
+    assertEquals(1, mergedReader.leaves().size()); // Now we merge on commit
+    mergedReader.close();
+
+    LineFileDocs lineFileDocs = new LineFileDocs(random());
+    int docCount = atLeast(1000);
+    AtomicInteger indexedDocs = new AtomicInteger(0);
+    int numIndexingThreads = atLeast(2);
+    CountDownLatch startingGun = new CountDownLatch(1);
+    Collection<Thread> indexingThreads = new ArrayList<>();
+    for (int i = 0; i < numIndexingThreads; i++) {
+      Thread t = new Thread(() -> {
+        try {
+          while (indexedDocs.getAndIncrement() < docCount) {
+            writerWithMergePolicy.addDocument(lineFileDocs.nextDoc());
+            if (rarely()) {
+              writerWithMergePolicy.commit();
+            }
+          }
+        } catch (IOException e) {
+          e.printStackTrace();
+          fail();
+        }
+      });
+      t.start();
+      indexingThreads.add(t);
+    }
+    startingGun.countDown();
+    for (Thread t : indexingThreads) {
+      t.join();
+    }
+    for (int i = 0; i < 50; i++) {
+      // Wait for pending merges to finish
+      synchronized (writerWithMergePolicy) {
+        if (writerWithMergePolicy.getMergingSegments().isEmpty()) {
+          break;
+        }
+      }
+      Thread.sleep(100);
+    }
+    abandonedMerges.set(0);
+    writerWithMergePolicy.commit();
+    if (abandonedMerges.get() == 0) {
+      assertEquals(1, writerWithMergePolicy.listOfSegmentCommitInfos().size());
+    } else {
+      assertNotEquals(1, writerWithMergePolicy.listOfSegmentCommitInfos().size());
+    }
+
+    try (IndexReader reader = writerWithMergePolicy.getReader()) {
+      IndexSearcher searcher = new IndexSearcher(reader);
+      assertEquals(docCount + 6, reader.numDocs());
+      assertEquals(docCount + 6, searcher.count(new MatchAllDocsQuery()));
+    }
+
+    writerWithMergePolicy.close();
+
+    dir.close();
+  }
+
   private void assertSetters(MergePolicy lmp) {
     lmp.setMaxCFSSegmentSizeMB(2.0);
     assertEquals(2.0, lmp.getMaxCFSSegmentSizeMB(), EPSILON);
diff --git a/lucene/test-framework/src/java/org/apache/lucene/index/MockRandomMergePolicy.java b/lucene/test-framework/src/java/org/apache/lucene/index/MockRandomMergePolicy.java
index beb4dad..92ffc73 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/index/MockRandomMergePolicy.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/index/MockRandomMergePolicy.java
@@ -129,6 +129,38 @@ public class MockRandomMergePolicy extends MergePolicy {
   }
 
   @Override
+  public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
+    MergeSpecification mergeSpecification = findMerges(null, segmentInfos, mergeContext);
+    if (mergeSpecification == null) {
+      return null;
+    }
+    // Do not return any merges involving already-merging segments.
+    MergeSpecification filteredMergeSpecification = new MergeSpecification();
+    for (OneMerge oneMerge : mergeSpecification.merges) {
+      boolean filtered = false;
+      List<SegmentCommitInfo> nonMergingSegments = new ArrayList<>();
+      for (SegmentCommitInfo sci : oneMerge.segments) {
+        if (mergeContext.getMergingSegments().contains(sci) == false) {
+          nonMergingSegments.add(sci);
+        } else {
+          filtered = true;
+        }
+      }
+      if (filtered == true) {
+        if (nonMergingSegments.size() > 0) {
+          filteredMergeSpecification.add(new OneMerge(nonMergingSegments));
+        }
+      } else {
+        filteredMergeSpecification.add(oneMerge);
+      }
+    }
+    if (filteredMergeSpecification.merges.size() > 0) {
+      return filteredMergeSpecification;
+    }
+    return null;
+  }
+
+  @Override
   public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext) throws IOException {
     // 80% of the time we create CFS:
     return random.nextInt(5) != 1;
diff --git a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
index d43ffe9..638cbfc 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
@@ -1006,6 +1006,7 @@ public abstract class LuceneTestCase extends Assert {
     if (rarely(r)) {
       c.setCheckPendingFlushUpdate(false);
     }
+    c.setMaxCommitMergeWaitSeconds(atLeast(r, 1));
     return c;
   }