Posted to commits@lucene.apache.org by si...@apache.org on 2018/04/24 13:50:14 UTC

lucene-solr:branch_7x: LUCENE-8271: Remove IndexWriter from DWFlushQueue

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_7x 740c49669 -> e018ff555


LUCENE-8271: Remove IndexWriter from DWFlushQueue

This simplifies DocumentsWriterFlushQueue by moving all IW-related
code out of it. The DWFQ now only contains logic for taking tickets
off the queue and handing each one to a given consumer. The publishing
logic now resides entirely in IW and has private visibility. Locking
is also more contained, since IW knows exactly what is called and when.
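
A minimal sketch of the resulting contract, using simplified stand-in
types rather than the real Lucene classes: the queue only drains
tickets and hands each one to a caller-supplied consumer, while all
publishing logic stays with the caller (IndexWriter in this commit).

  import java.io.IOException;
  import java.util.ArrayDeque;
  import java.util.Queue;

  final class FlushQueueSketch<T> {

    // Stand-in for org.apache.lucene.util.IOUtils.IOConsumer: a
    // consumer that is allowed to throw IOException.
    interface IOConsumer<U> {
      void accept(U value) throws IOException;
    }

    private final Queue<T> queue = new ArrayDeque<>();

    synchronized void add(T ticket) {
      queue.add(ticket);
    }

    // The queue knows nothing about what "publishing" a ticket means;
    // it only drains tickets and applies the caller-supplied consumer.
    void purge(IOConsumer<T> consumer) throws IOException {
      while (true) {
        final T head;
        synchronized (this) {
          head = queue.poll();
        }
        if (head == null) {
          break;
        }
        consumer.accept(head);
      }
    }
  }

IndexWriter can then call something like purge(ticket -> publish(ticket))
while keeping the publish method private, which is exactly the shape of
the new IndexWriter#publishFlushedSegments in the diff below.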

Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/e018ff55
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/e018ff55
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/e018ff55

Branch: refs/heads/branch_7x
Commit: e018ff5554229f17e49bf37b629811d183c9f856
Parents: 740c496
Author: Simon Willnauer <si...@apache.org>
Authored: Tue Apr 24 15:40:48 2018 +0200
Committer: Simon Willnauer <si...@apache.org>
Committed: Tue Apr 24 15:43:08 2018 +0200

----------------------------------------------------------------------
 .../apache/lucene/index/DocumentsWriter.java    |  41 +++--
 .../lucene/index/DocumentsWriterFlushQueue.java | 180 ++++++-------------
 .../lucene/index/DocumentsWriterPerThread.java  |   5 +-
 .../org/apache/lucene/index/IndexWriter.java    | 133 ++++++++------
 4 files changed, 155 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e018ff55/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index 5e7cdce..cbf4d6d 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -31,13 +31,13 @@ import java.util.function.Supplier;
 import java.util.function.ToLongFunction;
 
 import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.index.DocumentsWriterFlushQueue.SegmentFlushTicket;
 import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
 import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.Accountable;
+import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.InfoStream;
 
 /**
@@ -190,12 +190,13 @@ final class DocumentsWriter implements Closeable, Accountable {
     }
     return false;
   }
-  
-  int purgeBuffer(IndexWriter writer, boolean forced) throws IOException {
+
+  void purgeFlushTickets(boolean forced, IOUtils.IOConsumer<DocumentsWriterFlushQueue.FlushTicket> consumer)
+      throws IOException {
     if (forced) {
-      return ticketQueue.forcePurge(writer);
+      ticketQueue.forcePurge(consumer);
     } else {
-      return ticketQueue.tryPurge(writer);
+      ticketQueue.tryPurge(consumer);
     }
   }
 
@@ -206,7 +207,7 @@ final class DocumentsWriter implements Closeable, Accountable {
 
   private void ensureOpen() throws AlreadyClosedException {
     if (closed) {
-      throw new AlreadyClosedException("this IndexWriter is closed");
+      throw new AlreadyClosedException("this DocumentsWriter is closed");
     }
   }
 
@@ -214,8 +215,7 @@ final class DocumentsWriter implements Closeable, Accountable {
    *  updating the index files) and must discard all
    *  currently buffered docs.  This resets our state,
    *  discarding any docs added since last flush. */
-  synchronized void abort(IndexWriter writer) throws IOException {
-    assert !Thread.holdsLock(writer) : "IndexWriter lock should never be hold when aborting";
+  synchronized void abort() throws IOException {
     boolean success = false;
     try {
       deleteQueue.clear();
@@ -260,17 +260,19 @@ final class DocumentsWriter implements Closeable, Accountable {
   /** Locks all currently active DWPTs and aborts them.
    *  The returned Closeable should be closed once the locks for the aborted
    *  DWPTs can be released. */
-  synchronized Closeable lockAndAbortAll(IndexWriter indexWriter) throws IOException {
-    assert indexWriter.holdsFullFlushLock();
+  synchronized Closeable lockAndAbortAll() throws IOException {
     if (infoStream.isEnabled("DW")) {
       infoStream.message("DW", "lockAndAbortAll");
     }
     // Make sure we move all pending tickets into the flush queue:
-    ticketQueue.forcePurge(indexWriter);
+    ticketQueue.forcePurge(ticket -> {
+      if (ticket.getFlushedSegment() != null) {
+        pendingNumDocs.addAndGet(-ticket.getFlushedSegment().segmentInfo.info.maxDoc());
+      }
+    });
     List<ThreadState> threadStates = new ArrayList<>();
     AtomicBoolean released = new AtomicBoolean(false);
     final Closeable release = () -> {
-      assert indexWriter.holdsFullFlushLock();
       if (released.compareAndSet(false, true)) { // only once
         if (infoStream.isEnabled("DW")) {
           infoStream.message("DW", "unlockAllAbortedThread");
@@ -519,7 +521,7 @@ final class DocumentsWriter implements Closeable, Accountable {
     while (flushingDWPT != null) {
       hasEvents = true;
       boolean success = false;
-      SegmentFlushTicket ticket = null;
+      DocumentsWriterFlushQueue.FlushTicket ticket = null;
       try {
         assert currentFullFlushDelQueue == null
             || flushingDWPT.deleteQueue == currentFullFlushDelQueue : "expected: "
@@ -618,7 +620,7 @@ final class DocumentsWriter implements Closeable, Accountable {
   interface FlushNotifications { // TODO maybe we find a better name for this?
 
     /**
-     * Called when files were written to disk that are not used anymore. It's the implementations responsibilty
+     * Called when files were written to disk that are not used anymore. It's the implementation's responsibility
      * to clean these files up
      */
     void deleteUnusedFiles(Collection<String> files);
@@ -648,9 +650,9 @@ final class DocumentsWriter implements Closeable, Accountable {
      * that tries to publish flushed segments but can't keep up with the other threads flushing new segments.
     * This likely requires another thread to forcefully purge the buffer to help publishing. This
      * can't be done in-place since we might hold index writer locks when this is called. The caller must ensure
-     * that the purge happens without an index writer lock hold
+     * that the purge happens without an index writer lock being held.
      *
-     * @see DocumentsWriter#purgeBuffer(IndexWriter, boolean)
+     * @see DocumentsWriter#purgeFlushTickets(boolean, IOUtils.IOConsumer)
      */
     void onTicketBacklog();
   }
@@ -677,7 +679,7 @@ final class DocumentsWriter implements Closeable, Accountable {
    * two stage operation; the caller must ensure (in try/finally) that finishFlush
    * is called after this method, to release the flush lock in DWFlushControl
    */
-  long flushAllThreads(IndexWriter writer)
+  long flushAllThreads()
     throws IOException {
     final DocumentsWriterDeleteQueue flushingDeleteQueue;
     if (infoStream.isEnabled("DW")) {
@@ -713,7 +715,6 @@ final class DocumentsWriter implements Closeable, Accountable {
         }
         ticketQueue.addDeletes(flushingDeleteQueue);
       }
-      ticketQueue.forcePurge(writer);
      // we can't assert that we don't have any tickets in the queue since we might add a DocumentsWriterDeleteQueue
      // concurrently; if we have very small ram buffers this happens quite frequently
       assert !flushingDeleteQueue.anyChanges();
@@ -727,8 +728,7 @@ final class DocumentsWriter implements Closeable, Accountable {
     }
   }
   
-  void finishFullFlush(IndexWriter indexWriter, boolean success) {
-    assert indexWriter.holdsFullFlushLock();
+  void finishFullFlush(boolean success) {
     try {
       if (infoStream.isEnabled("DW")) {
         infoStream.message("DW", Thread.currentThread().getName() + " finishFullFlush success=" + success);
@@ -739,7 +739,6 @@ final class DocumentsWriter implements Closeable, Accountable {
         flushControl.finishFullFlush();
       } else {
         flushControl.abortFullFlushes();
-
       }
     } finally {
       pendingChangesInCurrentFullFlush = false;

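The forced/try distinction in purgeFlushTickets above is worth spelling
out. A hedged sketch of the two purge modes, with hypothetical names
rather than the real DWFQ internals: the forced variant blocks until
the purge lock is free, while the try variant bails out immediately
when another thread is already purging.

  import java.util.concurrent.locks.ReentrantLock;

  final class PurgeModes {
    private final ReentrantLock purgeLock = new ReentrantLock();

    // forced=true blocks until the purge lock is free; forced=false is
    // opportunistic and simply returns if another thread holds it.
    void purge(boolean forced, Runnable drainQueue) {
      if (forced) {
        purgeLock.lock();
        try {
          drainQueue.run();
        } finally {
          purgeLock.unlock();
        }
      } else if (purgeLock.tryLock()) {
        try {
          drainQueue.run();
        } finally {
          purgeLock.unlock();
        }
      }
    }
  }

In this commit, afterSegmentsFlushed purges with forced=false, while
the full flush, commit, and rollback paths pass forced=true so they are
guaranteed to see an empty ticket queue before proceeding.
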
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e018ff55/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java
index fde7587..2f94fd0 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java
@@ -23,29 +23,28 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
+import org.apache.lucene.util.IOUtils;
 
 /**
  * @lucene.internal 
  */
-class DocumentsWriterFlushQueue {
+final class DocumentsWriterFlushQueue {
   private final Queue<FlushTicket> queue = new LinkedList<>();
   // we track tickets separately since count must be present even before the ticket is
   // constructed, i.e. queue.size would not reflect it.
   private final AtomicInteger ticketCount = new AtomicInteger();
   private final ReentrantLock purgeLock = new ReentrantLock();
 
-  void addDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
-    synchronized (this) {
-      incTickets();// first inc the ticket count - freeze opens
-                   // a window for #anyChanges to fail
-      boolean success = false;
-      try {
-        queue.add(new GlobalDeletesTicket(deleteQueue.freezeGlobalBuffer(null)));
-        success = true;
-      } finally {
-        if (!success) {
-          decTickets();
-        }
+  synchronized void addDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
+    incTickets();// first inc the ticket count - freeze opens
+                 // a window for #anyChanges to fail
+    boolean success = false;
+    try {
+      queue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false));
+      success = true;
+    } finally {
+      if (!success) {
+        decTickets();
       }
     }
   }
@@ -60,14 +59,14 @@ class DocumentsWriterFlushQueue {
     assert numTickets >= 0;
   }
 
-  synchronized SegmentFlushTicket addFlushTicket(DocumentsWriterPerThread dwpt) throws IOException {
+  synchronized FlushTicket addFlushTicket(DocumentsWriterPerThread dwpt) throws IOException {
     // Each flush is assigned a ticket in the order they acquire the ticketQueue
     // lock
     incTickets();
     boolean success = false;
     try {
       // prepare flush freezes the global deletes - do in synced block!
-      final SegmentFlushTicket ticket = new SegmentFlushTicket(dwpt.prepareFlush());
+      final FlushTicket ticket = new FlushTicket(dwpt.prepareFlush(), true);
       queue.add(ticket);
       success = true;
       return ticket;
@@ -78,13 +77,15 @@ class DocumentsWriterFlushQueue {
     }
   }
   
-  synchronized void addSegment(SegmentFlushTicket ticket, FlushedSegment segment) {
+  synchronized void addSegment(FlushTicket ticket, FlushedSegment segment) {
+    assert ticket.hasSegment;
     // the actual flush is done asynchronously and once done the FlushedSegment
     // is passed to the flush ticket
     ticket.setSegment(segment);
   }
 
-  synchronized void markTicketFailed(SegmentFlushTicket ticket) {
+  synchronized void markTicketFailed(FlushTicket ticket) {
+    assert ticket.hasSegment;
     // mark the ticket as failed so the queue can still be drained and cleaned up.
     ticket.setFailed();
   }
@@ -94,9 +95,8 @@ class DocumentsWriterFlushQueue {
     return ticketCount.get() != 0;
   }
 
-  private int innerPurge(IndexWriter writer) throws IOException {
+  private void innerPurge(IOUtils.IOConsumer<FlushTicket> consumer) throws IOException {
     assert purgeLock.isHeldByCurrentThread();
-    int numPurged = 0;
     while (true) {
       final FlushTicket head;
       final boolean canPublish;
@@ -105,167 +105,99 @@ class DocumentsWriterFlushQueue {
         canPublish = head != null && head.canPublish(); // do this synced 
       }
       if (canPublish) {
-        numPurged++;
         try {
           /*
            * if we block on publish -> lock IW -> lock BufferedDeletes we don't block
            * concurrent segment flushes just because they want to append to the queue.
-           * the downside is that we need to force a purge on fullFlush since ther could
+           * the downside is that we need to force a purge on fullFlush since there could
            * be a ticket still in the queue. 
            */
-          head.publish(writer);
-          
+          consumer.accept(head);
+
         } finally {
           synchronized (this) {
             // finally remove the published ticket from the queue
             final FlushTicket poll = queue.poll();
-
+            decTickets();
             // we hold the purgeLock so no other thread should have polled:
             assert poll == head;
-            
-            ticketCount.decrementAndGet();
-            assert poll == head;
           }
         }
       } else {
         break;
       }
     }
-    return numPurged;
   }
 
-  int forcePurge(IndexWriter writer) throws IOException {
+  void forcePurge(IOUtils.IOConsumer<FlushTicket> consumer) throws IOException {
     assert !Thread.holdsLock(this);
-    assert !Thread.holdsLock(writer);
     purgeLock.lock();
     try {
-      return innerPurge(writer);
+      innerPurge(consumer);
     } finally {
       purgeLock.unlock();
     }
   }
 
-  int tryPurge(IndexWriter writer) throws IOException {
+  void tryPurge(IOUtils.IOConsumer<FlushTicket> consumer) throws IOException {
     assert !Thread.holdsLock(this);
-    assert !Thread.holdsLock(writer);
     if (purgeLock.tryLock()) {
       try {
-        return innerPurge(writer);
+        innerPurge(consumer);
       } finally {
         purgeLock.unlock();
       }
     }
-    return 0;
   }
 
-  public int getTicketCount() {
+  int getTicketCount() {
     return ticketCount.get();
   }
 
-  synchronized void clear() {
-    queue.clear();
-    ticketCount.set(0);
-  }
-
-  static abstract class FlushTicket {
-    protected FrozenBufferedUpdates frozenUpdates;
-    protected boolean published = false;
+  static final class FlushTicket {
+    private final FrozenBufferedUpdates frozenUpdates;
+    private final boolean hasSegment;
+    private FlushedSegment segment;
+    private boolean failed = false;
+    private boolean published = false;
 
-    protected FlushTicket(FrozenBufferedUpdates frozenUpdates) {
+    FlushTicket(FrozenBufferedUpdates frozenUpdates, boolean hasSegment) {
       this.frozenUpdates = frozenUpdates;
+      this.hasSegment = hasSegment;
     }
 
-    protected abstract void publish(IndexWriter writer) throws IOException;
-
-    protected abstract boolean canPublish();
-    
-    /**
-     * Publishes the flushed segment, segment private deletes (if any) and its
-     * associated global delete (if present) to IndexWriter.  The actual
-     * publishing operation is synced on {@code IW -> BDS} so that the {@link SegmentInfo}'s
-     * delete generation is always GlobalPacket_deleteGeneration + 1
-     */
-    protected final void publishFlushedSegment(IndexWriter indexWriter, FlushedSegment newSegment, FrozenBufferedUpdates globalPacket)
-        throws IOException {
-      assert newSegment != null;
-      SegmentCommitInfo segmentInfo = newSegment.segmentInfo;
-      assert segmentInfo != null;
-      final FrozenBufferedUpdates segmentUpdates = newSegment.segmentUpdates;
-      if (indexWriter.infoStream.isEnabled("DW")) {
-        indexWriter.infoStream.message("DW", "publishFlushedSegment seg-private updates=" + segmentUpdates);  
-      }
-      
-      if (segmentUpdates != null && indexWriter.infoStream.isEnabled("DW")) {
-        indexWriter.infoStream.message("DW", "flush: push buffered seg private updates: " + segmentUpdates);
-      }
-      // now publish!
-      indexWriter.publishFlushedSegment(segmentInfo, newSegment.fieldInfos, segmentUpdates, globalPacket, newSegment.sortMap);
+    boolean canPublish() {
+      return hasSegment == false || segment != null || failed;
     }
-    
-    protected final void finishFlush(IndexWriter indexWriter, FlushedSegment newSegment, FrozenBufferedUpdates bufferedUpdates)
-            throws IOException {
-      // Finish the flushed segment and publish it to IndexWriter
-      if (newSegment == null) {
-        if (bufferedUpdates != null && bufferedUpdates.any()) {
-          indexWriter.publishFrozenUpdates(bufferedUpdates);
-          if (indexWriter.infoStream.isEnabled("DW")) {
-            indexWriter.infoStream.message("DW", "flush: push buffered updates: " + bufferedUpdates);
-          }
-        }
-      } else {
-        publishFlushedSegment(indexWriter, newSegment, bufferedUpdates);  
-      }
-    }
-  }
-  
-  static final class GlobalDeletesTicket extends FlushTicket {
 
-    protected GlobalDeletesTicket(FrozenBufferedUpdates frozenUpdates) {
-      super(frozenUpdates);
-    }
-
-    @Override
-    protected void publish(IndexWriter writer) throws IOException {
-      assert !published : "ticket was already publised - can not publish twice";
+    synchronized void markPublished() {
+      assert published == false: "ticket was already published - can not publish twice";
       published = true;
-      // it's a global ticket - no segment to publish
-      finishFlush(writer, null, frozenUpdates);
     }
 
-    @Override
-    protected boolean canPublish() {
-      return true;
-    }
-  }
-
-  static final class SegmentFlushTicket extends FlushTicket {
-    private FlushedSegment segment;
-    private boolean failed = false;
-    
-    protected SegmentFlushTicket(FrozenBufferedUpdates frozenDeletes) {
-      super(frozenDeletes);
-    }
-    
-    @Override
-    protected void publish(IndexWriter writer) throws IOException {
-      assert !published : "ticket was already publised - can not publish twice";
-      published = true;
-      finishFlush(writer, segment, frozenUpdates);
-    }
-    
-    protected void setSegment(FlushedSegment segment) {
+    private void setSegment(FlushedSegment segment) {
       assert !failed;
       this.segment = segment;
     }
-    
-    protected void setFailed() {
+
+    private void setFailed() {
       assert segment == null;
       failed = true;
     }
 
-    @Override
-    protected boolean canPublish() {
-      return segment != null || failed;
+    /**
+     * Returns the flushed segment or <code>null</code> if this flush ticket doesn't have a segment. This can be the
+     * case if this ticket represents a flushed global frozen updates package.
+     */
+    FlushedSegment getFlushedSegment() {
+      return segment;
+    }
+
+    /**
+     * Returns a frozen global deletes package.
+     */
+    FrozenBufferedUpdates getFrozenUpdates() {
+      return frozenUpdates;
     }
   }
 }

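The diff above also collapses the old GlobalDeletesTicket /
SegmentFlushTicket hierarchy into the single final FlushTicket. A
simplified stand-in for its publish-readiness state machine
(illustrative names, not the real class): a ticket without a segment is
always publishable, while a segment ticket becomes publishable only
once its asynchronous flush has either delivered a segment or failed.

  final class TicketSketch {
    private final boolean hasSegment; // false for a global-deletes ticket
    private Object segment;           // set once the async flush delivers
    private boolean failed;           // set if the flush threw instead

    TicketSketch(boolean hasSegment) {
      this.hasSegment = hasSegment;
    }

    synchronized void setSegment(Object segment) {
      assert !failed;
      this.segment = segment;
    }

    synchronized void setFailed() {
      assert segment == null;
      failed = true;
    }

    // A deletes-only ticket can always publish; a segment ticket must
    // wait for its flush to either succeed or fail.
    synchronized boolean canPublish() {
      return hasSegment == false || segment != null || failed;
    }
  }

Folding both ticket kinds into one class is what makes the hasSegment
flag and the assertions in addSegment/markTicketFailed necessary.
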
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e018ff55/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
index 04ab493..da45875 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
@@ -46,7 +46,7 @@ import org.apache.lucene.util.Version;
 import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_MASK;
 import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE;
 
-class DocumentsWriterPerThread {
+final class DocumentsWriterPerThread {
 
   /**
    * The IndexingChain must define the {@link #getChain(DocumentsWriterPerThread)} method
@@ -102,7 +102,7 @@ class DocumentsWriterPerThread {
     }
   }
 
-  static class FlushedSegment {
+  static final class FlushedSegment {
     final SegmentCommitInfo segmentInfo;
     final FieldInfos fieldInfos;
     final FrozenBufferedUpdates segmentUpdates;
@@ -152,7 +152,6 @@ class DocumentsWriterPerThread {
   final DocConsumer consumer;
   final Counter bytesUsed;
   
-  SegmentWriteState flushState;
   // Updates for our still-in-RAM (to be flushed next) segment
   final BufferedUpdates pendingUpdates;
   final SegmentInfo segmentInfo;     // Current segment we are working on

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e018ff55/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 5e17e1a..7c1bb20 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -274,7 +274,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   final AtomicReference<Throwable> tragedy = new AtomicReference<>(null);
 
   private final Directory directoryOrig;       // original user directory
-  final Directory directory;           // wrapped with additional checks
+  private final Directory directory;           // wrapped with additional checks
   private final Analyzer analyzer;    // how to analyze text
 
   private final AtomicLong changeCount = new AtomicLong(); // increments every time a change is completed
@@ -360,7 +360,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     @Override
     public void afterSegmentsFlushed() throws IOException {
       try {
-        purge(false);
+        publishFlushedSegments(false);
       } finally {
         if (false) {
           maybeMerge(config.getMergePolicy(), MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
@@ -377,7 +377,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     public void onDeletesApplied() {
       eventQueue.add(w -> {
           try {
-            w.purge(true);
+            w.publishFlushedSegments(true);
           } finally {
             flushCount.incrementAndGet();
           }
@@ -387,7 +387,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
 
     @Override
     public void onTicketBacklog() {
-      eventQueue.add(w -> w.purge(true));
+      eventQueue.add(w -> w.publishFlushedSegments(true));
     }
   };
 
@@ -485,7 +485,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       synchronized (fullFlushLock) {
         try {
           // TODO: should we somehow make this available in the returned NRT reader?
-          long seqNo = docWriter.flushAllThreads(this);
+          long seqNo = docWriter.flushAllThreads();
           if (seqNo < 0) {
             anyChanges = true;
             seqNo = -seqNo;
@@ -497,7 +497,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
             // if we flushed anything.
             flushCount.incrementAndGet();
           }
-
+          publishFlushedSegments(true);
           processEvents(false);
 
           if (applyAllDeletes) {
@@ -534,7 +534,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
           success = true;
         } finally {
           // Done: finish the full flush!
-          docWriter.finishFullFlush(this, success);
+          assert holdsFullFlushLock();
+          docWriter.finishFullFlush(success);
           if (success) {
             processEvents(false);
             doAfterFlush();
@@ -2209,10 +2210,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       // set it to false before calling rollbackInternal
       mergeScheduler.close();
 
-      docWriter.close(); // mark it as closed first to prevent subsequent indexing actions/flushes 
-      docWriter.abort(this); // don't sync on IW here
+      docWriter.close(); // mark it as closed first to prevent subsequent indexing actions/flushes
+      assert !Thread.holdsLock(this) : "IndexWriter lock should never be held when aborting";
+      docWriter.abort(); // don't sync on IW here
       docWriter.flushControl.waitForFlush(); // wait for all concurrently running flushes
-      purge(true); // empty the flush ticket queue otherwise we might not have cleaned up all resources
+      publishFlushedSegments(true); // empty the flush ticket queue otherwise we might not have cleaned up all resources
       synchronized (this) {
 
         if (pendingCommit != null) {
@@ -2354,7 +2356,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
      */
     try {
       synchronized (fullFlushLock) {
-        try (Closeable finalizer = docWriter.lockAndAbortAll(this)) {
+        try (Closeable finalizer = docWriter.lockAndAbortAll()) {
           processEvents(false);
           synchronized (this) {
             try {
@@ -2501,19 +2503,33 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     segmentInfos.changed();
   }
 
-  synchronized void publishFrozenUpdates(FrozenBufferedUpdates packet) throws IOException {
+  synchronized long publishFrozenUpdates(FrozenBufferedUpdates packet) {
     assert packet != null && packet.any();
-    bufferedUpdatesStream.push(packet);
-    eventQueue.add(new ResolveUpdatesEvent(packet));
+    long nextGen = bufferedUpdatesStream.push(packet);
+    // Do this as an event so it applies higher in the stack when we are not holding DocumentsWriterFlushQueue.purgeLock:
+    eventQueue.add(w -> {
+      try {
+        packet.apply(w);
+      } catch (Throwable t) {
+        try {
+          w.onTragicEvent(t, "applyUpdatesPacket");
+        } catch (Throwable t1) {
+          t.addSuppressed(t1);
+        }
+        throw t;
+      }
+      w.flushDeletesCount.incrementAndGet();
+    });
+    return nextGen;
   }
 
   /**
    * Atomically adds the segment private delete packet and publishes the flushed
    * segments SegmentInfo to the index writer.
    */
-  synchronized void publishFlushedSegment(SegmentCommitInfo newSegment,
-                                          FieldInfos fieldInfos, FrozenBufferedUpdates packet, FrozenBufferedUpdates globalPacket,
-                                          Sorter.DocMap sortMap) throws IOException {
+  private synchronized void publishFlushedSegment(SegmentCommitInfo newSegment, FieldInfos fieldInfos,
+                                                  FrozenBufferedUpdates packet, FrozenBufferedUpdates globalPacket,
+                                                  Sorter.DocMap sortMap) throws IOException {
     boolean published = false;
     try {
       // Lock order IW -> BDS
@@ -2524,20 +2540,14 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       }
 
       if (globalPacket != null && globalPacket.any()) {
-        // Do this as an event so it applies higher in the stack when we are not holding DocumentsWriterFlushQueue.purgeLock:
-        bufferedUpdatesStream.push(globalPacket);
-        eventQueue.add(new ResolveUpdatesEvent(globalPacket));
+        publishFrozenUpdates(globalPacket);
       }
 
       // Publishing the segment must be sync'd on IW -> BDS to make the sure
       // that no merge prunes away the seg. private delete packet
       final long nextGen;
       if (packet != null && packet.any()) {
-        nextGen = bufferedUpdatesStream.push(packet);
-
-        // Do this as an event so it applies higher in the stack when we are not holding DocumentsWriterFlushQueue.purgeLock:
-        eventQueue.add(new ResolveUpdatesEvent(packet));
-
+        nextGen = publishFrozenUpdates(packet);
       } else {
         // Since we don't have a delete packet to apply we can get a new
         // generation right away
@@ -3107,7 +3117,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
           boolean flushSuccess = false;
           boolean success = false;
           try {
-            seqNo = docWriter.flushAllThreads(this);
+            seqNo = docWriter.flushAllThreads();
             if (seqNo < 0) {
               anyChanges = true;
               seqNo = -seqNo;
@@ -3117,7 +3127,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
               // if we flushed anything.
               flushCount.incrementAndGet();
             }
-
+            publishFlushedSegments(true);
             // cannot pass triggerMerges=true here else it can lead to deadlock:
             processEvents(false);
             
@@ -3171,8 +3181,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
                 infoStream.message("IW", "hit exception during prepareCommit");
               }
             }
+            assert holdsFullFlushLock();
             // Done: finish the full flush!
-            docWriter.finishFullFlush(this, flushSuccess);
+            docWriter.finishFullFlush(flushSuccess);
             doAfterFlush();
           }
         }
@@ -3468,7 +3479,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       synchronized (fullFlushLock) {
         boolean flushSuccess = false;
         try {
-          long seqNo = docWriter.flushAllThreads(this);
+          long seqNo = docWriter.flushAllThreads();
           if (seqNo < 0) {
             seqNo = -seqNo;
             anyChanges = true;
@@ -3479,9 +3490,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
             // flushCount is incremented in flushAllThreads
             flushCount.incrementAndGet();
           }
+          publishFlushedSegments(true);
           flushSuccess = true;
         } finally {
-          docWriter.finishFullFlush(this, flushSuccess);
+          assert holdsFullFlushLock();
+          docWriter.finishFullFlush(flushSuccess);
           processEvents(false);
         }
       }
@@ -4850,8 +4863,40 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     }
   }
 
-  private int purge(boolean forced) throws IOException {
-    return docWriter.purgeBuffer(this, forced);
+  /**
+   * Publishes the flushed segment, segment-private deletes (if any) and its
+   * associated global delete (if present) to IndexWriter.  The actual
+   * publishing operation is synced on {@code IW -> BDS} so that the {@link SegmentInfo}'s
+   * delete generation is always GlobalPacket_deleteGeneration + 1
+   * @param forced if <code>true</code> this call will block on the ticket queue if the lock is held by another thread.
+   *               if <code>false</code> the call will try to acquire the queue lock and exit if it's held by another thread.
+   *
+   */
+  void publishFlushedSegments(boolean forced) throws IOException {
+    docWriter.purgeFlushTickets(forced, ticket -> {
+      DocumentsWriterPerThread.FlushedSegment newSegment = ticket.getFlushedSegment();
+      FrozenBufferedUpdates bufferedUpdates = ticket.getFrozenUpdates();
+      ticket.markPublished();
+      if (newSegment == null) { // this is a flushed global deletes package - not a segment
+        if (bufferedUpdates != null && bufferedUpdates.any()) { // TODO why can this be null?
+          publishFrozenUpdates(bufferedUpdates);
+          if (infoStream.isEnabled("IW")) {
+            infoStream.message("IW", "flush: push buffered updates: " + bufferedUpdates);
+          }
+        }
+      } else {
+        assert newSegment.segmentInfo != null;
+        if (infoStream.isEnabled("IW")) {
+          infoStream.message("IW", "publishFlushedSegment seg-private updates=" + newSegment.segmentUpdates);
+        }
+        if (newSegment.segmentUpdates != null && infoStream.isEnabled("IW")) {
+          infoStream.message("IW", "flush: push buffered seg private updates: " + newSegment.segmentUpdates);
+        }
+        // now publish!
+        publishFlushedSegment(newSegment.segmentInfo, newSegment.fieldInfos, newSegment.segmentUpdates,
+            bufferedUpdates, newSegment.sortMap);
+      }
+    });
   }
 
   /** Record that the files referenced by this {@link SegmentInfos} are still in use.
@@ -5000,30 +5045,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     return readerPool.get(info, create);
   }
 
-  private static final class ResolveUpdatesEvent implements Event {
-
-    private final FrozenBufferedUpdates packet;
-
-    ResolveUpdatesEvent(FrozenBufferedUpdates packet) {
-      this.packet = packet;
-    }
-
-    @Override
-    public void process(IndexWriter writer) throws IOException {
-      try {
-        packet.apply(writer);
-      } catch (Throwable t) {
-        try {
-          writer.onTragicEvent(t, "applyUpdatesPacket");
-        } catch (Throwable t1) {
-          t.addSuppressed(t1);
-        }
-        throw t;
-      }
-      writer.flushDeletesCount.incrementAndGet();
-    }
-  }
-
   void finished(FrozenBufferedUpdates packet) {
     bufferedUpdatesStream.finished(packet);
   }
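
Finally, the removal of ResolveUpdatesEvent above is a small pattern
worth noting: the commit inlines the event as a lambda at the
publishFrozenUpdates call site instead of keeping a named Event class.
A minimal sketch of that refactoring, with illustrative stand-in types
rather than the real IndexWriter internals:

  import java.io.IOException;
  import java.util.ArrayDeque;
  import java.util.Queue;

  final class EventQueueSketch {

    // Stand-in for IndexWriter's Event: work deferred onto a queue
    // that a caller processes later, higher in the stack.
    interface Event {
      void process(Writer writer) throws IOException;
    }

    static final class Packet {
      void apply(Writer writer) { /* resolve deletes/updates here */ }
    }

    static final class Writer {
      final Queue<Event> eventQueue = new ArrayDeque<>();

      void publish(Packet packet) {
        // Before: eventQueue.add(new ResolveUpdatesEvent(packet));
        // After: the same deferred work, captured inline as a lambda.
        eventQueue.add(w -> packet.apply(w));
      }

      void processEvents() throws IOException {
        Event e;
        while ((e = eventQueue.poll()) != null) {
          e.process(this);
        }
      }
    }
  }

The deferral itself is unchanged: as the in-diff comment says, the
packet is applied as an event so that it runs when the thread no longer
holds DocumentsWriterFlushQueue.purgeLock.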