You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2018/04/23 17:35:41 UTC

[37/40] lucene-solr:jira/solr-11833: LUCENE-8269: Detach downstream classes from IndexWriter

LUCENE-8269: Detach downstream classes from IndexWriter

IndexWriter today is shared with many classes like BufferedUpdatesStream,
DocumentsWriter and DocumentsWriterPerThread. Some of them even acquire locks
on the writer instance or assert that the current thread doesn't hold a lock.
This makes it very difficult to have a manageable threading model.

This change separates out the IndexWriter from those classes and makes them all
independent of IW. IW now implements a new interface for DocumentsWriter to communicate
on failed or successful flushes and tragic events. This allows IW to make its critical
methods private and execute all lock critical actions on its private queue that ensures
that the IW lock is not held. Follow-up changes will try to detach more code like
publishing flushed segments to ensure we never call back into IW in an uncontrolled way.

Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/6f0a8845
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/6f0a8845
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/6f0a8845

Branch: refs/heads/jira/solr-11833
Commit: 6f0a884582a3d58342f98dc1df2c06418defb317
Parents: e8c36f4
Author: Simon Willnauer <si...@apache.org>
Authored: Mon Apr 23 17:17:40 2018 +0200
Committer: GitHub <no...@github.com>
Committed: Mon Apr 23 17:17:40 2018 +0200

----------------------------------------------------------------------
 .../lucene/index/BufferedUpdatesStream.java     | 133 ++++----------
 .../apache/lucene/index/DocumentsWriter.java    | 181 ++++++++-----------
 .../index/DocumentsWriterFlushControl.java      |   4 +-
 .../lucene/index/DocumentsWriterPerThread.java  |  32 ++--
 .../lucene/index/FrozenBufferedUpdates.java     |  72 ++++++--
 .../org/apache/lucene/index/IndexWriter.java    | 157 +++++++++++-----
 .../lucene/index/TestIndexWriterExceptions.java |   9 +-
 .../org/apache/lucene/index/TestInfoStream.java |   8 +-
 .../apache/lucene/index/RandomIndexWriter.java  |   8 +-
 9 files changed, 315 insertions(+), 289 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/6f0a8845/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java
index 7a93cfd..c93e4b6 100644
--- a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java
+++ b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java
@@ -17,9 +17,8 @@
 
 package org.apache.lucene.index;
 
+import java.io.Closeable;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
@@ -48,7 +47,7 @@ import org.apache.lucene.util.InfoStream;
  * track which BufferedDeletes packets to apply to any given
  * segment. */
 
-class BufferedUpdatesStream implements Accountable {
+final class BufferedUpdatesStream implements Accountable {
 
   private final Set<FrozenBufferedUpdates> updates = new HashSet<>();
 
@@ -56,22 +55,19 @@ class BufferedUpdatesStream implements Accountable {
   // deletes applied (whose bufferedDelGen defaults to 0)
   // will be correct:
   private long nextGen = 1;
-
   private final FinishedSegments finishedSegments;
   private final InfoStream infoStream;
   private final AtomicLong bytesUsed = new AtomicLong();
   private final AtomicInteger numTerms = new AtomicInteger();
-  private final IndexWriter writer;
 
-  public BufferedUpdatesStream(IndexWriter writer) {
-    this.writer = writer;
-    this.infoStream = writer.infoStream;
+  BufferedUpdatesStream(InfoStream infoStream) {
+    this.infoStream = infoStream;
     this.finishedSegments = new FinishedSegments(infoStream);
   }
 
   // Appends a new packet of buffered deletes to the stream,
   // setting its generation:
-  public synchronized long push(FrozenBufferedUpdates packet) {
+  synchronized long push(FrozenBufferedUpdates packet) {
     /*
      * The insert operation must be atomic. If we let threads increment the gen
      * and push the packet afterwards we risk that packets are out of order.
@@ -94,12 +90,12 @@ class BufferedUpdatesStream implements Accountable {
     return packet.delGen();
   }
 
-  public synchronized int getPendingUpdatesCount() {
+  synchronized int getPendingUpdatesCount() {
     return updates.size();
   }
 
   /** Only used by IW.rollback */
-  public synchronized void clear() {
+  synchronized void clear() {
     updates.clear();
     nextGen = 1;
     finishedSegments.clear();
@@ -107,11 +103,11 @@ class BufferedUpdatesStream implements Accountable {
     bytesUsed.set(0);
   }
 
-  public boolean any() {
+  boolean any() {
     return bytesUsed.get() != 0;
   }
 
-  public int numTerms() {
+  int numTerms() {
     return numTerms.get();
   }
 
@@ -120,13 +116,13 @@ class BufferedUpdatesStream implements Accountable {
     return bytesUsed.get();
   }
 
-  public static class ApplyDeletesResult {
+  static class ApplyDeletesResult {
     
     // True if any actual deletes took place:
-    public final boolean anyDeletes;
+    final boolean anyDeletes;
 
     // If non-null, contains segments that are 100% deleted
-    public final List<SegmentCommitInfo> allDeleted;
+    final List<SegmentCommitInfo> allDeleted;
 
     ApplyDeletesResult(boolean anyDeletes, List<SegmentCommitInfo> allDeleted) {
       this.anyDeletes = anyDeletes;
@@ -137,26 +133,22 @@ class BufferedUpdatesStream implements Accountable {
   /** Waits for all in-flight packets, which are already being resolved concurrently
    *  by indexing threads, to finish.  Returns true if there were any 
    *  new deletes or updates.  This is called for refresh, commit. */
-  public void waitApplyAll() throws IOException {
-
+  void waitApplyAll(IndexWriter writer) throws IOException {
     assert Thread.holdsLock(writer) == false;
-    
-    final long t0 = System.nanoTime();
-
     Set<FrozenBufferedUpdates> waitFor;
     synchronized (this) {
       waitFor = new HashSet<>(updates);
     }
 
-    waitApply(waitFor);
+    waitApply(waitFor, writer);
   }
 
   /** Returns true if this delGen is still running. */
-  public boolean stillRunning(long delGen) {
+  boolean stillRunning(long delGen) {
     return finishedSegments.stillRunning(delGen);
   }
 
-  public void finishedSegment(long delGen) {
+  void finishedSegment(long delGen) {
     finishedSegments.finishedSegment(delGen);
   }
   
@@ -164,7 +156,7 @@ class BufferedUpdatesStream implements Accountable {
    *  delGen.  We track the completed delGens and record the maximum delGen for which all prior
    *  delGens, inclusive, are completed, so that it's safe for doc values updates to apply and write. */
 
-  public synchronized void finished(FrozenBufferedUpdates packet) {
+  synchronized void finished(FrozenBufferedUpdates packet) {
     // TODO: would be a bit more memory efficient to track this per-segment, so when each segment writes it writes all packets finished for
     // it, rather than only recording here, across all segments.  But, more complex code, and more CPU, and maybe not so much impact in
     // practice?
@@ -182,18 +174,14 @@ class BufferedUpdatesStream implements Accountable {
   }
 
   /** All frozen packets up to and including this del gen are guaranteed to be finished. */
-  public long getCompletedDelGen() {
+  long getCompletedDelGen() {
     return finishedSegments.getCompletedDelGen();
   }   
 
   /** Waits only for those in-flight packets that apply to these merge segments.  This is
    *  called when a merge needs to finish and must ensure all deletes to the merging
    *  segments are resolved. */
-  public void waitApplyForMerge(List<SegmentCommitInfo> mergeInfos) throws IOException {
-    assert Thread.holdsLock(writer) == false;
-
-    final long t0 = System.nanoTime();
-
+  void waitApplyForMerge(List<SegmentCommitInfo> mergeInfos, IndexWriter writer) throws IOException {
     long maxDelGen = Long.MIN_VALUE;
     for (SegmentCommitInfo info : mergeInfos) {
       maxDelGen = Math.max(maxDelGen, info.getBufferedDeletesGen());
@@ -214,10 +202,10 @@ class BufferedUpdatesStream implements Accountable {
       infoStream.message("BD", "waitApplyForMerge: " + waitFor.size() + " packets, " + mergeInfos.size() + " merging segments");
     }
     
-    waitApply(waitFor);
+    waitApply(waitFor, writer);
   }
 
-  private void waitApply(Set<FrozenBufferedUpdates> waitFor) throws IOException {
+  private void waitApply(Set<FrozenBufferedUpdates> waitFor, IndexWriter writer) throws IOException {
 
     long startNS = System.nanoTime();
 
@@ -258,87 +246,34 @@ class BufferedUpdatesStream implements Accountable {
   }
 
   /** Holds all per-segment internal state used while resolving deletions. */
-  static final class SegmentState {
+  static final class SegmentState implements Closeable {
     final long delGen;
     final ReadersAndUpdates rld;
     final SegmentReader reader;
     final int startDelCount;
+    private final IOUtils.IOConsumer<ReadersAndUpdates> onClose;
 
     TermsEnum termsEnum;
     PostingsEnum postingsEnum;
     BytesRef term;
 
-    SegmentState(ReadersAndUpdates rld, SegmentCommitInfo info) throws IOException {
+    SegmentState(ReadersAndUpdates rld, IOUtils.IOConsumer<ReadersAndUpdates> onClose, SegmentCommitInfo info) throws IOException {
       this.rld = rld;
       startDelCount = rld.getPendingDeleteCount();
-      reader = rld.getReader(IOContext.READ);
       delGen = info.getBufferedDeletesGen();
+      this.onClose = onClose;
+      reader = rld.getReader(IOContext.READ);
     }
 
     @Override
     public String toString() {
       return "SegmentState(" + rld.info + ")";
     }
-  }
-
-  /** Opens SegmentReader and inits SegmentState for each segment. */
-  public SegmentState[] openSegmentStates(List<SegmentCommitInfo> infos,
-                                          Set<SegmentCommitInfo> alreadySeenSegments, long delGen) throws IOException {
-    List<SegmentState> segStates = new ArrayList<>();
-    try {
-      for (SegmentCommitInfo info : infos) {
-        if (info.getBufferedDeletesGen() <= delGen && alreadySeenSegments.contains(info) == false) {
-          segStates.add(new SegmentState(writer.getPooledInstance(info, true), info));
-          alreadySeenSegments.add(info);
-        }
-      }
-    } catch (Throwable t) {
-      try {
-        finishSegmentStates(segStates);
-      } catch (Throwable t1) {
-        t.addSuppressed(t1);
-      }
-      throw t;
-    }
-    
-    return segStates.toArray(new SegmentState[0]);
-  }
-
-  private void finishSegmentStates(List<SegmentState> segStates) throws IOException {
-    IOUtils.applyToAll(segStates, s -> {
-      ReadersAndUpdates rld = s.rld;
-      try {
-        rld.release(s.reader);
-      } finally {
-        writer.release(s.rld);
-      }
-    });
-  }
 
-  /** Close segment states previously opened with openSegmentStates. */
-  public ApplyDeletesResult closeSegmentStates(SegmentState[] segStates, boolean success) throws IOException {
-    List<SegmentCommitInfo> allDeleted = null;
-    long totDelCount = 0;
-    final List<SegmentState> segmentStates = Arrays.asList(segStates);
-    for (SegmentState segState : segmentStates) {
-      if (success) {
-        totDelCount += segState.rld.getPendingDeleteCount() - segState.startDelCount;
-        int fullDelCount = segState.rld.info.getDelCount() + segState.rld.getPendingDeleteCount();
-        assert fullDelCount <= segState.rld.info.info.maxDoc() : fullDelCount + " > " + segState.rld.info.info.maxDoc();
-        if (segState.rld.isFullyDeleted() && writer.getConfig().mergePolicy.keepFullyDeletedSegment(() -> segState.reader) == false) {
-          if (allDeleted == null) {
-            allDeleted = new ArrayList<>();
-          }
-          allDeleted.add(segState.reader.getSegmentInfo());
-        }
-      }
-    }
-    finishSegmentStates(segmentStates);
-    if (infoStream.isEnabled("BD")) {
-      infoStream.message("BD", "closeSegmentStates: " + totDelCount + " new deleted documents; pool " + updates.size() + " packets; bytesUsed=" + writer.getReaderPoolRamBytesUsed());
+    @Override
+    public void close() throws IOException {
+      IOUtils.close(() -> rld.release(reader), () -> onClose.accept(rld));
     }
-
-    return new ApplyDeletesResult(totDelCount > 0, allDeleted);      
   }
 
   // only for assert
@@ -368,24 +303,24 @@ class BufferedUpdatesStream implements Accountable {
 
     private final InfoStream infoStream;
 
-    public FinishedSegments(InfoStream infoStream) {
+    FinishedSegments(InfoStream infoStream) {
       this.infoStream = infoStream;
     }
 
-    public synchronized void clear() {
+    synchronized void clear() {
       finishedDelGens.clear();
       completedDelGen = 0;
     }
 
-    public synchronized boolean stillRunning(long delGen) {
+    synchronized boolean stillRunning(long delGen) {
       return delGen > completedDelGen && finishedDelGens.contains(delGen) == false;
     }
 
-    public synchronized long getCompletedDelGen() {
+    synchronized long getCompletedDelGen() {
       return completedDelGen;
     }
 
-    public synchronized void finishedSegment(long delGen) {
+    synchronized void finishedSegment(long delGen) {
       finishedDelGens.add(delGen);
       while (true) {
         if (finishedDelGens.contains(completedDelGen + 1)) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/6f0a8845/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index 0042dab..5e7cdce 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -23,17 +23,17 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 import java.util.function.ToLongFunction;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.index.DocumentsWriterFlushQueue.SegmentFlushTicket;
 import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
 import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
-import org.apache.lucene.index.IndexWriter.Event;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
@@ -101,6 +101,12 @@ import org.apache.lucene.util.InfoStream;
 final class DocumentsWriter implements Closeable, Accountable {
   private final Directory directoryOrig; // no wrapping, for infos
   private final Directory directory;
+  private final FieldInfos.FieldNumbers globalFieldNumberMap;
+  private final int indexCreatedVersionMajor;
+  private final AtomicLong pendingNumDocs;
+  private final boolean enableTestPoints;
+  private final Supplier<String> segmentNameSupplier;
+  private final FlushNotifications flushNotifications;
 
   private volatile boolean closed;
 
@@ -124,11 +130,12 @@ final class DocumentsWriter implements Closeable, Accountable {
   final DocumentsWriterPerThreadPool perThreadPool;
   final FlushPolicy flushPolicy;
   final DocumentsWriterFlushControl flushControl;
-  private final IndexWriter writer;
-  private final Queue<Event> events;
   private long lastSeqNo;
   
-  DocumentsWriter(IndexWriter writer, LiveIndexWriterConfig config, Directory directoryOrig, Directory directory) {
+  DocumentsWriter(FlushNotifications flushNotifications, int indexCreatedVersionMajor, AtomicLong pendingNumDocs, boolean enableTestPoints,
+                  Supplier<String> segmentNameSupplier, LiveIndexWriterConfig config, Directory directoryOrig, Directory directory,
+                  FieldInfos.FieldNumbers globalFieldNumberMap) {
+    this.indexCreatedVersionMajor = indexCreatedVersionMajor;
     this.directoryOrig = directoryOrig;
     this.directory = directory;
     this.config = config;
@@ -136,9 +143,12 @@ final class DocumentsWriter implements Closeable, Accountable {
     this.deleteQueue = new DocumentsWriterDeleteQueue(infoStream);
     this.perThreadPool = config.getIndexerThreadPool();
     flushPolicy = config.getFlushPolicy();
-    this.writer = writer;
-    this.events = new ConcurrentLinkedQueue<>();
-    flushControl = new DocumentsWriterFlushControl(this, config, writer.bufferedUpdatesStream);
+    this.globalFieldNumberMap = globalFieldNumberMap;
+    this.pendingNumDocs = pendingNumDocs;
+    flushControl = new DocumentsWriterFlushControl(this, config);
+    this.segmentNameSupplier = segmentNameSupplier;
+    this.enableTestPoints = enableTestPoints;
+    this.flushNotifications = flushNotifications;
   }
   
   long deleteQueries(final Query... queries) throws IOException {
@@ -175,7 +185,7 @@ final class DocumentsWriter implements Closeable, Accountable {
       if (deleteQueue != null) {
         ticketQueue.addDeletes(deleteQueue);
       }
-      putEvent(ApplyDeletesEvent.INSTANCE); // apply deletes event forces a purge
+      flushNotifications.onDeletesApplied(); // apply deletes event forces a purge
       return true;
     }
     return false;
@@ -409,10 +419,10 @@ final class DocumentsWriter implements Closeable, Accountable {
   
   private void ensureInitialized(ThreadState state) throws IOException {
     if (state.dwpt == null) {
-      final FieldInfos.Builder infos = new FieldInfos.Builder(writer.globalFieldNumberMap);
-      state.dwpt = new DocumentsWriterPerThread(writer, writer.newSegmentName(), directoryOrig,
+      final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldNumberMap);
+      state.dwpt = new DocumentsWriterPerThread(indexCreatedVersionMajor, segmentNameSupplier.get(), directoryOrig,
                                                 directory, config, infoStream, deleteQueue, infos,
-                                                writer.pendingNumDocs, writer.enableTestPoints);
+                                                pendingNumDocs, enableTestPoints);
     }
   }
 
@@ -433,7 +443,7 @@ final class DocumentsWriter implements Closeable, Accountable {
       final DocumentsWriterPerThread dwpt = perThread.dwpt;
       final int dwptNumDocs = dwpt.getNumDocsInRAM();
       try {
-        seqNo = dwpt.updateDocuments(docs, analyzer, delNode);
+        seqNo = dwpt.updateDocuments(docs, analyzer, delNode, flushNotifications);
       } finally {
         if (dwpt.isAborted()) {
           flushControl.doOnAbort(perThread);
@@ -460,7 +470,7 @@ final class DocumentsWriter implements Closeable, Accountable {
   }
 
   long updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer,
-      final DocumentsWriterDeleteQueue.Node<?> delNode) throws IOException {
+                      final DocumentsWriterDeleteQueue.Node<?> delNode) throws IOException {
 
     boolean hasEvents = preUpdate();
 
@@ -477,7 +487,7 @@ final class DocumentsWriter implements Closeable, Accountable {
       final DocumentsWriterPerThread dwpt = perThread.dwpt;
       final int dwptNumDocs = dwpt.getNumDocsInRAM();
       try {
-        seqNo = dwpt.updateDocument(doc, analyzer, delNode);
+        seqNo = dwpt.updateDocument(doc, analyzer, delNode, flushNotifications);
       } finally {
         if (dwpt.isAborted()) {
           flushControl.doOnAbort(perThread);
@@ -536,17 +546,18 @@ final class DocumentsWriter implements Closeable, Accountable {
           boolean dwptSuccess = false;
           try {
             // flush concurrently without locking
-            final FlushedSegment newSegment = flushingDWPT.flush();
+            final FlushedSegment newSegment = flushingDWPT.flush(flushNotifications);
             ticketQueue.addSegment(ticket, newSegment);
             dwptSuccess = true;
           } finally {
             subtractFlushedNumDocs(flushingDocsInRam);
             if (flushingDWPT.pendingFilesToDelete().isEmpty() == false) {
-              putEvent(new DeleteNewFilesEvent(flushingDWPT.pendingFilesToDelete()));
+              Set<String> files = flushingDWPT.pendingFilesToDelete();
+              flushNotifications.deleteUnusedFiles(files);
               hasEvents = true;
             }
             if (dwptSuccess == false) {
-              putEvent(new FlushFailedEvent(flushingDWPT.getSegmentInfo()));
+              flushNotifications.flushFailed(flushingDWPT.getSegmentInfo());
               hasEvents = true;
             }
           }
@@ -569,7 +580,7 @@ final class DocumentsWriter implements Closeable, Accountable {
           // thread in innerPurge can't keep up with all
           // other threads flushing segments.  In this case
           // we forcefully stall the producers.
-          putEvent(ForcedPurgeEvent.INSTANCE);
+          flushNotifications.onTicketBacklog();
           break;
         }
       } finally {
@@ -580,7 +591,7 @@ final class DocumentsWriter implements Closeable, Accountable {
     }
 
     if (hasEvents) {
-      writer.doAfterSegmentFlushed(false, false);
+      flushNotifications.afterSegmentsFlushed();
     }
 
     // If deletes alone are consuming > 1/2 our RAM
@@ -597,12 +608,52 @@ final class DocumentsWriter implements Closeable, Accountable {
                                                  flushControl.getDeleteBytesUsed()/(1024.*1024.),
                                                  ramBufferSizeMB));
         }
-        putEvent(ApplyDeletesEvent.INSTANCE);
+        flushNotifications.onDeletesApplied();
       }
     }
 
     return hasEvents;
   }
+
+  interface FlushNotifications { // TODO maybe we find a better name for this?
+
+    /**
+     * Called when files were written to disk that are not used anymore. It's the implementations responsibilty
+     * to clean these files up
+     */
+    void deleteUnusedFiles(Collection<String> files);
+
+    /**
+     * Called when a segment failed to flush.
+     */
+    void flushFailed(SegmentInfo info);
+
+    /**
+     * Called after one or more segments were flushed to disk.
+     */
+    void afterSegmentsFlushed() throws IOException;
+
+    /**
+     * Should be called if a flush or an indexing operation caused a tragic / unrecoverable event.
+     */
+    void onTragicEvent(Throwable event, String message);
+
+    /**
+     * Called once deletes have been applied either after a flush or on a deletes call
+     */
+    void onDeletesApplied();
+
+    /**
+     * Called once the DocumentsWriter ticket queue has a backlog. This means there is an inner thread
+     * that tries to publish flushed segments but can't keep up with the other threads flushing new segments.
+     * This likely requires other thread to forcefully purge the buffer to help publishing. This
+     * can't be done in-place since we might hold index writer locks when this is called. The caller must ensure
+     * that the purge happens without an index writer lock hold
+     *
+     * @see DocumentsWriter#purgeBuffer(IndexWriter, boolean)
+     */
+    void onTicketBacklog();
+  }
   
   void subtractFlushedNumDocs(int numFlushed) {
     int oldValue = numDocsInRAM.get();
@@ -626,7 +677,7 @@ final class DocumentsWriter implements Closeable, Accountable {
    * two stage operation; the caller must ensure (in try/finally) that finishFlush
    * is called after this method, to release the flush lock in DWFlushControl
    */
-  long flushAllThreads()
+  long flushAllThreads(IndexWriter writer)
     throws IOException {
     final DocumentsWriterDeleteQueue flushingDeleteQueue;
     if (infoStream.isEnabled("DW")) {
@@ -695,92 +746,8 @@ final class DocumentsWriter implements Closeable, Accountable {
     }
   }
 
-  void putEvent(Event event) {
-    events.add(event);
-  }
-
   @Override
   public long ramBytesUsed() {
     return flushControl.ramBytesUsed();
   }
-
-  static final class ResolveUpdatesEvent implements Event {
-
-    private final FrozenBufferedUpdates packet;
-    
-    ResolveUpdatesEvent(FrozenBufferedUpdates packet) {
-      this.packet = packet;
-    }
-
-    @Override
-    public void process(IndexWriter writer) throws IOException {
-      try {
-        packet.apply(writer);
-      } catch (Throwable t) {
-        try {
-          writer.onTragicEvent(t, "applyUpdatesPacket");
-        } catch (Throwable t1) {
-          t.addSuppressed(t1);
-        }
-        throw t;
-      }
-      writer.flushDeletesCount.incrementAndGet();
-    }
-  }
-
-  static final class ApplyDeletesEvent implements Event {
-    static final Event INSTANCE = new ApplyDeletesEvent();
-
-    private ApplyDeletesEvent() {
-      // only one instance
-    }
-    
-    @Override
-    public void process(IndexWriter writer) throws IOException {
-      writer.applyDeletesAndPurge(true); // we always purge!
-    }
-  }
-
-  static final class ForcedPurgeEvent implements Event {
-    static final Event INSTANCE = new ForcedPurgeEvent();
-
-    private ForcedPurgeEvent() {
-      // only one instance
-    }
-    
-    @Override
-    public void process(IndexWriter writer) throws IOException {
-      writer.purge(true);
-    }
-  }
-  
-  static class FlushFailedEvent implements Event {
-    private final SegmentInfo info;
-    
-    public FlushFailedEvent(SegmentInfo info) {
-      this.info = info;
-    }
-    
-    @Override
-    public void process(IndexWriter writer) throws IOException {
-      writer.flushFailed(info);
-    }
-  }
-  
-  static class DeleteNewFilesEvent implements Event {
-    private final Collection<String>  files;
-    
-    public DeleteNewFilesEvent(Collection<String>  files) {
-      this.files = files;
-    }
-    
-    @Override
-    public void process(IndexWriter writer) throws IOException {
-      writer.deleteNewFiles(files);
-    }
-  }
-
-  public Queue<Event> eventQueue() {
-    return events;
-  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/6f0a8845/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index 8aea232..ad5b7e4 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -70,10 +70,9 @@ final class DocumentsWriterFlushControl implements Accountable {
   private boolean closed = false;
   private final DocumentsWriter documentsWriter;
   private final LiveIndexWriterConfig config;
-  private final BufferedUpdatesStream bufferedUpdatesStream;
   private final InfoStream infoStream;
 
-  DocumentsWriterFlushControl(DocumentsWriter documentsWriter, LiveIndexWriterConfig config, BufferedUpdatesStream bufferedUpdatesStream) {
+  DocumentsWriterFlushControl(DocumentsWriter documentsWriter, LiveIndexWriterConfig config) {
     this.infoStream = config.getInfoStream();
     this.stallControl = new DocumentsWriterStallControl();
     this.perThreadPool = documentsWriter.perThreadPool;
@@ -81,7 +80,6 @@ final class DocumentsWriterFlushControl implements Accountable {
     this.config = config;
     this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;
     this.documentsWriter = documentsWriter;
-    this.bufferedUpdatesStream = bufferedUpdatesStream;
   }
 
   public synchronized long activeBytes() {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/6f0a8845/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
index 32a783a..04ab493 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
@@ -169,11 +169,10 @@ class DocumentsWriterPerThread {
   private final AtomicLong pendingNumDocs;
   private final LiveIndexWriterConfig indexWriterConfig;
   private final boolean enableTestPoints;
-  private final IndexWriter indexWriter;
-  
-  public DocumentsWriterPerThread(IndexWriter writer, String segmentName, Directory directoryOrig, Directory directory, LiveIndexWriterConfig indexWriterConfig, InfoStream infoStream, DocumentsWriterDeleteQueue deleteQueue,
+  private final int indexVersionCreated;
+
+  public DocumentsWriterPerThread(int indexVersionCreated, String segmentName, Directory directoryOrig, Directory directory, LiveIndexWriterConfig indexWriterConfig, InfoStream infoStream, DocumentsWriterDeleteQueue deleteQueue,
                                   FieldInfos.Builder fieldInfos, AtomicLong pendingNumDocs, boolean enableTestPoints) throws IOException {
-    this.indexWriter = writer;
     this.directoryOrig = directoryOrig;
     this.directory = new TrackingDirectoryWrapper(directory);
     this.fieldInfos = fieldInfos;
@@ -200,6 +199,7 @@ class DocumentsWriterPerThread {
     // it really sucks that we need to pull this within the ctor and pass this ref to the chain!
     consumer = indexWriterConfig.getIndexingChain().getChain(this);
     this.enableTestPoints = enableTestPoints;
+    this.indexVersionCreated = indexVersionCreated;
   }
   
   public FieldInfos.Builder getFieldInfosBuilder() {
@@ -207,7 +207,7 @@ class DocumentsWriterPerThread {
   }
 
   public int getIndexCreatedVersionMajor() {
-    return indexWriter.segmentInfos.getIndexCreatedVersionMajor();
+    return indexVersionCreated;
   }
 
   final void testPoint(String message) {
@@ -227,7 +227,7 @@ class DocumentsWriterPerThread {
     }
   }
 
-  public long updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, DocumentsWriterDeleteQueue.Node<?> deleteNode) throws IOException {
+  public long updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, DocumentsWriterDeleteQueue.Node<?> deleteNode, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
     try {
       assert hasHitAbortingException() == false: "DWPT has hit aborting exception but is still indexing";
       testPoint("DocumentsWriterPerThread addDocument start");
@@ -263,11 +263,11 @@ class DocumentsWriterPerThread {
 
       return finishDocument(deleteNode);
     } finally {
-      maybeAbort("updateDocument");
+      maybeAbort("updateDocument", flushNotifications);
     }
   }
 
-  public long updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer analyzer, DocumentsWriterDeleteQueue.Node<?> deleteNode) throws IOException {
+  public long updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer analyzer, DocumentsWriterDeleteQueue.Node<?> deleteNode, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
     try {
       testPoint("DocumentsWriterPerThread addDocuments start");
       assert hasHitAbortingException() == false: "DWPT has hit aborting exception but is still indexing";
@@ -343,7 +343,7 @@ class DocumentsWriterPerThread {
         docState.clear();
       }
     } finally {
-      maybeAbort("updateDocuments");
+      maybeAbort("updateDocuments", flushNotifications);
     }
   }
   
@@ -425,7 +425,7 @@ class DocumentsWriterPerThread {
   }
 
   /** Flush all pending docs to a new segment */
-  FlushedSegment flush() throws IOException {
+  FlushedSegment flush(DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
     assert numDocsInRAM > 0;
     assert deleteSlice.isEmpty() : "all deletes must be applied in prepareFlush";
     segmentInfo.setMaxDoc(numDocsInRAM);
@@ -499,7 +499,7 @@ class DocumentsWriterPerThread {
       FlushedSegment fs = new FlushedSegment(infoStream, segmentInfoPerCommit, flushState.fieldInfos,
           segmentDeletes, flushState.liveDocs, flushState.delCountOnFlush,
           sortMap);
-      sealFlushedSegment(fs, sortMap);
+      sealFlushedSegment(fs, sortMap, flushNotifications);
       if (infoStream.isEnabled("DWPT")) {
         infoStream.message("DWPT", "flush time " + ((System.nanoTime() - t0) / 1000000.0) + " msec");
       }
@@ -508,18 +508,18 @@ class DocumentsWriterPerThread {
       onAbortingException(t);
       throw t;
     } finally {
-      maybeAbort("flush");
+      maybeAbort("flush", flushNotifications);
     }
   }
 
-  private void maybeAbort(String location) throws IOException {
+  private void maybeAbort(String location, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
     if (hasHitAbortingException() && aborted == false) {
       // if we are already aborted don't do anything here
       try {
         abort();
       } finally {
         // whatever we do here we have to fire this tragic event up.
-        indexWriter.onTragicEvent(abortingException, location);
+        flushNotifications.onTragicEvent(abortingException, location);
       }
     }
   }
@@ -545,7 +545,7 @@ class DocumentsWriterPerThread {
    * Seals the {@link SegmentInfo} for the new flushed segment and persists
    * the deleted documents {@link MutableBits}.
    */
-  void sealFlushedSegment(FlushedSegment flushedSegment, Sorter.DocMap sortMap) throws IOException {
+  void sealFlushedSegment(FlushedSegment flushedSegment, Sorter.DocMap sortMap, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
     assert flushedSegment != null;
     SegmentCommitInfo newSegment = flushedSegment.segmentInfo;
 
@@ -559,7 +559,7 @@ class DocumentsWriterPerThread {
       if (indexWriterConfig.getUseCompoundFile()) {
         Set<String> originalFiles = newSegment.info.files();
         // TODO: like addIndexes, we are relying on createCompoundFile to successfully cleanup...
-        indexWriter.createCompoundFile(infoStream, new TrackingDirectoryWrapper(directory), newSegment.info, context);
+        IndexWriter.createCompoundFile(infoStream, new TrackingDirectoryWrapper(directory), newSegment.info, context, flushNotifications::deleteUnusedFiles);
         filesToDelete.addAll(originalFiles);
         newSegment.info.setUseCompoundFile(true);
       }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/6f0a8845/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java
index 586afa7..bebc059 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java
@@ -18,6 +18,8 @@ package org.apache.lucene.index;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -42,6 +44,7 @@ import org.apache.lucene.store.RAMOutputStream;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.InfoStream;
 import org.apache.lucene.util.RamUsageEstimator;
 
@@ -51,7 +54,7 @@ import org.apache.lucene.util.RamUsageEstimator;
  * structure to hold them.  We don't hold docIDs because these are applied on
  * flush.
  */
-class FrozenBufferedUpdates {
+final class FrozenBufferedUpdates {
 
   /* NOTE: we now apply this frozen packet immediately on creation, yet this process is heavy, and runs
    * in multiple threads, and this compression is sizable (~8.3% of the original size), so it's important
@@ -297,7 +300,7 @@ class FrozenBufferedUpdates {
 
         // Must open while holding IW lock so that e.g. segments are not merged
         // away, dropped from 100% deletions, etc., before we can open the readers
-        segStates = writer.bufferedUpdatesStream.openSegmentStates(infos, seenSegments, delGen());
+        segStates = openSegmentStates(writer, infos, seenSegments, delGen());
 
         if (segStates.length == 0) {
 
@@ -357,7 +360,7 @@ class FrozenBufferedUpdates {
           // Must do this while still holding IW lock else a merge could finish and skip carrying over our updates:
           
           // Record that this packet is finished:
-          writer.bufferedUpdatesStream.finished(this);
+          writer.finished(this);
 
           finished = true;
 
@@ -378,7 +381,7 @@ class FrozenBufferedUpdates {
 
     if (finished == false) {
       // Record that this packet is finished:
-      writer.bufferedUpdatesStream.finished(this);
+      writer.finished(this);
     }
         
     if (infoStream.isEnabled("BD")) {
@@ -388,18 +391,67 @@ class FrozenBufferedUpdates {
       if (iter > 0) {
         message += "; " + (iter+1) + " iters due to concurrent merges";
       }
-      message += "; " + writer.bufferedUpdatesStream.getPendingUpdatesCount() + " packets remain";
+      message += "; " + writer.getPendingUpdatesCount() + " packets remain";
       infoStream.message("BD", message);
     }
   }
 
+  /** Opens SegmentReader and inits SegmentState for each segment. */
+  private static BufferedUpdatesStream.SegmentState[] openSegmentStates(IndexWriter writer, List<SegmentCommitInfo> infos,
+                                                                       Set<SegmentCommitInfo> alreadySeenSegments, long delGen) throws IOException {
+    List<BufferedUpdatesStream.SegmentState> segStates = new ArrayList<>();
+    try {
+      for (SegmentCommitInfo info : infos) {
+        if (info.getBufferedDeletesGen() <= delGen && alreadySeenSegments.contains(info) == false) {
+          segStates.add(new BufferedUpdatesStream.SegmentState(writer.getPooledInstance(info, true), writer::release, info));
+          alreadySeenSegments.add(info);
+        }
+      }
+    } catch (Throwable t) {
+      try {
+        IOUtils.close(segStates);
+      } catch (Throwable t1) {
+        t.addSuppressed(t1);
+      }
+      throw t;
+    }
+
+    return segStates.toArray(new BufferedUpdatesStream.SegmentState[0]);
+  }
+
+  /** Close segment states previously opened with openSegmentStates. */
+  public static BufferedUpdatesStream.ApplyDeletesResult closeSegmentStates(IndexWriter writer, BufferedUpdatesStream.SegmentState[] segStates, boolean success) throws IOException {
+    List<SegmentCommitInfo> allDeleted = null;
+    long totDelCount = 0;
+    final List<BufferedUpdatesStream.SegmentState> segmentStates = Arrays.asList(segStates);
+    for (BufferedUpdatesStream.SegmentState segState : segmentStates) {
+      if (success) {
+        totDelCount += segState.rld.getPendingDeleteCount() - segState.startDelCount;
+        int fullDelCount = segState.rld.info.getDelCount() + segState.rld.getPendingDeleteCount();
+        assert fullDelCount <= segState.rld.info.info.maxDoc() : fullDelCount + " > " + segState.rld.info.info.maxDoc();
+        if (segState.rld.isFullyDeleted() && writer.getConfig().getMergePolicy().keepFullyDeletedSegment(() -> segState.reader) == false) {
+          if (allDeleted == null) {
+            allDeleted = new ArrayList<>();
+          }
+          allDeleted.add(segState.reader.getSegmentInfo());
+        }
+      }
+    }
+    IOUtils.close(segmentStates);
+    if (writer.infoStream.isEnabled("BD")) {
+      writer.infoStream.message("BD", "closeSegmentStates: " + totDelCount + " new deleted documents; pool " + writer.getPendingUpdatesCount()+ " packets; bytesUsed=" + writer.getReaderPoolRamBytesUsed());
+    }
+
+    return new BufferedUpdatesStream.ApplyDeletesResult(totDelCount > 0, allDeleted);
+  }
+
   private void finishApply(IndexWriter writer, BufferedUpdatesStream.SegmentState[] segStates,
                            boolean success, Set<String> delFiles) throws IOException {
     synchronized (writer) {
 
       BufferedUpdatesStream.ApplyDeletesResult result;
       try {
-        result = writer.bufferedUpdatesStream.closeSegmentStates(segStates, success);
+        result = closeSegmentStates(writer, segStates, success);
       } finally {
         // Matches the incRef we did above, but we must do the decRef after closing segment states else
         // IFD can't delete still-open files
@@ -407,8 +459,8 @@ class FrozenBufferedUpdates {
       }
 
       if (result.anyDeletes) {
-        writer.maybeMerge.set(true);
-        writer.checkpoint();
+          writer.maybeMerge.set(true);
+          writer.checkpoint();
       }
 
       if (result.allDeleted != null) {
@@ -857,8 +909,4 @@ class FrozenBufferedUpdates {
   boolean any() {
     return deleteTerms.size() > 0 || deleteQueries.length > 0 || numericDVUpdates.length > 0 || binaryDVUpdates.length > 0;
   }
-
-  boolean anyDeleteTerms() {
-    return deleteTerms.size() > 0;
-  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/6f0a8845/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 974f6c5..e8d0666 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -236,7 +237,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   }
   
   /** Used only for testing. */
-  boolean enableTestPoints = false;
+  private final boolean enableTestPoints;
 
   static final int UNBOUNDED_MAX_MERGE_SEGMENTS = -1;
   
@@ -291,7 +292,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   final FieldNumbers globalFieldNumberMap;
 
   final DocumentsWriter docWriter;
-  private final Queue<Event> eventQueue;
+  private final Queue<Event> eventQueue = new ConcurrentLinkedQueue<>();
   final IndexFileDeleter deleter;
 
   // used by forceMerge to note those needing merging
@@ -345,6 +346,51 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
    *  card to make sure they can later charge you when you check out. */
   final AtomicLong pendingNumDocs = new AtomicLong();
 
+  private final DocumentsWriter.FlushNotifications flushNotifications = new DocumentsWriter.FlushNotifications() {
+    @Override
+    public void deleteUnusedFiles(Collection<String> files) {
+      eventQueue.add(w -> w.deleteNewFiles(files));
+    }
+
+    @Override
+    public void flushFailed(SegmentInfo info) {
+      eventQueue.add(w -> w.flushFailed(info));
+    }
+
+    @Override
+    public void afterSegmentsFlushed() throws IOException {
+      try {
+        purge(false);
+      } finally {
+        if (false) {
+          maybeMerge(config.getMergePolicy(), MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
+        }
+      }
+    }
+
+    @Override
+    public void onTragicEvent(Throwable event, String message) {
+      IndexWriter.this.onTragicEvent(event, message);
+    }
+
+    @Override
+    public void onDeletesApplied() {
+      eventQueue.add(w -> {
+          try {
+            w.purge(true);
+          } finally {
+            flushCount.incrementAndGet();
+          }
+        }
+      );
+    }
+
+    @Override
+    public void onTicketBacklog() {
+      eventQueue.add(w -> w.purge(true));
+    }
+  };
+
   DirectoryReader getReader() throws IOException {
     return getReader(true, false);
   }
@@ -439,7 +485,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       synchronized (fullFlushLock) {
         try {
           // TODO: should we somehow make this available in the returned NRT reader?
-          long seqNo = docWriter.flushAllThreads();
+          long seqNo = docWriter.flushAllThreads(this);
           if (seqNo < 0) {
             anyChanges = true;
             seqNo = -seqNo;
@@ -660,7 +706,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     if (d instanceof FSDirectory && ((FSDirectory) d).checkPendingDeletions()) {
       throw new IllegalArgumentException("Directory " + d + " still has pending deleted files; cannot initialize IndexWriter");
     }
-
+    enableTestPoints = isEnableTestPoints();
     conf.setIndexWriter(this); // prevent reuse by other instances
     config = conf;
     infoStream = config.getInfoStream();
@@ -678,9 +724,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       mergeScheduler = config.getMergeScheduler();
       mergeScheduler.setInfoStream(infoStream);
       codec = config.getCodec();
-
-      bufferedUpdatesStream = new BufferedUpdatesStream(this);
-
       OpenMode mode = config.getOpenMode();
       boolean create;
       if (mode == OpenMode.CREATE) {
@@ -824,8 +867,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       validateIndexSort();
 
       config.getFlushPolicy().init(config);
-      docWriter = new DocumentsWriter(this, config, directoryOrig, directory);
-      eventQueue = docWriter.eventQueue();
+      bufferedUpdatesStream = new BufferedUpdatesStream(infoStream);
+      docWriter = new DocumentsWriter(flushNotifications, segmentInfos.getIndexCreatedVersionMajor(), pendingNumDocs,
+          enableTestPoints, this::newSegmentName,
+          config, directoryOrig, directory, globalFieldNumberMap);
       readerPool = new ReaderPool(directory, directoryOrig, segmentInfos, globalFieldNumberMap,
           bufferedUpdatesStream::getCompletedDelGen, infoStream, conf.getSoftDeletesField(), reader);
       if (config.getReaderPooling()) {
@@ -2457,7 +2502,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   synchronized void publishFrozenUpdates(FrozenBufferedUpdates packet) throws IOException {
     assert packet != null && packet.any();
     bufferedUpdatesStream.push(packet);
-    docWriter.putEvent(new DocumentsWriter.ResolveUpdatesEvent(packet));
+    eventQueue.add(new ResolveUpdatesEvent(packet));
   }
 
   /**
@@ -2479,7 +2524,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       if (globalPacket != null && globalPacket.any()) {
         // Do this as an event so it applies higher in the stack when we are not holding DocumentsWriterFlushQueue.purgeLock:
         bufferedUpdatesStream.push(globalPacket);
-        docWriter.putEvent(new DocumentsWriter.ResolveUpdatesEvent(globalPacket));
+        eventQueue.add(new ResolveUpdatesEvent(globalPacket));
       }
 
       // Publishing the segment must be sync'd on IW -> BDS to make the sure
@@ -2489,7 +2534,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
         nextGen = bufferedUpdatesStream.push(packet);
 
         // Do this as an event so it applies higher in the stack when we are not holding DocumentsWriterFlushQueue.purgeLock:
-        docWriter.putEvent(new DocumentsWriter.ResolveUpdatesEvent(packet));
+        eventQueue.add(new ResolveUpdatesEvent(packet));
 
       } else {
         // Since we don't have a delete packet to apply we can get a new
@@ -2877,7 +2922,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
         // TODO: unlike merge, on exception we arent sniping any trash cfs files here?
         // createCompoundFile tries to cleanup, but it might not always be able to...
         try {
-          createCompoundFile(infoStream, trackingCFSDir, info, context);
+          createCompoundFile(infoStream, trackingCFSDir, info, context, this::deleteNewFiles);
         } finally {
           // delete new non cfs files directly: they were never
           // registered with IFD
@@ -3060,7 +3105,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
           boolean flushSuccess = false;
           boolean success = false;
           try {
-            seqNo = docWriter.flushAllThreads();
+            seqNo = docWriter.flushAllThreads(this);
             if (seqNo < 0) {
               anyChanges = true;
               seqNo = -seqNo;
@@ -3421,7 +3466,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       synchronized (fullFlushLock) {
         boolean flushSuccess = false;
         try {
-          long seqNo = docWriter.flushAllThreads();
+          long seqNo = docWriter.flushAllThreads(this);
           if (seqNo < 0) {
             seqNo = -seqNo;
             anyChanges = true;
@@ -3469,7 +3514,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     if (infoStream.isEnabled("IW")) {
       infoStream.message("IW", "now apply all deletes for all segments buffered updates bytesUsed=" + bufferedUpdatesStream.ramBytesUsed() + " reader pool bytesUsed=" + readerPool.ramBytesUsed());
     }
-    bufferedUpdatesStream.waitApplyAll();
+    bufferedUpdatesStream.waitApplyAll(this);
   }
 
   // for testing only
@@ -3998,9 +4043,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   /** Does initial setup for a merge, which is fast but holds
    *  the synchronized lock on IndexWriter instance.  */
   final void mergeInit(MergePolicy.OneMerge merge) throws IOException {
-
+    assert Thread.holdsLock(this) == false;
     // Make sure any deletes that must be resolved before we commit the merge are complete:
-    bufferedUpdatesStream.waitApplyForMerge(merge.segments);
+    bufferedUpdatesStream.waitApplyForMerge(merge.segments, this);
 
     boolean success = false;
     try {
@@ -4267,7 +4312,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
         Collection<String> filesToRemove = merge.info.files();
         TrackingDirectoryWrapper trackingCFSDir = new TrackingDirectoryWrapper(mergeDirectory);
         try {
-          createCompoundFile(infoStream, trackingCFSDir, merge.info.info, context);
+          createCompoundFile(infoStream, trackingCFSDir, merge.info.info, context, this::deleteNewFiles);
           success = true;
         } catch (Throwable t) {
           synchronized(this) {
@@ -4751,7 +4796,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
    * deletion files, this SegmentInfo must not reference such files when this
    * method is called, because they are not allowed within a compound file.
    */
-  final void createCompoundFile(InfoStream infoStream, TrackingDirectoryWrapper directory, final SegmentInfo info, IOContext context) throws IOException {
+  static final void createCompoundFile(InfoStream infoStream, TrackingDirectoryWrapper directory, final SegmentInfo info, IOContext context, IOUtils.IOConsumer<Collection<String>> deleteFiles) throws IOException {
 
     // maybe this check is not needed, but why take the risk?
     if (!directory.getCreatedFiles().isEmpty()) {
@@ -4769,7 +4814,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     } finally {
       if (!success) {
         // Safe: these files must exist
-        deleteNewFiles(directory.getCreatedFiles());
+        deleteFiles.accept(directory.getCreatedFiles());
       }
     }
 
@@ -4783,14 +4828,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
    * @throws IOException if an {@link IOException} occurs
    * @see IndexFileDeleter#deleteNewFiles(Collection)
    */
-  synchronized final void deleteNewFiles(Collection<String> files) throws IOException {
+  private synchronized void deleteNewFiles(Collection<String> files) throws IOException {
     deleter.deleteNewFiles(files);
   }
-  
   /**
    * Cleans up residuals from a segment that could not be entirely flushed due to an error
    */
-  synchronized final void flushFailed(SegmentInfo info) throws IOException {
+  private synchronized final void flushFailed(SegmentInfo info) throws IOException {
     // TODO: this really should be a tragic
     Collection<String> files;
     try {
@@ -4803,29 +4847,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       deleter.deleteNewFiles(files);
     }
   }
-  
-  final int purge(boolean forced) throws IOException {
+
+  private int purge(boolean forced) throws IOException {
     return docWriter.purgeBuffer(this, forced);
   }
 
-  final void applyDeletesAndPurge(boolean forcePurge) throws IOException {
-    try {
-      purge(forcePurge);
-    } finally {
-      flushCount.incrementAndGet();
-    }
-  }
-  
-  final void doAfterSegmentFlushed(boolean triggerMerge, boolean forcePurge) throws IOException {
-    try {
-      purge(forcePurge);
-    } finally {
-      if (triggerMerge) {
-        maybeMerge(config.getMergePolicy(), MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
-      }
-    }
-  }
-  
   /** Record that the files referenced by this {@link SegmentInfos} are still in use.
    *
    * @lucene.internal */
@@ -4867,8 +4893,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
    * encoded inside the {@link #process(IndexWriter)} method.
    *
    */
-  interface Event {
-    
+  @FunctionalInterface
+  private interface Event {
     /**
      * Processes the event. This method is called by the {@link IndexWriter}
      * passed as the first argument.
@@ -4971,4 +4997,43 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     ensureOpen(false);
     return readerPool.get(info, create);
   }
+
+  private static final class ResolveUpdatesEvent implements Event {
+
+    private final FrozenBufferedUpdates packet;
+
+    ResolveUpdatesEvent(FrozenBufferedUpdates packet) {
+      this.packet = packet;
+    }
+
+    @Override
+    public void process(IndexWriter writer) throws IOException {
+      try {
+        packet.apply(writer);
+      } catch (Throwable t) {
+        try {
+          writer.onTragicEvent(t, "applyUpdatesPacket");
+        } catch (Throwable t1) {
+          t.addSuppressed(t1);
+        }
+        throw t;
+      }
+      writer.flushDeletesCount.incrementAndGet();
+    }
+  }
+
+  void finished(FrozenBufferedUpdates packet) {
+    bufferedUpdatesStream.finished(packet);
+  }
+
+  int getPendingUpdatesCount() {
+    return bufferedUpdatesStream.getPendingUpdatesCount();
+  }
+
+  /**
+   * Tests should override this to enable test points. Default is <code>false</code>.
+   */
+  protected boolean isEnableTestPoints() {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/6f0a8845/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java
index 61bf1fc..1d680ea 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java
@@ -1836,9 +1836,14 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
     Directory dir = newMockDirectory(); // we want to ensure we don't leak any locks or file handles
     IndexWriterConfig iwc = new IndexWriterConfig(null);
     iwc.setInfoStream(evilInfoStream);
-    IndexWriter iw = new IndexWriter(dir, iwc);
     // TODO: cutover to RandomIndexWriter.mockIndexWriter?
-    iw.enableTestPoints = true;
+    IndexWriter iw = new IndexWriter(dir, iwc) {
+      @Override
+      protected boolean isEnableTestPoints() {
+        return true;
+      }
+    };
+
     Document doc = new Document();
     for (int i = 0; i < 10; i++) {
       iw.addDocument(doc);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/6f0a8845/lucene/core/src/test/org/apache/lucene/index/TestInfoStream.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestInfoStream.java b/lucene/core/src/test/org/apache/lucene/index/TestInfoStream.java
index 4ef2208..4c40948 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestInfoStream.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestInfoStream.java
@@ -74,8 +74,12 @@ public class TestInfoStream extends LuceneTestCase {
         return true;
       }
     });
-    IndexWriter iw = new IndexWriter(dir, iwc);
-    iw.enableTestPoints = true;
+    IndexWriter iw = new IndexWriter(dir, iwc) {
+      @Override
+      protected boolean isEnableTestPoints() {
+        return true;
+      }
+    };
     iw.addDocument(new Document());
     iw.close();
     dir.close();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/6f0a8845/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java b/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java
index 15ca469..e2db533 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java
@@ -80,7 +80,12 @@ public class RandomIndexWriter implements Closeable {
     IndexWriter iw;
     boolean success = false;
     try {
-      iw = new IndexWriter(dir, conf);
+      iw = new IndexWriter(dir, conf) {
+        @Override
+        protected boolean isEnableTestPoints() {
+          return true;
+        }
+      };
       success = true;
     } finally {
       if (reader != null) {
@@ -91,7 +96,6 @@ public class RandomIndexWriter implements Closeable {
         }
       }
     }
-    iw.enableTestPoints = true;
     return iw;
   }