You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2020/04/24 17:13:08 UTC

[lucene-solr] branch branch_8x updated: Consolidate all IW locking inside IndexWriter (#1454)

This is an automated email from the ASF dual-hosted git repository.

simonw pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8x by this push:
     new 2e47858  Consolidate all IW locking inside IndexWriter (#1454)
2e47858 is described below

commit 2e47858bf51f1af2a33710cd71677d8887cace8c
Author: Simon Willnauer <si...@apache.org>
AuthorDate: Fri Apr 24 19:07:21 2020 +0200

    Consolidate all IW locking inside IndexWriter (#1454)
    
    Today we still have one class that runs some tricky logic that should
    be in the IndexWriter in the first place since it requires locking on
    the IndexWriter itself. This change inverts the API: FrozenBufferedUpdates no longer
    gets the IndexWriter passed in; instead, the IndexWriter owns most of the logic
    and executes on a FrozenBufferedUpdates object. This prevents locking on IndexWriter
    outside of the writer itself and paves the way to simplify some concurrency down the road.
---
 .../apache/lucene/index/BufferedUpdatesStream.java |   5 +-
 .../apache/lucene/index/FrozenBufferedUpdates.java | 292 ++------------------
 .../java/org/apache/lucene/index/IndexWriter.java  | 304 +++++++++++++++++++--
 3 files changed, 305 insertions(+), 296 deletions(-)

diff --git a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java
index a76d624..b0aea30 100644
--- a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java
+++ b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java
@@ -229,7 +229,7 @@ final class BufferedUpdatesStream implements Accountable {
       // Frozen packets are now resolved, concurrently, by the indexing threads that
       // create them, by adding a DocumentsWriter.ResolveUpdatesEvent to the events queue,
       // but if we get here and the packet is not yet resolved, we resolve it now ourselves:
-      if (packet.tryApply(writer) == false) {
+      if (writer.tryApply(packet) == false) {
         // if somebody else is currently applying it - move on to the next one and force apply below
         pendingPackets.add(packet);
       }
@@ -237,7 +237,7 @@ final class BufferedUpdatesStream implements Accountable {
     }
     for (FrozenBufferedUpdates packet : pendingPackets) {
       // now block on all the packets that were concurrently applied to ensure they are due before we continue.
-      packet.forceApply(writer);
+      writer.forceApply(packet);
     }
 
     if (infoStream.isEnabled("BD")) {
@@ -345,5 +345,4 @@ final class BufferedUpdatesStream implements Accountable {
       }
     }
   }
-
 }
diff --git a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java
index 3b09292..ee5e383 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java
@@ -16,18 +16,14 @@
  */
 package org.apache.lucene.index;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.IntConsumer;
 
@@ -40,7 +36,6 @@ import org.apache.lucene.search.Weight;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.InfoStream;
 import org.apache.lucene.util.RamUsageEstimator;
 
@@ -125,284 +120,39 @@ final class FrozenBufferedUpdates {
     }
   }
 
-  /** Returns the {@link SegmentCommitInfo} that this packet is supposed to apply its deletes to, or null
-   *  if the private segment was already merged away. */
-  private List<SegmentCommitInfo> getInfosToApply(IndexWriter writer) {
-    assert Thread.holdsLock(writer);
-    final List<SegmentCommitInfo> infos;
-    if (privateSegment != null) {
-      if (writer.segmentCommitInfoExist(privateSegment)) {
-        infos = Collections.singletonList(privateSegment);
-      }else {
-        if (infoStream.isEnabled("BD")) {
-          infoStream.message("BD", "private segment already gone; skip processing updates");
-        }
-        infos = null;
-      }
-    } else {
-      infos = writer.listOfSegmentCommitInfos();
-    }
-    return infos;
-  }
-
-  /** Translates a frozen packet of delete term/query, or doc values
-   *  updates, into their actual docIDs in the index, and applies the change.  This is a heavy
-   *  operation and is done concurrently by incoming indexing threads.
-   *  This method will return immediately without blocking if another thread is currently
-   *  applying the package. In order to ensure the packet has been applied, {@link #forceApply(IndexWriter)}
-   *  must be called.
-   *  */
-  @SuppressWarnings("try")
-  boolean tryApply(IndexWriter writer) throws IOException {
-    if (applyLock.tryLock()) {
-      try {
-        forceApply(writer);
-        return true;
-      } finally {
-        applyLock.unlock();
-      }
-    }
-    return false;
+  /**
+   * Tries to lock this buffered update instance
+   * @return true if the lock was successfully acquired. otherwise false.
+   */
+  boolean tryLock() {
+    return applyLock.tryLock();
   }
 
-  /** Translates a frozen packet of delete term/query, or doc values
-   *  updates, into their actual docIDs in the index, and applies the change.  This is a heavy
-   *  operation and is done concurrently by incoming indexing threads.
-   *  */
-  void forceApply(IndexWriter writer) throws IOException {
+  /**
+   * locks this buffered update instance
+   */
+  void lock() {
     applyLock.lock();
-    try {
-      if (applied.getCount() == 0) {
-        // already done
-        return;
-      }
-      long startNS = System.nanoTime();
-
-      assert any();
-
-      Set<SegmentCommitInfo> seenSegments = new HashSet<>();
-
-      int iter = 0;
-      int totalSegmentCount = 0;
-      long totalDelCount = 0;
-
-      boolean finished = false;
-
-      // Optimistic concurrency: assume we are free to resolve the deletes against all current segments in the index, despite that
-      // concurrent merges are running.  Once we are done, we check to see if a merge completed while we were running.  If so, we must retry
-      // resolving against the newly merged segment(s).  Eventually no merge finishes while we were running and we are done.
-      while (true) {
-        String messagePrefix;
-        if (iter == 0) {
-          messagePrefix = "";
-        } else {
-          messagePrefix = "iter " + iter;
-        }
-
-        long iterStartNS = System.nanoTime();
-
-        long mergeGenStart = writer.mergeFinishedGen.get();
-
-        Set<String> delFiles = new HashSet<>();
-        BufferedUpdatesStream.SegmentState[] segStates;
-
-        synchronized (writer) {
-          List<SegmentCommitInfo> infos = getInfosToApply(writer);
-          if (infos == null) {
-            break;
-          }
-
-          for (SegmentCommitInfo info : infos) {
-            delFiles.addAll(info.files());
-          }
-
-          // Must open while holding IW lock so that e.g. segments are not merged
-          // away, dropped from 100% deletions, etc., before we can open the readers
-          segStates = openSegmentStates(writer, infos, seenSegments, delGen());
-
-          if (segStates.length == 0) {
-
-            if (infoStream.isEnabled("BD")) {
-              infoStream.message("BD", "packet matches no segments");
-            }
-            break;
-          }
-
-          if (infoStream.isEnabled("BD")) {
-            infoStream.message("BD", String.format(Locale.ROOT,
-                messagePrefix + "now apply del packet (%s) to %d segments, mergeGen %d",
-                this, segStates.length, mergeGenStart));
-          }
-
-          totalSegmentCount += segStates.length;
-
-          // Important, else IFD may try to delete our files while we are still using them,
-          // if e.g. a merge finishes on some of the segments we are resolving on:
-          writer.deleter.incRef(delFiles);
-        }
-
-        AtomicBoolean success = new AtomicBoolean();
-        long delCount;
-        try (Closeable finalizer = () -> finishApply(writer, segStates, success.get(), delFiles)) {
-          assert finalizer != null; // access the finalizer to prevent a warning
-          // don't hold IW monitor lock here so threads are free concurrently resolve deletes/updates:
-          delCount = apply(segStates);
-          success.set(true);
-        }
-
-        // Since we just resolved some more deletes/updates, now is a good time to write them:
-        writer.writeSomeDocValuesUpdates();
-
-        // It's OK to add this here, even if the while loop retries, because delCount only includes newly
-        // deleted documents, on the segments we didn't already do in previous iterations:
-        totalDelCount += delCount;
-
-        if (infoStream.isEnabled("BD")) {
-          infoStream.message("BD", String.format(Locale.ROOT,
-              messagePrefix + "done inner apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec",
-              this, segStates.length, delCount, (System.nanoTime() - iterStartNS) / 1000000000.));
-        }
-        if (privateSegment != null) {
-          // No need to retry for a segment-private packet: the merge that folds in our private segment already waits for all deletes to
-          // be applied before it kicks off, so this private segment must already not be in the set of merging segments
-
-          break;
-        }
-
-        // Must sync on writer here so that IW.mergeCommit is not running concurrently, so that if we exit, we know mergeCommit will succeed
-        // in pulling all our delGens into a merge:
-        synchronized (writer) {
-          long mergeGenCur = writer.mergeFinishedGen.get();
-
-          if (mergeGenCur == mergeGenStart) {
-
-            // Must do this while still holding IW lock else a merge could finish and skip carrying over our updates:
-
-            // Record that this packet is finished:
-            writer.finished(this);
-
-            finished = true;
-
-            // No merge finished while we were applying, so we are done!
-            break;
-          }
-        }
-
-        if (infoStream.isEnabled("BD")) {
-          infoStream.message("BD", messagePrefix + "concurrent merges finished; move to next iter");
-        }
-
-        // A merge completed while we were running.  In this case, that merge may have picked up some of the updates we did, but not
-        // necessarily all of them, so we cycle again, re-applying all our updates to the newly merged segment.
-
-        iter++;
-      }
-
-      if (finished == false) {
-        // Record that this packet is finished:
-        writer.finished(this);
-      }
-
-      if (infoStream.isEnabled("BD")) {
-        String message = String.format(Locale.ROOT,
-            "done apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec",
-            this, totalSegmentCount, totalDelCount, (System.nanoTime() - startNS) / 1000000000.);
-        if (iter > 0) {
-          message += "; " + (iter + 1) + " iters due to concurrent merges";
-        }
-        message += "; " + writer.getPendingUpdatesCount() + " packets remain";
-        infoStream.message("BD", message);
-      }
-    } finally {
-      applyLock.unlock();
-    }
   }
 
-  /** Opens SegmentReader and inits SegmentState for each segment. */
-  private static BufferedUpdatesStream.SegmentState[] openSegmentStates(IndexWriter writer, List<SegmentCommitInfo> infos,
-                                                                       Set<SegmentCommitInfo> alreadySeenSegments, long delGen) throws IOException {
-    List<BufferedUpdatesStream.SegmentState> segStates = new ArrayList<>();
-    try {
-      for (SegmentCommitInfo info : infos) {
-        if (info.getBufferedDeletesGen() <= delGen && alreadySeenSegments.contains(info) == false) {
-          segStates.add(new BufferedUpdatesStream.SegmentState(writer.getPooledInstance(info, true), writer::release, info));
-          alreadySeenSegments.add(info);
-        }
-      }
-    } catch (Throwable t) {
-      try {
-        IOUtils.close(segStates);
-      } catch (Throwable t1) {
-        t.addSuppressed(t1);
-      }
-      throw t;
-    }
-
-    return segStates.toArray(new BufferedUpdatesStream.SegmentState[0]);
-  }
-
-  /** Close segment states previously opened with openSegmentStates. */
-  public static BufferedUpdatesStream.ApplyDeletesResult closeSegmentStates(IndexWriter writer, BufferedUpdatesStream.SegmentState[] segStates, boolean success) throws IOException {
-    List<SegmentCommitInfo> allDeleted = null;
-    long totDelCount = 0;
-    try {
-      for (BufferedUpdatesStream.SegmentState segState : segStates) {
-        if (success) {
-          totDelCount += segState.rld.getDelCount() - segState.startDelCount;
-          int fullDelCount = segState.rld.getDelCount();
-          assert fullDelCount <= segState.rld.info.info.maxDoc() : fullDelCount + " > " + segState.rld.info.info.maxDoc();
-          if (segState.rld.isFullyDeleted() && writer.getConfig().getMergePolicy().keepFullyDeletedSegment(() -> segState.reader) == false) {
-            if (allDeleted == null) {
-              allDeleted = new ArrayList<>();
-            }
-            allDeleted.add(segState.reader.getOriginalSegmentInfo());
-          }
-        }
-      }
-    } finally {
-      IOUtils.close(segStates);
-    }
-    if (writer.infoStream.isEnabled("BD")) {
-      writer.infoStream.message("BD", "closeSegmentStates: " + totDelCount + " new deleted documents; pool " + writer.getPendingUpdatesCount()+ " packets; bytesUsed=" + writer.getReaderPoolRamBytesUsed());
-    }
-
-    return new BufferedUpdatesStream.ApplyDeletesResult(totDelCount > 0, allDeleted);
+  /**
+   * Releases the lock of this buffered update instance
+   */
+  void unlock() {
+    applyLock.unlock();
   }
 
-  private void finishApply(IndexWriter writer, BufferedUpdatesStream.SegmentState[] segStates,
-                           boolean success, Set<String> delFiles) throws IOException {
+  /**
+   * Returns true iff this buffered updates instance was already applied
+   */
+  boolean isApplied() {
     assert applyLock.isHeldByCurrentThread();
-    synchronized (writer) {
-
-      BufferedUpdatesStream.ApplyDeletesResult result;
-      try {
-        result = closeSegmentStates(writer, segStates, success);
-      } finally {
-        // Matches the incRef we did above, but we must do the decRef after closing segment states else
-        // IFD can't delete still-open files
-        writer.deleter.decRef(delFiles);
-      }
-
-      if (result.anyDeletes) {
-          writer.maybeMerge.set(true);
-          writer.checkpoint();
-      }
-
-      if (result.allDeleted != null) {
-        if (infoStream.isEnabled("IW")) {
-          infoStream.message("IW", "drop 100% deleted segments: " + writer.segString(result.allDeleted));
-        }
-        for (SegmentCommitInfo info : result.allDeleted) {
-          writer.dropDeletedSegment(info);
-        }
-        writer.checkpoint();
-      }
-    }
+    return applied.getCount() == 0;
   }
 
   /** Applies pending delete-by-term, delete-by-query and doc values updates to all segments in the index, returning
    *  the number of new deleted or updated documents. */
-  private long apply(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {
+  long apply(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {
     assert applyLock.isHeldByCurrentThread();
     if (delGen == -1) {
       // we were not yet pushed
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 35eb5ef..7ff697a 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -385,7 +385,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   private volatile boolean closed;
   private volatile boolean closing;
 
-  final AtomicBoolean maybeMerge = new AtomicBoolean();
+  private final AtomicBoolean maybeMerge = new AtomicBoolean();
 
   private Iterable<Map.Entry<String,String>> commitUserData;
 
@@ -409,9 +409,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   private final ReaderPool readerPool;
   final BufferedUpdatesStream bufferedUpdatesStream;
 
-  /** Counts how many merges have completed; this is used by {@link FrozenBufferedUpdates#forceApply(IndexWriter)}
+  /** Counts how many merges have completed; this is used by {@link #forceApply(FrozenBufferedUpdates)}
    *  to handle concurrently apply deletes/updates with merges completing. */
-  final AtomicLong mergeFinishedGen = new AtomicLong();
+  private final AtomicLong mergeFinishedGen = new AtomicLong();
 
   // The instance that was passed to the constructor. It is saved only in order
   // to allow users to query an IndexWriter settings.
@@ -651,13 +651,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     return docWriter.getFlushingBytes();
   }
 
-  final long getReaderPoolRamBytesUsed() {
-    return readerPool.ramBytesUsed();
-  }
 
   private final AtomicBoolean writeDocValuesLock = new AtomicBoolean();
 
-  void writeSomeDocValuesUpdates() throws IOException {
+  final void writeSomeDocValuesUpdates() throws IOException {
     if (writeDocValuesLock.compareAndSet(false, true)) {
       try {
         final double ramBufferSizeMB = config.getRAMBufferSizeMB();
@@ -665,7 +662,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
         if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH) {
           long startNS = System.nanoTime();
 
-          long ramBytesUsed = getReaderPoolRamBytesUsed();
+          long ramBytesUsed = readerPool.ramBytesUsed();
           if (ramBytesUsed > 0.5 * ramBufferSizeMB * 1024 * 1024) {
             if (infoStream.isEnabled("BD")) {
               infoStream.message("BD", String.format(Locale.ROOT, "now write some pending DV updates: %.2f MB used vs IWC Buffer %.2f MB",
@@ -707,7 +704,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
 
             if (infoStream.isEnabled("BD")) {
               infoStream.message("BD", String.format(Locale.ROOT, "done write some DV updates for %d segments: now %.2f MB used vs IWC Buffer %.2f MB; took %.2f sec",
-                  count, getReaderPoolRamBytesUsed()/1024./1024., ramBufferSizeMB, ((System.nanoTime() - startNS)/1000000000.)));
+                  count, readerPool.ramBytesUsed()/1024./1024., ramBufferSizeMB, ((System.nanoTime() - startNS)/1000000000.)));
             }
           }
         }
@@ -1547,7 +1544,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   }
 
   /** Drops a segment that has 100% deleted documents. */
-  synchronized void dropDeletedSegment(SegmentCommitInfo info) throws IOException {
+  private synchronized void dropDeletedSegment(SegmentCommitInfo info) throws IOException {
     // If a merge has already registered for this
     // segment, we leave it in the readerPool; the
     // merge will skip merging it and will then drop
@@ -1890,7 +1887,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
 
   /** If enabled, information about merges will be printed to this.
    */
-  final InfoStream infoStream;
+  private final InfoStream infoStream;
 
   /**
    * Forces merge policy to merge segments until there are
@@ -2579,7 +2576,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
    * the index files referenced exist (correctly) in the
    * index directory.
    */
-  synchronized void checkpoint() throws IOException {
+  private synchronized void checkpoint() throws IOException {
     changed();
     deleter.checkpoint(segmentInfos, false);
   }
@@ -2607,7 +2604,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
       try {
         // we call tryApply here since we don't want to block if a refresh or a flush is already applying the
         // packet. The flush will retry this packet anyway to ensure all of them are applied
-        packet.tryApply(w);
+        tryApply(packet);
       } catch (Throwable t) {
         try {
           w.onTragicEvent(t, "applyUpdatesPacket");
@@ -5275,12 +5272,280 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     return readerPool.get(info, create);
   }
 
-  void finished(FrozenBufferedUpdates packet) {
-    bufferedUpdatesStream.finished(packet);
+// FrozenBufferedUpdates
+  /**
+   * Translates a frozen packet of delete term/query, or doc values
+   * updates, into their actual docIDs in the index, and applies the change.  This is a heavy
+   * operation and is done concurrently by incoming indexing threads.
+   * This method will return immediately without blocking if another thread is currently
+   * applying the package. In order to ensure the packet has been applied,
+   * {@link IndexWriter#forceApply(FrozenBufferedUpdates)} must be called.
+   */
+  @SuppressWarnings("try")
+  boolean tryApply(FrozenBufferedUpdates updates) throws IOException {
+    if (updates.tryLock()) {
+      try {
+        forceApply(updates);
+        return true;
+      } finally {
+        updates.unlock();
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Translates a frozen packet of delete term/query, or doc values
+   * updates, into their actual docIDs in the index, and applies the change.  This is a heavy
+   * operation and is done concurrently by incoming indexing threads.
+   */
+  void forceApply(FrozenBufferedUpdates updates) throws IOException {
+    updates.lock();
+    try {
+      if (updates.isApplied()) {
+        // already done
+        return;
+      }
+      long startNS = System.nanoTime();
+
+      assert updates.any();
+
+      Set<SegmentCommitInfo> seenSegments = new HashSet<>();
+
+      int iter = 0;
+      int totalSegmentCount = 0;
+      long totalDelCount = 0;
+
+      boolean finished = false;
+
+      // Optimistic concurrency: assume we are free to resolve the deletes against all current segments in the index, despite that
+      // concurrent merges are running.  Once we are done, we check to see if a merge completed while we were running.  If so, we must retry
+      // resolving against the newly merged segment(s).  Eventually no merge finishes while we were running and we are done.
+      while (true) {
+        String messagePrefix;
+        if (iter == 0) {
+          messagePrefix = "";
+        } else {
+          messagePrefix = "iter " + iter;
+        }
+
+        long iterStartNS = System.nanoTime();
+
+        long mergeGenStart = mergeFinishedGen.get();
+
+        Set<String> delFiles = new HashSet<>();
+        BufferedUpdatesStream.SegmentState[] segStates;
+
+        synchronized (this) {
+          List<SegmentCommitInfo> infos = getInfosToApply(updates);
+          if (infos == null) {
+            break;
+          }
+
+          for (SegmentCommitInfo info : infos) {
+            delFiles.addAll(info.files());
+          }
+
+          // Must open while holding IW lock so that e.g. segments are not merged
+          // away, dropped from 100% deletions, etc., before we can open the readers
+          segStates = openSegmentStates(infos, seenSegments, updates.delGen());
+
+          if (segStates.length == 0) {
+
+            if (infoStream.isEnabled("BD")) {
+              infoStream.message("BD", "packet matches no segments");
+            }
+            break;
+          }
+
+          if (infoStream.isEnabled("BD")) {
+            infoStream.message("BD", String.format(Locale.ROOT,
+                messagePrefix + "now apply del packet (%s) to %d segments, mergeGen %d",
+                this, segStates.length, mergeGenStart));
+          }
+
+          totalSegmentCount += segStates.length;
+
+          // Important, else IFD may try to delete our files while we are still using them,
+          // if e.g. a merge finishes on some of the segments we are resolving on:
+          deleter.incRef(delFiles);
+        }
+
+        AtomicBoolean success = new AtomicBoolean();
+        long delCount;
+        try (Closeable finalizer = () -> finishApply(segStates, success.get(), delFiles)) {
+          assert finalizer != null; // access the finalizer to prevent a warning
+          // don't hold IW monitor lock here so threads are free concurrently resolve deletes/updates:
+          delCount = updates.apply(segStates);
+          success.set(true);
+        }
+
+        // Since we just resolved some more deletes/updates, now is a good time to write them:
+        writeSomeDocValuesUpdates();
+
+        // It's OK to add this here, even if the while loop retries, because delCount only includes newly
+        // deleted documents, on the segments we didn't already do in previous iterations:
+        totalDelCount += delCount;
+
+        if (infoStream.isEnabled("BD")) {
+          infoStream.message("BD", String.format(Locale.ROOT,
+              messagePrefix + "done inner apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec",
+              this, segStates.length, delCount, (System.nanoTime() - iterStartNS) / 1000000000.));
+        }
+        if (updates.privateSegment != null) {
+          // No need to retry for a segment-private packet: the merge that folds in our private segment already waits for all deletes to
+          // be applied before it kicks off, so this private segment must already not be in the set of merging segments
+
+          break;
+        }
+
+        // Must sync on writer here so that IW.mergeCommit is not running concurrently, so that if we exit, we know mergeCommit will succeed
+        // in pulling all our delGens into a merge:
+        synchronized (this) {
+          long mergeGenCur = mergeFinishedGen.get();
+
+          if (mergeGenCur == mergeGenStart) {
+
+            // Must do this while still holding IW lock else a merge could finish and skip carrying over our updates:
+
+            // Record that this packet is finished:
+            bufferedUpdatesStream.finished(updates);
+
+            finished = true;
+
+            // No merge finished while we were applying, so we are done!
+            break;
+          }
+        }
+
+        if (infoStream.isEnabled("BD")) {
+          infoStream.message("BD", messagePrefix + "concurrent merges finished; move to next iter");
+        }
+
+        // A merge completed while we were running.  In this case, that merge may have picked up some of the updates we did, but not
+        // necessarily all of them, so we cycle again, re-applying all our updates to the newly merged segment.
+
+        iter++;
+      }
+
+      if (finished == false) {
+        // Record that this packet is finished:
+        bufferedUpdatesStream.finished(updates);
+      }
+
+      if (infoStream.isEnabled("BD")) {
+        String message = String.format(Locale.ROOT,
+            "done apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec",
+            this, totalSegmentCount, totalDelCount, (System.nanoTime() - startNS) / 1000000000.);
+        if (iter > 0) {
+          message += "; " + (iter + 1) + " iters due to concurrent merges";
+        }
+        message += "; " + bufferedUpdatesStream.getPendingUpdatesCount() + " packets remain";
+        infoStream.message("BD", message);
+      }
+    } finally {
+      updates.unlock();
+    }
+  }
+
+  /** Returns the {@link SegmentCommitInfo} that this packet is supposed to apply its deletes to, or null
+   *  if the private segment was already merged away. */
+  private synchronized List<SegmentCommitInfo> getInfosToApply(FrozenBufferedUpdates updates) {
+    final List<SegmentCommitInfo> infos;
+    if (updates.privateSegment != null) {
+      if (segmentInfos.contains(updates.privateSegment)) {
+        infos = Collections.singletonList(updates.privateSegment);
+      }else {
+        if (infoStream.isEnabled("BD")) {
+          infoStream.message("BD", "private segment already gone; skip processing updates");
+        }
+        infos = null;
+      }
+    } else {
+      infos = listOfSegmentCommitInfos();
+    }
+    return infos;
+  }
+
+  private void finishApply(BufferedUpdatesStream.SegmentState[] segStates,
+                           boolean success, Set<String> delFiles) throws IOException {
+    synchronized (this) {
+
+      BufferedUpdatesStream.ApplyDeletesResult result;
+      try {
+        result = closeSegmentStates(segStates, success);
+      } finally {
+        // Matches the incRef we did above, but we must do the decRef after closing segment states else
+        // IFD can't delete still-open files
+        deleter.decRef(delFiles);
+      }
+
+      if (result.anyDeletes) {
+        maybeMerge.set(true);
+        checkpoint();
+      }
+
+      if (result.allDeleted != null) {
+        if (infoStream.isEnabled("IW")) {
+          infoStream.message("IW", "drop 100% deleted segments: " + segString(result.allDeleted));
+        }
+        for (SegmentCommitInfo info : result.allDeleted) {
+          dropDeletedSegment(info);
+        }
+        checkpoint();
+      }
+    }
+  }
+
+  /** Close segment states previously opened with openSegmentStates. */
+  private BufferedUpdatesStream.ApplyDeletesResult closeSegmentStates(BufferedUpdatesStream.SegmentState[] segStates, boolean success) throws IOException {
+    List<SegmentCommitInfo> allDeleted = null;
+    long totDelCount = 0;
+    try {
+      for (BufferedUpdatesStream.SegmentState segState : segStates) {
+        if (success) {
+          totDelCount += segState.rld.getDelCount() - segState.startDelCount;
+          int fullDelCount = segState.rld.getDelCount();
+          assert fullDelCount <= segState.rld.info.info.maxDoc() : fullDelCount + " > " + segState.rld.info.info.maxDoc();
+          if (segState.rld.isFullyDeleted() && getConfig().getMergePolicy().keepFullyDeletedSegment(() -> segState.reader) == false) {
+            if (allDeleted == null) {
+              allDeleted = new ArrayList<>();
+            }
+            allDeleted.add(segState.reader.getOriginalSegmentInfo());
+          }
+        }
+      }
+    } finally {
+      IOUtils.close(segStates);
+    }
+    if (infoStream.isEnabled("BD")) {
+      infoStream.message("BD", "closeSegmentStates: " + totDelCount + " new deleted documents; pool " + bufferedUpdatesStream.getPendingUpdatesCount() +  " packets; bytesUsed=" + readerPool.ramBytesUsed());
+    }
+
+    return new BufferedUpdatesStream.ApplyDeletesResult(totDelCount > 0, allDeleted);
   }
 
-  int getPendingUpdatesCount() {
-    return bufferedUpdatesStream.getPendingUpdatesCount();
+  /** Opens SegmentReader and inits SegmentState for each segment. */
+  private BufferedUpdatesStream.SegmentState[] openSegmentStates(List<SegmentCommitInfo> infos,
+                                                                 Set<SegmentCommitInfo> alreadySeenSegments, long delGen) throws IOException {
+    List<BufferedUpdatesStream.SegmentState> segStates = new ArrayList<>();
+    try {
+      for (SegmentCommitInfo info : infos) {
+        if (info.getBufferedDeletesGen() <= delGen && alreadySeenSegments.contains(info) == false) {
+          segStates.add(new BufferedUpdatesStream.SegmentState(getPooledInstance(info, true), this::release, info));
+          alreadySeenSegments.add(info);
+        }
+      }
+    } catch (Throwable t) {
+      try {
+        IOUtils.close(segStates);
+      } catch (Throwable t1) {
+        t.addSuppressed(t1);
+      }
+      throw t;
+    }
+
+    return segStates.toArray(new BufferedUpdatesStream.SegmentState[0]);
   }
 
   /**
@@ -5296,11 +5561,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     }
   }
 
-  /** Checks if the provided segment exists in the current segmentInfos */
-  final synchronized boolean segmentCommitInfoExist(SegmentCommitInfo sci) {
-    return segmentInfos.contains(sci);
-  }
-
   /** Returns an unmodifiable view of the list of all segments of the current segmentInfos */
   final synchronized List<SegmentCommitInfo> listOfSegmentCommitInfos() {
     return segmentInfos.asList();