Posted to commits@lucene.apache.org by sh...@apache.org on 2017/06/25 02:06:31 UTC

[09/47] lucene-solr:feature/autoscaling: LUCENE-7868: use multiple threads to concurrently resolve deletes and DV updates

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 14fbbae..4870282 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -29,10 +29,11 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map.Entry;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
@@ -71,6 +72,8 @@ import org.apache.lucene.util.ThreadInterruptedException;
 import org.apache.lucene.util.UnicodeUtil;
 import org.apache.lucene.util.Version;
 
+import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
+
 /**
   An <code>IndexWriter</code> creates and maintains an index.
 
@@ -110,9 +113,7 @@ import org.apache.lucene.util.Version;
   The default is to flush when RAM usage hits
   {@link IndexWriterConfig#DEFAULT_RAM_BUFFER_SIZE_MB} MB. For
   best indexing speed you should flush by RAM usage with a
-  large RAM buffer. Additionally, if IndexWriter reaches the configured number of
-  buffered deletes (see {@link IndexWriterConfig#setMaxBufferedDeleteTerms})
-  the deleted terms and queries are flushed and applied to existing segments.
+  large RAM buffer.
   In contrast to the other flush options {@link IndexWriterConfig#setRAMBufferSizeMB} and 
   {@link IndexWriterConfig#setMaxBufferedDocs(int)}, deleted terms
   won't trigger a segment flush. Note that flushing just moves the
@@ -237,7 +238,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   /** Used only for testing. */
   boolean enableTestPoints = false;
 
-  private static final int UNBOUNDED_MAX_MERGE_SEGMENTS = -1;
+  static final int UNBOUNDED_MAX_MERGE_SEGMENTS = -1;
   
   /**
    * Name of the write lock in the index.
@@ -272,7 +273,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   volatile Throwable tragedy;
 
   private final Directory directoryOrig;       // original user directory
-  private final Directory directory;           // wrapped with additional checks
+  final Directory directory;           // wrapped with additional checks
   private final Analyzer analyzer;    // how to analyze text
 
   private final AtomicLong changeCount = new AtomicLong(); // increments every time a change is completed
@@ -289,7 +290,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   final SegmentInfos segmentInfos;       // the segments
   final FieldNumbers globalFieldNumberMap;
 
-  private final DocumentsWriter docWriter;
+  final DocumentsWriter docWriter;
   private final Queue<Event> eventQueue;
   final IndexFileDeleter deleter;
 
@@ -302,11 +303,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   private volatile boolean closed;
   private volatile boolean closing;
 
+  final AtomicBoolean maybeMerge = new AtomicBoolean();
+
   private Iterable<Map.Entry<String,String>> commitUserData;
 
   // Holds all SegmentInfo instances currently involved in
   // merges
-  private HashSet<SegmentCommitInfo> mergingSegments = new HashSet<>();
+  HashSet<SegmentCommitInfo> mergingSegments = new HashSet<>();
 
   private final MergeScheduler mergeScheduler;
   private LinkedList<MergePolicy.OneMerge> pendingMerges = new LinkedList<>();
@@ -317,11 +320,16 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   private boolean didMessageState;
 
   final AtomicInteger flushCount = new AtomicInteger();
+
   final AtomicInteger flushDeletesCount = new AtomicInteger();
 
   final ReaderPool readerPool = new ReaderPool();
   final BufferedUpdatesStream bufferedUpdatesStream;
 
+  /** Counts how many merges have completed; this is used by {@link FrozenBufferedUpdates#apply}
+   *  to handle concurrently applying deletes/updates while merges complete. */
+  final AtomicLong mergeFinishedGen = new AtomicLong();
+
   // This is a "write once" variable (like the organic dye
   // on a DVD-R that may or may not be heated by a laser and
   // then cooled to permanently record the event): it's
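
The new mergeFinishedGen counter gives concurrent resolvers a cheap way to notice that a merge committed while they were applying a packet, so they can re-resolve against the merged segment instead of losing changes. A minimal, self-contained sketch of that snapshot-and-retry pattern (class and method names here are illustrative, not Lucene's):

    import java.util.concurrent.atomic.AtomicLong;

    // Sketch of the "merge finished generation" handshake: resolvers snapshot the
    // generation before working and retry if a merge bumped it in the meantime.
    class MergeGenClock {
        private final AtomicLong mergeFinishedGen = new AtomicLong();

        // Called (under the writer's lock in the real code) when a merge commits.
        void onMergeCommitted() {
            mergeFinishedGen.incrementAndGet();
        }

        // Resolver loop: keep retrying until no merge completed mid-resolution.
        void resolveWithRetry(Runnable resolveAgainstCurrentSegments) {
            while (true) {
                long genBefore = mergeFinishedGen.get();
                resolveAgainstCurrentSegments.run();   // apply deletes/DV updates
                if (mergeFinishedGen.get() == genBefore) {
                    return;                            // no merge raced with us
                }
                // A merge completed concurrently; loop and re-resolve against the new segments.
            }
        }
    }
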
@@ -449,21 +457,36 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
           } else {
             anyChanges = false;
           }
-          if (!anyChanges) {
+          if (anyChanges == false) {
             // prevent double increment since docWriter#doFlush increments the flushcount
             // if we flushed anything.
             flushCount.incrementAndGet();
           }
-          // Prevent segmentInfos from changing while opening the
-          // reader; in theory we could instead do similar retry logic,
-          // just like we do when loading segments_N
+
+          processEvents(false, true);
+
+          if (applyAllDeletes) {
+            applyAllDeletesAndUpdates();
+          }
+
           synchronized(this) {
-            anyChanges |= maybeApplyDeletes(applyAllDeletes);
+
+            // NOTE: we cannot carry doc values updates in memory yet, so we always must write them through to disk and re-open each
+            // SegmentReader:
+
+            // TODO: we could instead just clone SIS and pull/incref readers in sync'd block, and then do this w/o IW's lock?
+            // Must do this sync'd on IW to prevent a merge from completing at the last second and failing to write its DV updates:
+            readerPool.writeAllDocValuesUpdates();
+
             if (writeAllDeletes) {
               // Must move the deletes to disk:
               readerPool.commit(segmentInfos);
             }
 
+            // Prevent segmentInfos from changing while opening the
+            // reader; in theory we could instead do similar retry logic,
+            // just like we do when loading segments_N
+            
             r = StandardDirectoryReader.open(this, segmentInfos, applyAllDeletes, writeAllDeletes);
             if (infoStream.isEnabled("IW")) {
               infoStream.message("IW", "return reader version=" + r.getVersion() + " reader=" + r);
@@ -483,6 +506,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
           }
         }
       }
+      anyChanges |= maybeMerge.getAndSet(false);
       if (anyChanges) {
         maybeMerge(config.getMergePolicy(), MergeTrigger.FULL_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
       }
@@ -509,7 +533,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   }
 
   /** Holds shared SegmentReader instances. IndexWriter uses
-   *  SegmentReaders for 1) applying deletes, 2) doing
+   *  SegmentReaders for 1) applying deletes/DV updates, 2) doing
    *  merges, 3) handing out a real-time reader.  This pool
    *  reuses instances of the SegmentReaders in all these
    *  places if it is in "near real-time mode" (getReader()
@@ -519,8 +543,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     
     private final Map<SegmentCommitInfo,ReadersAndUpdates> readerMap = new HashMap<>();
 
-    // used only by asserts
-    public synchronized boolean infoIsLive(SegmentCommitInfo info) {
+    /** Asserts this info still exists in IW's segment infos */
+    public synchronized boolean assertInfoIsLive(SegmentCommitInfo info) {
       int idx = segmentInfos.indexOf(info);
       assert idx != -1: "info=" + info + " isn't live";
       assert segmentInfos.info(idx) == info: "info=" + info + " doesn't match live info in segmentInfos";
@@ -531,12 +555,19 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       final ReadersAndUpdates rld = readerMap.get(info);
       if (rld != null) {
         assert info == rld.info;
-//        System.out.println("[" + Thread.currentThread().getName() + "] ReaderPool.drop: " + info);
         readerMap.remove(info);
         rld.dropReaders();
       }
     }
 
+    public synchronized long ramBytesUsed() {
+      long bytes = 0;
+      for (ReadersAndUpdates rld : readerMap.values()) {
+        bytes += rld.ramBytesUsed.get();
+      }
+      return bytes;
+    }
+
     public synchronized boolean anyPendingDeletes() {
       for(ReadersAndUpdates rld : readerMap.values()) {
         if (rld.getPendingDeleteCount() != 0) {
@@ -556,30 +587,39 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       // Matches incRef in get:
       rld.decRef();
 
-      // Pool still holds a ref:
-      assert rld.refCount() >= 1;
-
-      if (!poolReaders && rld.refCount() == 1) {
-        // This is the last ref to this RLD, and we're not
-        // pooling, so remove it:
-//        System.out.println("[" + Thread.currentThread().getName() + "] ReaderPool.release: " + rld.info);
-        if (rld.writeLiveDocs(directory)) {
-          // Make sure we only write del docs for a live segment:
-          assert assertInfoLive == false || infoIsLive(rld.info);
-          // Must checkpoint because we just
-          // created new _X_N.del and field updates files;
-          // don't call IW.checkpoint because that also
-          // increments SIS.version, which we do not want to
-          // do here: it was done previously (after we
-          // invoked BDS.applyDeletes), whereas here all we
-          // did was move the state to disk:
-          checkpointNoSIS();
-        }
-        //System.out.println("IW: done writeLiveDocs for info=" + rld.info);
+      if (rld.refCount() == 0) {
+        // This happens if the segment was just merged away, while a buffered deletes packet was still applying deletes/updates to it.
+        assert readerMap.containsKey(rld.info) == false: "seg=" + rld.info + " has refCount 0 but still unexpectedly exists in the reader pool";
+      } else {
 
-//        System.out.println("[" + Thread.currentThread().getName() + "] ReaderPool.release: drop readers " + rld.info);
-        rld.dropReaders();
-        readerMap.remove(rld.info);
+        // Pool still holds a ref:
+        assert rld.refCount() > 0: "refCount=" + rld.refCount() + " reader=" + rld.info;
+
+        if (!poolReaders && rld.refCount() == 1 && readerMap.containsKey(rld.info)) {
+          // This is the last ref to this RLD, and we're not
+          // pooling, so remove it:
+          if (rld.writeLiveDocs(directory)) {
+            // Make sure we only write del docs for a live segment:
+            assert assertInfoLive == false || assertInfoIsLive(rld.info);
+            // Must checkpoint because we just
+            // created new _X_N.del and field updates files;
+            // don't call IW.checkpoint because that also
+            // increments SIS.version, which we do not want to
+            // do here: it was done previously (after we
+            // invoked BDS.applyDeletes), whereas here all we
+            // did was move the state to disk:
+            checkpointNoSIS();
+          }
+
+          rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
+
+          if (rld.getNumDVUpdates() == 0) {
+            rld.dropReaders();
+            readerMap.remove(rld.info);
+          } else {
+            // We are forced to pool this segment until its deletes fully apply (no delGen gaps)
+          }
+        }
       }
     }
     
@@ -588,6 +628,96 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       dropAll(false);
     }
 
+    void writeAllDocValuesUpdates() throws IOException {
+      Collection<ReadersAndUpdates> copy;
+      synchronized (this) {
+        copy = new HashSet<>(readerMap.values());
+      }
+      boolean any = false;
+      for (ReadersAndUpdates rld : copy) {
+        any |= rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
+      }
+      if (any) {
+        checkpoint();
+      }
+    }
+
+    void writeDocValuesUpdates(List<SegmentCommitInfo> infos) throws IOException {
+      boolean any = false;
+      for (SegmentCommitInfo info : infos) {
+        ReadersAndUpdates rld = get(info, false);
+        if (rld != null) {
+          any |= rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
+        }
+      }
+      if (any) {
+        checkpoint();
+      }
+    }
+
+    private final AtomicBoolean writeDocValuesLock = new AtomicBoolean();
+
+    void writeSomeDocValuesUpdates() throws IOException {
+
+      assert Thread.holdsLock(IndexWriter.this) == false;
+
+      if (writeDocValuesLock.compareAndSet(false, true)) {
+        try {
+
+          LiveIndexWriterConfig config = getConfig();
+          double mb = config.getRAMBufferSizeMB();
+          // If the reader pool is > 50% of our IW buffer, then write the updates:
+          if (mb != IndexWriterConfig.DISABLE_AUTO_FLUSH) {
+            long startNS = System.nanoTime();
+            
+            long ramBytesUsed = ramBytesUsed();
+            if (ramBytesUsed > 0.5 * mb * 1024 * 1024) {
+              if (infoStream.isEnabled("BD")) {
+                infoStream.message("BD", String.format(Locale.ROOT, "now write some pending DV updates: %.2f MB used vs IWC Buffer %.2f MB",
+                                                       ramBytesUsed/1024./1024., mb));
+              }
+          
+              // Sort by largest ramBytesUsed:
+              PriorityQueue<ReadersAndUpdates> queue = new PriorityQueue<>(readerMap.size(), (a, b) -> Long.compare(b.ramBytesUsed.get(), a.ramBytesUsed.get()));
+              synchronized (this) {
+                for (ReadersAndUpdates rld : readerMap.values()) {
+                  queue.add(rld);
+                }
+              }
+
+              int count = 0;
+              while (ramBytesUsed > 0.5 * mb * 1024 * 1024) {
+                ReadersAndUpdates rld = queue.poll();
+                if (rld == null) {
+                  break;
+                }
+
+                // We need to do before/after because not all RAM in this RAU is used by DV updates, and
+                // not all of those bytes can be written here:
+                long bytesUsedBefore = rld.ramBytesUsed.get();
+
+                // Only acquire IW lock on each write, since this is a time consuming operation.  This way
+                // other threads get a chance to run in between our writes.
+                synchronized (IndexWriter.this) {
+                  rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
+                }
+                long bytesUsedAfter = rld.ramBytesUsed.get();
+                ramBytesUsed -= bytesUsedBefore - bytesUsedAfter;
+                count++;
+              }
+
+              if (infoStream.isEnabled("BD")) {
+                infoStream.message("BD", String.format(Locale.ROOT, "done write some DV updates for %d segments: now %.2f MB used vs IWC Buffer %.2f MB; took %.2f sec",
+                                                       count, ramBytesUsed()/1024./1024., mb, ((System.nanoTime() - startNS)/1000000000.)));
+              }
+            }
+          }
+        } finally {
+          writeDocValuesLock.set(false);
+        }
+      }
+    }
+
     /** Remove all our references to readers, and commits
      *  any pending changes. */
     synchronized void dropAll(boolean doSave) throws IOException {
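
writeSomeDocValuesUpdates above combines two small concurrency tricks: an AtomicBoolean used as a non-blocking try-lock (compareAndSet), so only one thread writes at a time while the others simply skip, and a PriorityQueue ordered by ramBytesUsed, so the largest consumers are written first until usage drops below half the configured IW buffer. A generic sketch of that shape, with the Buffer interface standing in for ReadersAndUpdates (an assumption for illustration):

    import java.util.Collection;
    import java.util.PriorityQueue;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Sketch: flush the largest RAM consumers first until total usage falls
    // below a budget, letting at most one thread do the work at a time.
    class RamPressureFlusher {
        interface Buffer {
            long ramBytesUsed();
            long flush();   // writes pending state to disk, returns bytes freed
        }

        private final AtomicBoolean flushLock = new AtomicBoolean();

        void maybeFlush(Collection<? extends Buffer> buffers, long budgetBytes) {
            if (flushLock.compareAndSet(false, true) == false) {
                return;   // another thread is already flushing; just skip
            }
            try {
                long used = 0;
                for (Buffer b : buffers) {
                    used += b.ramBytesUsed();
                }
                if (used <= budgetBytes) {
                    return;
                }
                // Largest consumers first, mirroring the reversed ramBytesUsed comparator above:
                PriorityQueue<Buffer> queue = new PriorityQueue<>(
                    (a, b) -> Long.compare(b.ramBytesUsed(), a.ramBytesUsed()));
                queue.addAll(buffers);
                while (used > budgetBytes) {
                    Buffer next = queue.poll();
                    if (next == null) {
                        break;
                    }
                    used -= next.flush();   // not all of a buffer's RAM is necessarily freed
                }
            } finally {
                flushLock.set(false);
            }
        }
    }
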
@@ -599,7 +729,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
         try {
           if (doSave && rld.writeLiveDocs(directory)) {
             // Make sure we only write del docs and field updates for a live segment:
-            assert infoIsLive(rld.info);
+            assert assertInfoIsLive(rld.info);
             // Must checkpoint because we just
             // created new _X_N.del and field updates files;
             // don't call IW.checkpoint because that also
@@ -654,9 +784,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
         final ReadersAndUpdates rld = readerMap.get(info);
         if (rld != null) {
           assert rld.info == info;
-          if (rld.writeLiveDocs(directory)) {
+          boolean changed = rld.writeLiveDocs(directory);
+          
+          changed |= rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
+
+          if (changed) {
             // Make sure we only write del docs for a live segment:
-            assert infoIsLive(info);
+            assert assertInfoIsLive(info);
 
             // Must checkpoint because we just
             // created new _X_N.del and field updates files;
@@ -667,10 +801,22 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
             // did was move the state to disk:
             checkpointNoSIS();
           }
+
         }
       }
     }
 
+    public synchronized boolean anyChanges() {
+      for (ReadersAndUpdates rld : readerMap.values()) {
+        // NOTE: we don't check for pending deletes because deletes carry over in RAM to NRT readers
+        if (rld.getNumDVUpdates() != 0) {
+          return true;
+        }
+      }
+
+      return false;
+    }
+
     /**
      * Obtain a ReadersAndLiveDocs instance from the
      * readerPool.  If create is true, you must later call
@@ -685,14 +831,14 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
 
       ReadersAndUpdates rld = readerMap.get(info);
       if (rld == null) {
-        if (!create) {
+        if (create == false) {
           return null;
         }
         rld = new ReadersAndUpdates(IndexWriter.this, info);
         // Steal initial reference:
         readerMap.put(info, rld);
       } else {
-        assert rld.info == info: "rld.info=" + rld.info + " info=" + info + " isLive?=" + infoIsLive(rld.info) + " vs " + infoIsLive(info);
+        assert rld.info == info: "rld.info=" + rld.info + " info=" + info + " isLive?=" + assertInfoIsLive(rld.info) + " vs " + assertInfoIsLive(info);
       }
 
       if (create) {
@@ -809,7 +955,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       mergeScheduler.setInfoStream(infoStream);
       codec = config.getCodec();
 
-      bufferedUpdatesStream = new BufferedUpdatesStream(infoStream);
+      bufferedUpdatesStream = new BufferedUpdatesStream(this);
       poolReaders = config.getReaderPooling();
 
       OpenMode mode = config.getOpenMode();
@@ -1446,38 +1592,38 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       ReadersAndUpdates rld = readerPool.get(info, false);
       if (rld != null) {
         synchronized(bufferedUpdatesStream) {
-          rld.initWritableLiveDocs();
           if (rld.delete(docID)) {
             final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount();
             if (fullDelCount == rld.info.info.maxDoc()) {
-              // If a merge has already registered for this
-              // segment, we leave it in the readerPool; the
-              // merge will skip merging it and will then drop
-              // it once it's done:
-              if (!mergingSegments.contains(rld.info)) {
-                segmentInfos.remove(rld.info);
-                readerPool.drop(rld.info);
-                checkpoint();
-              }
+              dropDeletedSegment(rld.info);
+              checkpoint();
             }
 
             // Must bump changeCount so if no other changes
             // happened, we still commit this change:
             changed();
           }
-          //System.out.println("  yes " + info.info.name + " " + docID);
           return docWriter.deleteQueue.getNextSequenceNumber();
         }
-      } else {
-        //System.out.println("  no rld " + info.info.name + " " + docID);
       }
-    } else {
-      //System.out.println("  no seg " + info.info.name + " " + docID);
     }
 
     return -1;
   }
 
+  /** Drops a segment that has 100% deleted documents. */
+  synchronized void dropDeletedSegment(SegmentCommitInfo info) throws IOException {
+    // If a merge has already registered for this
+    // segment, we leave it in the readerPool; the
+    // merge will skip merging it and will then drop
+    // it once it's done:
+    if (mergingSegments.contains(info) == false) {
+      segmentInfos.remove(info);
+      pendingNumDocs.addAndGet(-info.info.maxDoc());
+      readerPool.drop(info);
+    }
+  }
+
   /**
    * Deletes the document(s) containing any of the
    * terms. All given deletes are applied and flushed atomically
@@ -1881,20 +2027,20 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   public void forceMerge(int maxNumSegments, boolean doWait) throws IOException {
     ensureOpen();
 
-    if (maxNumSegments < 1)
+    if (maxNumSegments < 1) {
       throw new IllegalArgumentException("maxNumSegments must be >= 1; got " + maxNumSegments);
+    }
 
     if (infoStream.isEnabled("IW")) {
       infoStream.message("IW", "forceMerge: index now " + segString());
       infoStream.message("IW", "now flush at forceMerge");
     }
-
     flush(true, true);
-
     synchronized(this) {
       resetMergeExceptions();
       segmentsToMerge.clear();
       for(SegmentCommitInfo info : segmentInfos) {
+        assert info != null;
         segmentsToMerge.put(info, Boolean.TRUE);
       }
       mergeMaxNumSegments = maxNumSegments;
@@ -1903,12 +2049,18 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       // merge:
       for(final MergePolicy.OneMerge merge  : pendingMerges) {
         merge.maxNumSegments = maxNumSegments;
-        segmentsToMerge.put(merge.info, Boolean.TRUE);
+        if (merge.info != null) {
+          // TODO: explain why this is sometimes still null
+          segmentsToMerge.put(merge.info, Boolean.TRUE);
+        }
       }
 
       for (final MergePolicy.OneMerge merge: runningMerges) {
         merge.maxNumSegments = maxNumSegments;
-        segmentsToMerge.put(merge.info, Boolean.TRUE);
+        if (merge.info != null) {
+          // TODO: explain why this is sometimes still null
+          segmentsToMerge.put(merge.info, Boolean.TRUE);
+        }
       }
     }
 
@@ -2076,7 +2228,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     maybeMerge(config.getMergePolicy(), MergeTrigger.EXPLICIT, UNBOUNDED_MAX_MERGE_SEGMENTS);
   }
 
-  private final void maybeMerge(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments) throws IOException {
+  final void maybeMerge(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments) throws IOException {
     ensureOpen(false);
     boolean newMergesFound = updatePendingMerges(mergePolicy, trigger, maxNumSegments);
     mergeScheduler.merge(this, trigger, newMergesFound);
@@ -2103,7 +2255,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     final MergePolicy.MergeSpecification spec;
     if (maxNumSegments != UNBOUNDED_MAX_MERGE_SEGMENTS) {
       assert trigger == MergeTrigger.EXPLICIT || trigger == MergeTrigger.MERGE_FINISHED :
-        "Expected EXPLICT or MERGE_FINISHED as trigger even with maxNumSegments set but was: " + trigger.name();
+      "Expected EXPLICT or MERGE_FINISHED as trigger even with maxNumSegments set but was: " + trigger.name();
+
       spec = mergePolicy.findForcedMerges(segmentInfos, maxNumSegments, Collections.unmodifiableMap(segmentsToMerge), this);
       newMergesFound = spec != null;
       if (newMergesFound) {
@@ -2212,6 +2365,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       mergeScheduler.close();
 
       bufferedUpdatesStream.clear();
+
       docWriter.close(); // mark it as closed first to prevent subsequent indexing actions/flushes 
       docWriter.abort(this); // don't sync on IW here
       synchronized(this) {
@@ -2496,49 +2650,63 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     segmentInfos.changed();
   }
 
-  synchronized void publishFrozenUpdates(FrozenBufferedUpdates packet) {
+  synchronized void publishFrozenUpdates(FrozenBufferedUpdates packet) throws IOException {
     assert packet != null && packet.any();
-    synchronized (bufferedUpdatesStream) {
-      bufferedUpdatesStream.push(packet);
-    }
+    bufferedUpdatesStream.push(packet);
+    docWriter.putEvent(new DocumentsWriter.ResolveUpdatesEvent(packet));
   }
-  
+
   /**
    * Atomically adds the segment private delete packet and publishes the flushed
    * segments SegmentInfo to the index writer.
    */
-  void publishFlushedSegment(SegmentCommitInfo newSegment,
-      FrozenBufferedUpdates packet, FrozenBufferedUpdates globalPacket) throws IOException {
+  synchronized void publishFlushedSegment(SegmentCommitInfo newSegment,
+                                          FrozenBufferedUpdates packet, FrozenBufferedUpdates globalPacket,
+                                          Sorter.DocMap sortMap) throws IOException {
     try {
-      synchronized (this) {
-        // Lock order IW -> BDS
-        ensureOpen(false);
-        synchronized (bufferedUpdatesStream) {
-          if (infoStream.isEnabled("IW")) {
-            infoStream.message("IW", "publishFlushedSegment");
-          }
+      // Lock order IW -> BDS
+      ensureOpen(false);
+
+      if (infoStream.isEnabled("IW")) {
+        infoStream.message("IW", "publishFlushedSegment " + newSegment);
+      }
+
+      if (globalPacket != null && globalPacket.any()) {
+        // Do this as an event so it applies higher in the stack when we are not holding DocumentsWriterFlushQueue.purgeLock:
+        bufferedUpdatesStream.push(globalPacket);
+        docWriter.putEvent(new DocumentsWriter.ResolveUpdatesEvent(globalPacket));
+      }
+
+      // Publishing the segment must be sync'd on IW -> BDS to make sure
+      // that no merge prunes away the seg. private delete packet
+      final long nextGen;
+      if (packet != null && packet.any()) {
+        nextGen = bufferedUpdatesStream.push(packet);
+
+        // Do this as an event so it applies higher in the stack when we are not holding DocumentsWriterFlushQueue.purgeLock:
+        docWriter.putEvent(new DocumentsWriter.ResolveUpdatesEvent(packet));
           
-          if (globalPacket != null && globalPacket.any()) {
-            bufferedUpdatesStream.push(globalPacket);
-          } 
-          // Publishing the segment must be synched on IW -> BDS to make the sure
-          // that no merge prunes away the seg. private delete packet
-          final long nextGen;
-          if (packet != null && packet.any()) {
-            nextGen = bufferedUpdatesStream.push(packet);
-          } else {
-            // Since we don't have a delete packet to apply we can get a new
-            // generation right away
-            nextGen = bufferedUpdatesStream.getNextGen();
-          }
-          if (infoStream.isEnabled("IW")) {
-            infoStream.message("IW", "publish sets newSegment delGen=" + nextGen + " seg=" + segString(newSegment));
-          }
-          newSegment.setBufferedDeletesGen(nextGen);
-          segmentInfos.add(newSegment);
-          checkpoint();
-        }
+      } else {
+        // Since we don't have a delete packet to apply we can get a new
+        // generation right away
+        nextGen = bufferedUpdatesStream.getNextGen();
+        // No deletes/updates here, so mark it finished immediately:
+        bufferedUpdatesStream.finishedSegment(nextGen);
+      }
+      if (infoStream.isEnabled("IW")) {
+        infoStream.message("IW", "publish sets newSegment delGen=" + nextGen + " seg=" + segString(newSegment));
       }
+      newSegment.setBufferedDeletesGen(nextGen);
+      segmentInfos.add(newSegment);
+      checkpoint();
+
+      if (packet != null && packet.any() && sortMap != null) {
+        // TODO: not great we do this heavyish op while holding IW's monitor lock,
+        // but it only applies if you are using sorted indices and updating doc values:
+        ReadersAndUpdates rld = readerPool.get(newSegment, true);
+        rld.sortMap = sortMap;
+      }
+      
     } finally {
       flushCount.incrementAndGet();
       doAfterFlush();
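
publishFrozenUpdates and publishFlushedSegment now push the frozen packet onto the buffered updates stream and enqueue a ResolveUpdatesEvent instead of applying anything inline, so the expensive resolution runs later, outside DocumentsWriterFlushQueue.purgeLock, possibly on several indexing threads at once. Roughly, the producer/consumer shape is the one sketched below (PacketStream, Packet and Event are stand-in names, not the Lucene classes):

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    // Sketch: publish a packet under the lock, but defer the expensive
    // resolution to an event that indexing threads drain later, lock-free.
    class PacketStream {
        interface Packet { void resolve(); }
        interface Event  { void process(); }

        private final Queue<Event> events = new ConcurrentLinkedQueue<>();

        // Called while holding the writer's lock: cheap bookkeeping only.
        void publish(Packet packet) {
            // ... record the packet's delGen, add it to the stream ...
            events.add(packet::resolve);   // resolution happens later, off this lock
        }

        // Called by indexing threads once they are no longer holding flush locks.
        void processEvents() {
            Event e;
            while ((e = events.poll()) != null) {
                e.process();
            }
        }
    }
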
@@ -2924,7 +3092,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   /** Copies the segment files as-is into the IndexWriter's directory. */
   private SegmentCommitInfo copySegmentAsIs(SegmentCommitInfo info, String segName, IOContext context) throws IOException {
     
-    //System.out.println("copy seg=" + info.info.name + " version=" + info.info.getVersion());
     // Same SI as before but we change directory and name
     SegmentInfo newInfo = new SegmentInfo(directoryOrig, info.info.getVersion(), info.info.getMinVersion(), segName, info.info.maxDoc(),
                                           info.info.getUseCompoundFile(), info.info.getCodec(), 
@@ -2991,16 +3158,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   @Override
   public final long prepareCommit() throws IOException {
     ensureOpen();
-    boolean[] doMaybeMerge = new boolean[1];
-    pendingSeqNo = prepareCommitInternal(doMaybeMerge);
+    pendingSeqNo = prepareCommitInternal();
     // we must do this outside of the commitLock else we can deadlock:
-    if (doMaybeMerge[0]) {
+    if (maybeMerge.getAndSet(false)) {
       maybeMerge(config.getMergePolicy(), MergeTrigger.FULL_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);      
     }
     return pendingSeqNo;
   }
 
-  private long prepareCommitInternal(boolean[] doMaybeMerge) throws IOException {
+  private long prepareCommitInternal() throws IOException {
     startCommitTime = System.nanoTime();
     synchronized(commitLock) {
       ensureOpen(false);
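
prepareCommit used to hand a boolean[1] back to the caller to request a merge once the commit lock was released; it now records that request in the shared maybeMerge AtomicBoolean, and callers consume it with getAndSet(false) outside commitLock, which is what avoids the deadlock mentioned in the comment above. A stripped-down sketch of that hand-off, assuming doCommit()/runMerges() placeholders that are not Lucene methods:

    import java.util.concurrent.atomic.AtomicBoolean;

    // Sketch: request work inside a lock, perform it after the lock is released,
    // so the merge scheduler never runs while commitLock is held (deadlock risk).
    class DeferredMergeTrigger {
        private final Object commitLock = new Object();
        private final AtomicBoolean maybeMerge = new AtomicBoolean();

        void commit() {
            synchronized (commitLock) {
                boolean flushedAnything = doCommit();
                if (flushedAnything) {
                    maybeMerge.set(true);   // just record the request
                }
            }
            // Outside the lock: consume the request exactly once, even if
            // several code paths set it.
            if (maybeMerge.getAndSet(false)) {
                runMerges();
            }
        }

        private boolean doCommit() { return true; }   // placeholder
        private void runMerges() { /* placeholder */ }
    }
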
@@ -3020,7 +3186,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       doBeforeFlush();
       testPoint("startDoFlush");
       SegmentInfos toCommit = null;
-      boolean anySegmentsFlushed = false;
+      boolean anyChanges = false;
       long seqNo;
 
       // This is copied from doFlush, except it's modified to
@@ -3035,19 +3201,23 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
           try {
             seqNo = docWriter.flushAllThreads();
             if (seqNo < 0) {
-              anySegmentsFlushed = true;
+              anyChanges = true;
               seqNo = -seqNo;
             }
-            if (!anySegmentsFlushed) {
+            if (anyChanges == false) {
               // prevent double increment since docWriter#doFlush increments the flushcount
               // if we flushed anything.
               flushCount.incrementAndGet();
             }
-            processEvents(false, true);
+
+            // cannot pass triggerMerges=true here else it can lead to deadlock:
+            processEvents(false, false);
+            
             flushSuccess = true;
 
+            applyAllDeletesAndUpdates();
+
             synchronized(this) {
-              maybeApplyDeletes(true);
 
               readerPool.commit(segmentInfos);
 
@@ -3106,8 +3276,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
      
       boolean success = false;
       try {
-        if (anySegmentsFlushed) {
-          doMaybeMerge[0] = true;
+        if (anyChanges) {
+          maybeMerge.set(true);
         }
         startCommit(toCommit);
         success = true;
@@ -3228,8 +3398,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       infoStream.message("IW", "commit: start");
     }
 
-    boolean[] doMaybeMerge = new boolean[1];
-
     long seqNo;
 
     synchronized(commitLock) {
@@ -3243,7 +3411,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
         if (infoStream.isEnabled("IW")) {
           infoStream.message("IW", "commit: now prepare");
         }
-        seqNo = prepareCommitInternal(doMaybeMerge);
+        seqNo = prepareCommitInternal();
       } else {
         if (infoStream.isEnabled("IW")) {
           infoStream.message("IW", "commit: already prepared");
@@ -3255,7 +3423,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     }
 
     // we must do this outside of the commitLock else we can deadlock:
-    if (doMaybeMerge[0]) {
+    if (maybeMerge.getAndSet(false)) {
       maybeMerge(mergePolicy, MergeTrigger.FULL_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);      
     }
     
@@ -3417,8 +3585,14 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
           processEvents(false, true);
         }
       }
+
+      if (applyAllDeletes) {
+        applyAllDeletesAndUpdates();
+      }
+
+      anyChanges |= maybeMerge.getAndSet(false);
+      
       synchronized(this) {
-        anyChanges |= maybeApplyDeletes(applyAllDeletes);
         doAfterFlush();
         success = true;
         return anyChanges;
@@ -3436,48 +3610,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     }
   }
   
-  final synchronized boolean maybeApplyDeletes(boolean applyAllDeletes) throws IOException {
-    if (applyAllDeletes) {
-      if (infoStream.isEnabled("IW")) {
-        infoStream.message("IW", "apply all deletes during flush");
-      }
-      return applyAllDeletesAndUpdates();
-    } else if (infoStream.isEnabled("IW")) {
-      infoStream.message("IW", "don't apply deletes now delTermCount=" + bufferedUpdatesStream.numTerms() + " bytesUsed=" + bufferedUpdatesStream.ramBytesUsed());
-    }
-
-    return false;
-  }
-  
-  final synchronized boolean applyAllDeletesAndUpdates() throws IOException {
+  final void applyAllDeletesAndUpdates() throws IOException {
+    assert Thread.holdsLock(this) == false;
     flushDeletesCount.incrementAndGet();
-    final BufferedUpdatesStream.ApplyDeletesResult result;
     if (infoStream.isEnabled("IW")) {
-      infoStream.message("IW", "now apply all deletes for all segments maxDoc=" + (docWriter.getNumDocs() + segmentInfos.totalMaxDoc()));
-    }
-    result = bufferedUpdatesStream.applyDeletesAndUpdates(readerPool, segmentInfos.asList());
-    if (result.anyDeletes) {
-      checkpoint();
+      infoStream.message("IW", "now apply all deletes for all segments buffered updates bytesUsed=" + bufferedUpdatesStream.ramBytesUsed() + " reader pool bytesUsed=" + readerPool.ramBytesUsed());
     }
-    if (!keepFullyDeletedSegments && result.allDeleted != null) {
-      if (infoStream.isEnabled("IW")) {
-        infoStream.message("IW", "drop 100% deleted segments: " + segString(result.allDeleted));
-      }
-      for (SegmentCommitInfo info : result.allDeleted) {
-        // If a merge has already registered for this
-        // segment, we leave it in the readerPool; the
-        // merge will skip merging it and will then drop
-        // it once it's done:
-        if (!mergingSegments.contains(info)) {
-          segmentInfos.remove(info);
-          pendingNumDocs.addAndGet(-info.info.maxDoc());
-          readerPool.drop(info);
-        }
-      }
-      checkpoint();
-    }
-    bufferedUpdatesStream.prune(segmentInfos);
-    return result.anyDeletes;
+    bufferedUpdatesStream.waitApplyAll();
   }
 
   // for testing only
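
applyAllDeletesAndUpdates no longer applies packets itself: they are resolved concurrently as they are published, and this method only blocks until every in-flight packet has finished (bufferedUpdatesStream.waitApplyAll). In spirit that is a quiescence barrier; a minimal sketch with a counter and a monitor, whose names are assumptions rather than the BufferedUpdatesStream internals:

    // Sketch: track in-flight packet resolutions and let callers wait until
    // all of them have completed (a simple quiescence barrier).
    class InFlightTracker {
        private int inFlight;

        synchronized void packetStarted() {
            inFlight++;
        }

        synchronized void packetFinished() {
            inFlight--;
            if (inFlight == 0) {
                notifyAll();
            }
        }

        // Equivalent in spirit to waitApplyAll(): block until nothing is resolving.
        synchronized void waitApplyAll() throws InterruptedException {
            while (inFlight > 0) {
                wait();
            }
        }
    }
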
@@ -3514,41 +3653,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   
   private static class MergedDeletesAndUpdates {
     ReadersAndUpdates mergedDeletesAndUpdates = null;
-    boolean initializedWritableLiveDocs = false;
     
     MergedDeletesAndUpdates() {}
     
-    final void init(ReaderPool readerPool, MergePolicy.OneMerge merge, boolean initWritableLiveDocs) throws IOException {
+    final void init(ReaderPool readerPool, MergePolicy.OneMerge merge) throws IOException {
       if (mergedDeletesAndUpdates == null) {
         mergedDeletesAndUpdates = readerPool.get(merge.info, true);
       }
-      if (initWritableLiveDocs && !initializedWritableLiveDocs) {
-        mergedDeletesAndUpdates.initWritableLiveDocs();
-        this.initializedWritableLiveDocs = true;
-      }
-    }
-    
-  }
-  
-  private void maybeApplyMergedDVUpdates(MergePolicy.OneMerge merge, MergeState mergeState,
-      MergedDeletesAndUpdates holder, String[] mergingFields, DocValuesFieldUpdates[] dvFieldUpdates,
-      DocValuesFieldUpdates.Iterator[] updatesIters, int segment, int curDoc) throws IOException {
-    int newDoc = -1;
-    for (int idx = 0; idx < mergingFields.length; idx++) {
-      DocValuesFieldUpdates.Iterator updatesIter = updatesIters[idx];
-      if (updatesIter.doc() == curDoc) { // document has an update
-        if (holder.mergedDeletesAndUpdates == null) {
-          holder.init(readerPool, merge, false);
-        }
-        if (newDoc == -1) { // map once per all field updates, but only if there are any updates
-          newDoc = mergeState.docMaps[segment].get(curDoc);
-        }
-        DocValuesFieldUpdates dvUpdates = dvFieldUpdates[idx];
-        dvUpdates.add(newDoc, updatesIter.value());
-        updatesIter.nextDoc(); // advance to next document
-      } else {
-        assert updatesIter.doc() > curDoc : "field=" + mergingFields[idx] + " updateDoc=" + updatesIter.doc() + " curDoc=" + curDoc;
-      }
     }
   }
 
@@ -3564,6 +3675,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
    */
   synchronized private ReadersAndUpdates commitMergedDeletesAndUpdates(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException {
 
+    mergeFinishedGen.incrementAndGet();
+    
     testPoint("startCommitMergeDeletes");
 
     final List<SegmentCommitInfo> sourceSegments = merge.segments;
@@ -3576,9 +3689,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     // started merging:
     long minGen = Long.MAX_VALUE;
 
-    // Lazy init (only when we find a delete to carry over):
-    final MergedDeletesAndUpdates holder = new MergedDeletesAndUpdates();
-    final DocValuesFieldUpdates.Container mergedDVUpdates = new DocValuesFieldUpdates.Container();
+    // Lazy init (only when we find a delete or update to carry over):
+    final ReadersAndUpdates mergedDeletesAndUpdates = readerPool.get(merge.info, true);
+    
+    // field -> delGen -> dv field updates
+    Map<String,Map<Long,DocValuesFieldUpdates>> mappedDVUpdates = new HashMap<>();
+
+    boolean anyDVUpdates = false;
 
     assert sourceSegments.size() == mergeState.docMaps.length;
     for (int i = 0; i < sourceSegments.size(); i++) {
@@ -3587,36 +3704,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       final int maxDoc = info.info.maxDoc();
       final Bits prevLiveDocs = merge.readers.get(i).getLiveDocs();
       final ReadersAndUpdates rld = readerPool.get(info, false);
-      // We hold a ref so it should still be in the pool:
+      // We hold a ref, from when we opened the readers during mergeInit, so it better still be in the pool:
       assert rld != null: "seg=" + info.info.name;
       final Bits currentLiveDocs = rld.getLiveDocs();
-      final Map<String,DocValuesFieldUpdates> mergingFieldUpdates = rld.getMergingFieldUpdates();
-      final String[] mergingFields;
-      final DocValuesFieldUpdates[] dvFieldUpdates;
-      final DocValuesFieldUpdates.Iterator[] updatesIters;
-      if (mergingFieldUpdates.isEmpty()) {
-        mergingFields = null;
-        updatesIters = null;
-        dvFieldUpdates = null;
-      } else {
-        mergingFields = new String[mergingFieldUpdates.size()];
-        dvFieldUpdates = new DocValuesFieldUpdates[mergingFieldUpdates.size()];
-        updatesIters = new DocValuesFieldUpdates.Iterator[mergingFieldUpdates.size()];
-        int idx = 0;
-        for (Entry<String,DocValuesFieldUpdates> e : mergingFieldUpdates.entrySet()) {
-          String field = e.getKey();
-          DocValuesFieldUpdates updates = e.getValue();
-          mergingFields[idx] = field;
-          dvFieldUpdates[idx] = mergedDVUpdates.getUpdates(field, updates.type);
-          if (dvFieldUpdates[idx] == null) {
-            dvFieldUpdates[idx] = mergedDVUpdates.newUpdates(field, updates.type, mergeState.segmentInfo.maxDoc());
-          }
-          updatesIters[idx] = updates.iterator();
-          updatesIters[idx].nextDoc(); // advance to first update doc
-          ++idx;
-        }
-      }
-//      System.out.println("[" + Thread.currentThread().getName() + "] IW.commitMergedDeletes: info=" + info + ", mergingUpdates=" + mergingUpdates);
+
+      MergeState.DocMap segDocMap = mergeState.docMaps[i];
+      MergeState.DocMap segLeafDocMap = mergeState.leafDocMaps[i];
 
       if (prevLiveDocs != null) {
 
@@ -3648,26 +3741,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
               assert currentLiveDocs.get(j) == false;
             } else if (currentLiveDocs.get(j) == false) {
               // the document was deleted while we were merging:
-              if (holder.mergedDeletesAndUpdates == null || holder.initializedWritableLiveDocs == false) {
-                holder.init(readerPool, merge, true);
-              }
-              holder.mergedDeletesAndUpdates.delete(mergeState.docMaps[i].get(mergeState.leafDocMaps[i].get(j)));
-              if (mergingFields != null) { // advance all iters beyond the deleted document
-                skipDeletedDoc(updatesIters, j);
-              }
-            } else if (mergingFields != null) {
-              maybeApplyMergedDVUpdates(merge, mergeState, holder, mergingFields, dvFieldUpdates, updatesIters, i, j);
-            }
-          }
-        } else if (mergingFields != null) {
-          // need to check each non-deleted document if it has any updates
-          for (int j = 0; j < maxDoc; j++) {
-            if (prevLiveDocs.get(j)) {
-              // document isn't deleted, check if any of the fields have an update to it
-              maybeApplyMergedDVUpdates(merge, mergeState, holder, mergingFields, dvFieldUpdates, updatesIters, i, j);
-            } else {
-              // advance all iters beyond the deleted document
-              skipDeletedDoc(updatesIters, j);
+              mergedDeletesAndUpdates.delete(segDocMap.get(segLeafDocMap.get(j)));
             }
           }
         }
@@ -3677,52 +3751,83 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
         // does:
         for (int j = 0; j < maxDoc; j++) {
           if (currentLiveDocs.get(j) == false) {
-            if (holder.mergedDeletesAndUpdates == null || !holder.initializedWritableLiveDocs) {
-              holder.init(readerPool, merge, true);
+            mergedDeletesAndUpdates.delete(segDocMap.get(segLeafDocMap.get(j)));
+          }
+        }
+      }
+
+      // Now carry over all doc values updates that were resolved while we were merging, remapping the docIDs to the newly merged docIDs.
+      // We only carry over packets that finished resolving; if any are still running (concurrently) they will detect that our merge completed
+      // and re-resolve against the newly merged segment:
+      
+      Map<String,List<DocValuesFieldUpdates>> mergingDVUpdates = rld.getMergingDVUpdates();
+
+      for (Map.Entry<String,List<DocValuesFieldUpdates>> ent : mergingDVUpdates.entrySet()) {
+
+        String field = ent.getKey();
+
+        Map<Long,DocValuesFieldUpdates> mappedField = mappedDVUpdates.get(field);
+        if (mappedField == null) {
+          mappedField = new HashMap<>();
+          mappedDVUpdates.put(field, mappedField);
+        }
+
+        for (DocValuesFieldUpdates updates : ent.getValue()) {
+
+          if (bufferedUpdatesStream.stillRunning(updates.delGen)) {
+            continue;
+          }
+              
+          // sanity check:
+          assert field.equals(updates.field);
+
+          DocValuesFieldUpdates mappedUpdates = mappedField.get(updates.delGen);
+          if (mappedUpdates == null) {
+            switch (updates.type) {
+              case NUMERIC:
+                mappedUpdates = new NumericDocValuesFieldUpdates(updates.delGen, updates.field, merge.info.info.maxDoc());
+                break;
+              case BINARY:
+                mappedUpdates = new BinaryDocValuesFieldUpdates(updates.delGen, updates.field, merge.info.info.maxDoc());
+                break;
+              default:
+                throw new AssertionError();
             }
-            holder.mergedDeletesAndUpdates.delete(mergeState.docMaps[i].get(mergeState.leafDocMaps[i].get(j)));
-            if (mergingFields != null) { // advance all iters beyond the deleted document
-              skipDeletedDoc(updatesIters, j);
+            mappedField.put(updates.delGen, mappedUpdates);
+          }
+
+          DocValuesFieldUpdates.Iterator it = updates.iterator();
+          int doc;
+          while ((doc = it.nextDoc()) != NO_MORE_DOCS) {
+            int mappedDoc = segDocMap.get(segLeafDocMap.get(doc));
+            if (mappedDoc != -1) {
+              // not deleted
+              mappedUpdates.add(mappedDoc, it.value());
+              anyDVUpdates = true;
             }
-          } else if (mergingFields != null) {
-            maybeApplyMergedDVUpdates(merge, mergeState, holder, mergingFields, dvFieldUpdates, updatesIters, i, j);
           }
         }
-      } else if (mergingFields != null) {
-        // no deletions before or after, but there were updates
-        for (int j = 0; j < maxDoc; j++) {
-          maybeApplyMergedDVUpdates(merge, mergeState, holder, mergingFields, dvFieldUpdates, updatesIters, i, j);
-        }
       }
     }
 
-    if (mergedDVUpdates.any()) {
-//      System.out.println("[" + Thread.currentThread().getName() + "] IW.commitMergedDeletes: mergedDeletes.info=" + mergedDeletes.info + ", mergedFieldUpdates=" + mergedFieldUpdates);
-      boolean success = false;
-      try {
-        // if any error occurs while writing the field updates we should release
-        // the info, otherwise it stays in the pool but is considered not "live"
-        // which later causes false exceptions in pool.dropAll().
-        // NOTE: currently this is the only place which throws a true
-        // IOException. If this ever changes, we need to extend that try/finally
-        // block to the rest of the method too.
-        holder.mergedDeletesAndUpdates.writeFieldUpdates(directory, mergedDVUpdates);
-        success = true;
-      } finally {
-        if (!success) {
-          holder.mergedDeletesAndUpdates.dropChanges();
-          readerPool.drop(merge.info);
+    if (anyDVUpdates) {
+      // Persist the merged DV updates onto the RAU for the merged segment:
+      for(Map<Long,DocValuesFieldUpdates> d : mappedDVUpdates.values()) {
+        for (DocValuesFieldUpdates updates : d.values()) {
+          updates.finish();
+          mergedDeletesAndUpdates.addDVUpdate(updates);
         }
       }
     }
-    
+
     if (infoStream.isEnabled("IW")) {
-      if (holder.mergedDeletesAndUpdates == null) {
+      if (mergedDeletesAndUpdates == null) {
         infoStream.message("IW", "no new deletes or field updates since merge started");
       } else {
-        String msg = holder.mergedDeletesAndUpdates.getPendingDeleteCount() + " new deletes";
-        if (mergedDVUpdates.any()) {
-          msg += " and " + mergedDVUpdates.size() + " new field updates";
+        String msg = mergedDeletesAndUpdates.getPendingDeleteCount() + " new deletes";
+        if (anyDVUpdates) {
+          msg += " and " + mergedDeletesAndUpdates.getNumDVUpdates() + " new field updates";
+          msg += " (" + mergedDeletesAndUpdates.ramBytesUsed.get() + ") bytes";
         }
         msg += " since merge started";
         infoStream.message("IW", msg);
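
The carry-over loop above walks every already-resolved doc values update of the source segments, maps each docID through the merge's leaf and segment doc maps, and drops documents that did not survive the merge (the map returns -1); packets still resolving are skipped because they will re-resolve against the merged segment on their own. The remapping step in isolation looks roughly like this sketch (DocMap, Update and the helper are illustrative types, not the Lucene ones):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch: rewrite per-segment doc values updates into the doc space of the
    // merged segment, dropping updates whose documents did not survive the merge.
    class DvUpdateRemapper {
        interface DocMap { int map(int oldDocId); }   // returns -1 for deleted docs

        static class Update {
            final int docId;
            final long value;
            Update(int docId, long value) { this.docId = docId; this.value = value; }
        }

        static List<Update> remap(List<Update> sourceUpdates, DocMap leafMap, DocMap segMap) {
            List<Update> mapped = new ArrayList<>();
            for (Update u : sourceUpdates) {
                // First map within the leaf (handles index sorting), then into the merged segment:
                int mappedDoc = segMap.map(leafMap.map(u.docId));
                if (mappedDoc != -1) {        // -1: the document was deleted while merging
                    mapped.add(new Update(mappedDoc, u.value));
                }
            }
            return mapped;
        }
    }
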
@@ -3731,7 +3836,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
 
     merge.info.setBufferedDeletesGen(minGen);
 
-    return holder.mergedDeletesAndUpdates;
+    return mergedDeletesAndUpdates;
   }
 
   synchronized private boolean commitMerge(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException {
@@ -3775,7 +3880,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     }
 
     final ReadersAndUpdates mergedUpdates = merge.info.info.maxDoc() == 0 ? null : commitMergedDeletesAndUpdates(merge, mergeState);
-//    System.out.println("[" + Thread.currentThread().getName() + "] IW.commitMerge: mergedDeletes=" + mergedDeletes);
 
     // If the doc store we are using has been closed and
     // is in now compound format (but wasn't when we
@@ -3922,9 +4026,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       try {
         try {
           mergeInit(merge);
-          //if (merge.info != null) {
-          //System.out.println("MERGE: " + merge.info.info.name);
-          //}
 
           if (infoStream.isEnabled("IW")) {
             infoStream.message("IW", "now merge\n  merge=" + segString(merge.segments) + "\n  index=" + segString());
@@ -4064,7 +4165,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
 
   /** Does initial setup for a merge, which is fast but holds
    *  the synchronized lock on IndexWriter instance.  */
-  final synchronized void mergeInit(MergePolicy.OneMerge merge) throws IOException {
+  final void mergeInit(MergePolicy.OneMerge merge) throws IOException {
+
+    // Make sure any deletes that must be resolved before we commit the merge are complete:
+    bufferedUpdatesStream.waitApplyForMerge(merge.segments);
+
     boolean success = false;
     try {
       _mergeInit(merge);
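
mergeInit now blocks in bufferedUpdatesStream.waitApplyForMerge(merge.segments) so that every packet the merge's source segments must see has finished resolving before the merge reads them. It is the scoped cousin of the waitApplyAll barrier sketched earlier; the cutoff rule below (wait only for delGens at or below a cutoff) is an assumption for illustration, since the real test lives in BufferedUpdatesStream and is not part of this hunk:

    import java.util.HashSet;
    import java.util.Set;

    // Sketch: wait only for the packets a merge depends on, identified here by a
    // delGen cutoff (an assumed rule, simplified from the real bookkeeping).
    class ScopedPacketWaiter {
        private final Set<Long> running = new HashSet<>();   // delGens still resolving

        synchronized void packetStarted(long delGen) {
            running.add(delGen);
        }

        synchronized void packetFinished(long delGen) {
            running.remove(delGen);
            notifyAll();
        }

        synchronized void waitApplyForMerge(long cutoffDelGen) throws InterruptedException {
            while (anyRunningUpTo(cutoffDelGen)) {
                wait();
            }
        }

        private boolean anyRunningUpTo(long cutoffDelGen) {
            for (long gen : running) {
                if (gen <= cutoffDelGen) {
                    return true;
                }
            }
            return false;
        }
    }
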
@@ -4110,29 +4215,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       infoStream.message("IW", "now apply deletes for " + merge.segments.size() + " merging segments");
     }
 
-    // Lock order: IW -> BD
-    final BufferedUpdatesStream.ApplyDeletesResult result = bufferedUpdatesStream.applyDeletesAndUpdates(readerPool, merge.segments);
+    // Must move the pending doc values updates to disk now, else the newly merged segment will not see them:
+    // TODO: we could fix merging to pull the merged DV iterator so we don't have to move these updates to disk first, i.e. just carry them
+    // in memory:
+    readerPool.writeDocValuesUpdates(merge.segments);
     
-    if (result.anyDeletes) {
-      checkpoint();
-    }
-
-    if (!keepFullyDeletedSegments && result.allDeleted != null) {
-      if (infoStream.isEnabled("IW")) {
-        infoStream.message("IW", "drop 100% deleted segments: " + result.allDeleted);
-      }
-      for(SegmentCommitInfo info : result.allDeleted) {
-        segmentInfos.remove(info);
-        pendingNumDocs.addAndGet(-info.info.maxDoc());
-        if (merge.segments.contains(info)) {
-          mergingSegments.remove(info);
-          merge.segments.remove(info);
-        }
-        readerPool.drop(info);
-      }
-      checkpoint();
-    }
-
     // Bind a new segment name here so even with
     // ConcurrentMergePolicy we keep deterministic segment
     // names.
@@ -4145,11 +4232,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     setDiagnostics(si, SOURCE_MERGE, details);
     merge.setMergeInfo(new SegmentCommitInfo(si, 0, -1L, -1L, -1L));
 
-//    System.out.println("[" + Thread.currentThread().getName() + "] IW._mergeInit: " + segString(merge.segments) + " into " + si);
-
-    // Lock order: IW -> BD
-    bufferedUpdatesStream.prune(segmentInfos);
-
     if (infoStream.isEnabled("IW")) {
       infoStream.message("IW", "merge seg=" + merge.info.info.name + " " + segString(merge.segments));
     }
@@ -4204,7 +4286,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     final int numSegments = merge.readers.size();
     Throwable th = null;
 
-    boolean drop = !suppressExceptions;
+    boolean drop = suppressExceptions == false;
     
     for (int i = 0; i < numSegments; i++) {
       final SegmentReader sr = merge.readers.get(i);
@@ -4278,59 +4360,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
         // commit merged deletes
         final ReadersAndUpdates rld = readerPool.get(info, true);
 
-        // Carefully pull the most recent live docs and reader
-        SegmentReader reader;
-        final Bits liveDocs;
-        final int delCount;
-
-        synchronized (this) {
-          // Must sync to ensure BufferedDeletesStream cannot change liveDocs,
-          // pendingDeleteCount and field updates while we pull a copy:
-          reader = rld.getReaderForMerge(context);
-          liveDocs = rld.getReadOnlyLiveDocs();
-          delCount = rld.getPendingDeleteCount() + info.getDelCount();
-
-          assert reader != null;
-          assert rld.verifyDocCounts();
-
-          if (infoStream.isEnabled("IW")) {
-            if (rld.getPendingDeleteCount() != 0) {
-              infoStream.message("IW", "seg=" + segString(info) + " delCount=" + info.getDelCount() + " pendingDelCount=" + rld.getPendingDeleteCount());
-            } else if (info.getDelCount() != 0) {
-              infoStream.message("IW", "seg=" + segString(info) + " delCount=" + info.getDelCount());
-            } else {
-              infoStream.message("IW", "seg=" + segString(info) + " no deletes");
-            }
-          }
-        }
-
-        // Deletes might have happened after we pulled the merge reader and
-        // before we got a read-only copy of the segment's actual live docs
-        // (taking pending deletes into account). In that case we need to
-        // make a new reader with updated live docs and del count.
-        if (reader.numDeletedDocs() != delCount) {
-          // fix the reader's live docs and del count
-          assert delCount > reader.numDeletedDocs(); // beware of zombies
-
-          SegmentReader newReader;
+        SegmentReader reader = rld.getReaderForMerge(context);
+        int delCount = reader.numDeletedDocs();
 
-          synchronized (this) {
-            // We must also sync on IW here, because another thread could be writing
-            // new DV updates / remove old gen field infos files causing FNFE:
-            newReader = new SegmentReader(info, reader, liveDocs, info.info.maxDoc() - delCount);
-          }
-
-          boolean released = false;
-          try {
-            rld.release(reader);
-            released = true;
-          } finally {
-            if (!released) {
-              newReader.decRef();
-            }
-          }
-
-          reader = newReader;
+        if (infoStream.isEnabled("IW")) {
+          infoStream.message("IW", "seg=" + segString(info) + " reader=" + reader);
         }
 
         merge.readers.add(reader);
@@ -4338,8 +4372,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
         segUpto++;
       }
 
-//      System.out.println("[" + Thread.currentThread().getName() + "] IW.mergeMiddle: merging " + merge.getMergeReaders());
-
       // Let the merge wrap readers
       List<CodecReader> mergeReaders = new ArrayList<>();
       for (SegmentReader reader : merge.readers) {
@@ -4411,7 +4443,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       // Very important to do this before opening the reader
       // because codec must know if prox was written for
       // this segment:
-      //System.out.println("merger set hasProx=" + merger.hasProx() + " seg=" + merge.info.name);
       boolean useCompoundFile;
       synchronized (this) { // Guard segmentInfos
         useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, merge.info, this);
@@ -4601,7 +4632,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     }
   }
 
-  private boolean keepFullyDeletedSegments;
+  boolean keepFullyDeletedSegments;
 
   /** Only for testing.
    *
@@ -4714,8 +4745,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
             infoStream.message("IW", "startCommit: wrote pending segments file \"" + IndexFileNames.fileNameFromGeneration(IndexFileNames.PENDING_SEGMENTS, "", toSync.getGeneration()) + "\"");
           }
 
-          //System.out.println("DONE prepareCommit");
-
           pendingCommitSet = true;
           pendingCommit = toSync;
         }
@@ -4863,9 +4892,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   }
 
   synchronized boolean nrtIsCurrent(SegmentInfos infos) {
-    //System.out.println("IW.nrtIsCurrent " + (infos.version == segmentInfos.version && !docWriter.anyChanges() && !bufferedDeletesStream.any()));
     ensureOpen();
-    boolean isCurrent = infos.getVersion() == segmentInfos.getVersion() && !docWriter.anyChanges() && !bufferedUpdatesStream.any();
+    boolean isCurrent = infos.getVersion() == segmentInfos.getVersion()
+      && docWriter.anyChanges() == false
+      && bufferedUpdatesStream.any() == false
+      && readerPool.anyChanges() == false;
     if (infoStream.isEnabled("IW")) {
       if (isCurrent == false) {
         infoStream.message("IW", "nrtIsCurrent: infoVersion matches: " + (infos.getVersion() == segmentInfos.getVersion()) + "; DW changes: " + docWriter.anyChanges() + "; BD changes: "+ bufferedUpdatesStream.any());
@@ -4977,9 +5008,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     try {
       purge(forcePurge);
     } finally {
-      if (applyAllDeletesAndUpdates()) {
-        maybeMerge(config.getMergePolicy(), MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
-      }
       flushCount.incrementAndGet();
     }
   }
@@ -5017,20 +5045,22 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     }
   }
   
-  private boolean processEvents(boolean triggerMerge, boolean forcePurge) throws IOException {
-    return processEvents(eventQueue, triggerMerge, forcePurge);
+  private void processEvents(boolean triggerMerge, boolean forcePurge) throws IOException {
+    processEvents(eventQueue, triggerMerge, forcePurge);
+    if (triggerMerge) {
+      maybeMerge(getConfig().getMergePolicy(), MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
+    }
   }
   
-  private boolean processEvents(Queue<Event> queue, boolean triggerMerge, boolean forcePurge) throws IOException {
+  private void processEvents(Queue<Event> queue, boolean triggerMerge, boolean forcePurge) throws IOException {
     boolean processed = false;
     if (tragedy == null) {
       Event event;
-      while((event = queue.poll()) != null)  {
+      while ((event = queue.poll()) != null)  {
         processed = true;
         event.process(this, triggerMerge, forcePurge);
       }
     }
-    return processed;
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
index 0fdbc3e..1377a95 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
@@ -91,8 +91,12 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
    */
   public final static double DEFAULT_RAM_BUFFER_SIZE_MB = 16.0;
 
-  /** Default setting for {@link #setReaderPooling}. */
-  public final static boolean DEFAULT_READER_POOLING = false;
+  /** Default setting (true) for {@link #setReaderPooling}. */
+  // We changed this default to true with concurrent deletes/updates (LUCENE-7868),
+  // because we will otherwise need to open and close segment readers more frequently.
+  // False is still supported, but will have worse performance since readers will
+  // be forced to aggressively move all state to disk.
+  public final static boolean DEFAULT_READER_POOLING = true;
 
   /** Default value is 1945. Change using {@link #setRAMPerThreadHardLimitMB(int)} */
   public static final int DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB = 1945;
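
Because the default flips to true in this change, applications that depended on the old non-pooled behavior must now opt out explicitly. A minimal sketch, assuming an Analyzer is supplied by the caller; the class name here is illustrative:

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.IndexWriterConfig;

class ReaderPoolingOptOut {
  // Sketch only: reader pooling now defaults to true; passing false restores
  // the old behavior at the cost of more frequent segment reader open/close.
  static IndexWriterConfig newConfig(Analyzer analyzer) {
    return new IndexWriterConfig(analyzer)
        .setReaderPooling(false);
  }
}
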
@@ -323,7 +327,6 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
    * Expert: Controls when segments are flushed to disk during indexing.
    * The {@link FlushPolicy} initialized during {@link IndexWriter} instantiation and once initialized
    * the given instance is bound to this {@link IndexWriter} and should not be used with another writer.
-   * @see #setMaxBufferedDeleteTerms(int)
    * @see #setMaxBufferedDocs(int)
    * @see #setRAMBufferSizeMB(double)
    */
@@ -375,11 +378,6 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
   }
   
   @Override
-  public int getMaxBufferedDeleteTerms() {
-    return super.getMaxBufferedDeleteTerms();
-  }
-  
-  @Override
   public int getMaxBufferedDocs() {
     return super.getMaxBufferedDocs();
   }
@@ -425,11 +423,6 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
   }
   
   @Override
-  public IndexWriterConfig setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
-    return (IndexWriterConfig) super.setMaxBufferedDeleteTerms(maxBufferedDeleteTerms);
-  }
-  
-  @Override
   public IndexWriterConfig setMaxBufferedDocs(int maxBufferedDocs) {
     return (IndexWriterConfig) super.setMaxBufferedDocs(maxBufferedDocs);
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
index d9e1bc7..cff1074 100644
--- a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
@@ -42,7 +42,6 @@ public class LiveIndexWriterConfig {
   
   private volatile int maxBufferedDocs;
   private volatile double ramBufferSizeMB;
-  private volatile int maxBufferedDeleteTerms;
   private volatile IndexReaderWarmer mergedSegmentWarmer;
 
   // modified by IndexWriterConfig
@@ -109,7 +108,6 @@ public class LiveIndexWriterConfig {
     this.analyzer = analyzer;
     ramBufferSizeMB = IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB;
     maxBufferedDocs = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DOCS;
-    maxBufferedDeleteTerms = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DELETE_TERMS;
     mergedSegmentWarmer = null;
     delPolicy = new KeepOnlyLastCommitDeletionPolicy();
     commit = null;
@@ -136,43 +134,6 @@ public class LiveIndexWriterConfig {
   }
 
   /**
-   * Determines the maximum number of delete-by-term operations that will be
-   * buffered before both the buffered in-memory delete terms and queries are
-   * applied and flushed.
-   * <p>
-   * Disabled by default (writer flushes by RAM usage).
-   * <p>
-   * NOTE: This setting won't trigger a segment flush.
-   * 
-   * <p>
-   * Takes effect immediately, but only the next time a document is added,
-   * updated or deleted. Also, if you only delete-by-query, this setting has no
-   * effect, i.e. delete queries are buffered until the next segment is flushed.
-   * 
-   * @throws IllegalArgumentException
-   *           if maxBufferedDeleteTerms is enabled but smaller than 1
-   * 
-   * @see #setRAMBufferSizeMB
-   */
-  public LiveIndexWriterConfig setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
-    if (maxBufferedDeleteTerms != IndexWriterConfig.DISABLE_AUTO_FLUSH && maxBufferedDeleteTerms < 1) {
-      throw new IllegalArgumentException("maxBufferedDeleteTerms must at least be 1 when enabled");
-    }
-    this.maxBufferedDeleteTerms = maxBufferedDeleteTerms;
-    return this;
-  }
-
-  /**
-   * Returns the number of buffered deleted terms that will trigger a flush of all
-   * buffered deletes if enabled.
-   *
-   * @see #setMaxBufferedDeleteTerms(int)
-   */
-  public int getMaxBufferedDeleteTerms() {
-    return maxBufferedDeleteTerms;
-  }
-  
-  /**
    * Determines the amount of RAM that may be used for buffering added documents
    * and deletions before they are flushed to the Directory. Generally for
    * faster indexing performance it's best to flush by RAM usage instead of
@@ -195,12 +156,8 @@ public class LiveIndexWriterConfig {
    * <b>NOTE</b>: the account of RAM usage for pending deletions is only
    * approximate. Specifically, if you delete by Query, Lucene currently has no
    * way to measure the RAM usage of individual Queries so the accounting will
-   * under-estimate and you should compensate by either calling commit()
-   * periodically yourself, or by using {@link #setMaxBufferedDeleteTerms(int)}
-   * to flush and apply buffered deletes by count instead of RAM usage (for each
-   * buffered delete Query a constant number of bytes is used to estimate RAM
-   * usage). Note that enabling {@link #setMaxBufferedDeleteTerms(int)} will not
-   * trigger any segment flushes.
+   * under-estimate and you should compensate by either calling commit() or refresh()
+   * periodically yourself.
    * <p>
    * <b>NOTE</b>: It's not guaranteed that all memory resident documents are
    * flushed once this limit is exceeded. Depending on the configured
@@ -476,7 +433,6 @@ public class LiveIndexWriterConfig {
     sb.append("analyzer=").append(analyzer == null ? "null" : analyzer.getClass().getName()).append("\n");
     sb.append("ramBufferSizeMB=").append(getRAMBufferSizeMB()).append("\n");
     sb.append("maxBufferedDocs=").append(getMaxBufferedDocs()).append("\n");
-    sb.append("maxBufferedDeleteTerms=").append(getMaxBufferedDeleteTerms()).append("\n");
     sb.append("mergedSegmentWarmer=").append(getMergedSegmentWarmer()).append("\n");
     sb.append("delPolicy=").append(getIndexDeletionPolicy().getClass().getName()).append("\n");
     IndexCommit commit = getIndexCommit();
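
The trimmed javadoc above now simply recommends periodic commit() or refresh() calls when deleting heavily by Query, since that RAM usage is only approximately accounted. A minimal sketch of that advice; the batch threshold and class name are illustrative, not part of this patch:

import java.io.IOException;

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.Query;

class BoundedDeleteByQuery {
  // Sketch only: RAM used by buffered delete-by-query is under-estimated, so
  // commit every N deletes to apply and free them; N is workload dependent.
  static void deleteAll(IndexWriter writer, Iterable<Query> deletes) throws IOException {
    int pending = 0;
    for (Query query : deletes) {
      writer.deleteDocuments(query);
      if (++pending % 10_000 == 0) {
        writer.commit();
      }
    }
    writer.commit();
  }
}
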

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/MergedPrefixCodedTermsIterator.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergedPrefixCodedTermsIterator.java b/lucene/core/src/java/org/apache/lucene/index/MergedPrefixCodedTermsIterator.java
deleted file mode 100644
index cd14eec..0000000
--- a/lucene/core/src/java/org/apache/lucene/index/MergedPrefixCodedTermsIterator.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.lucene.index;
-
-
-import java.util.List;
-
-import org.apache.lucene.index.PrefixCodedTerms.TermIterator;
-import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.PriorityQueue;
-
-/** Merges multiple {@link FieldTermIterator}s */
-class MergedPrefixCodedTermsIterator extends FieldTermIterator {
-
-  private static class TermMergeQueue extends PriorityQueue<TermIterator> {
-    TermMergeQueue(int size) {
-      super(size);
-    }
-    
-    @Override
-    protected boolean lessThan(TermIterator a, TermIterator b) {
-      int cmp = a.bytes.compareTo(b.bytes);
-      if (cmp < 0) {
-        return true;
-      } else if (cmp > 0) {
-        return false;
-      } else {
-        return a.delGen() > b.delGen();
-      }
-    }
-  }
-
-  private static class FieldMergeQueue extends PriorityQueue<TermIterator> {
-    FieldMergeQueue(int size) {
-      super(size);
-    }
-    
-    @Override
-    protected boolean lessThan(TermIterator a, TermIterator b) {
-      return a.field.compareTo(b.field) < 0;
-    }
-  }
-
-  final TermMergeQueue termQueue;
-  final FieldMergeQueue fieldQueue;
-
-  public MergedPrefixCodedTermsIterator(List<PrefixCodedTerms> termsList) {
-    fieldQueue = new FieldMergeQueue(termsList.size());
-    for (PrefixCodedTerms terms : termsList) {
-      TermIterator iter = terms.iterator();
-      iter.next();
-      if (iter.field != null) {
-        fieldQueue.add(iter);
-      }
-    }
-
-    termQueue = new TermMergeQueue(termsList.size());
-  }
-
-  String field;
-
-  @Override
-  public BytesRef next() {
-    if (termQueue.size() == 0) {
-      // No more terms in current field:
-      if (fieldQueue.size() == 0) {
-        // No more fields:
-        field = null;
-        return null;
-      }
-
-      // Transfer all iterators on the next field into the term queue:
-      TermIterator top = fieldQueue.pop();
-      termQueue.add(top);
-      field = top.field;
-      assert field != null;
-
-      while (fieldQueue.size() != 0 && fieldQueue.top().field.equals(top.field)) {
-        TermIterator iter = fieldQueue.pop();
-        assert iter.field.equals(field);
-        // TODO: a little bit evil; we do this so we can == on field down below:
-        iter.field = field;
-        termQueue.add(iter);
-      }
-
-      return termQueue.top().bytes;
-    } else {
-      TermIterator top = termQueue.top();
-      if (top.next() == null) {
-        termQueue.pop();
-      } else if (top.field() != field) {
-        // Field changed
-        termQueue.pop();
-        fieldQueue.add(top);
-      } else {
-        termQueue.updateTop();
-      }
-      if (termQueue.size() == 0) {
-      // Recurse (just once) to go to next field:
-        return next();
-      } else {
-        // Still terms left in this field
-        return termQueue.top().bytes;
-      }
-    }
-  }
-
-  @Override
-  public String field() {
-    return field;
-  }
-
-  @Override
-  public long delGen() {
-    return termQueue.top().delGen();
-  }
-}
-
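
For context, the class deleted above performed a k-way merge of sorted term streams with a priority queue, breaking ties on equal terms by preferring the higher delGen so the newest delete packet wins. A stand-alone illustration of that ordering only, using plain strings rather than Lucene's FieldTermIterator API:

import java.util.PriorityQueue;

class TermMergeOrderSketch {
  // Illustration only: entries sort by term, and for equal terms the higher
  // (newer) delGen comes first, mirroring TermMergeQueue.lessThan above.
  static final class Entry {
    final String term;
    final long delGen;
    Entry(String term, long delGen) { this.term = term; this.delGen = delGen; }
  }

  static PriorityQueue<Entry> newQueue() {
    return new PriorityQueue<>((a, b) -> {
      int cmp = a.term.compareTo(b.term);
      return cmp != 0 ? cmp : Long.compare(b.delGen, a.delGen);
    });
  }
}
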

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java b/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java
index 4dd3cd0..a42754d 100644
--- a/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java
@@ -19,6 +19,7 @@ package org.apache.lucene.index;
 import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.util.InPlaceMergeSorter;
+import org.apache.lucene.util.RamUsageEstimator;
 import org.apache.lucene.util.packed.PackedInts;
 import org.apache.lucene.util.packed.PagedGrowableWriter;
 import org.apache.lucene.util.packed.PagedMutable;
@@ -40,11 +41,13 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
     private long idx = 0; // long so we don't overflow if size == Integer.MAX_VALUE
     private int doc = -1;
     private Long value = null;
+    private final long delGen;
     
-    Iterator(int size, PagedGrowableWriter values, PagedMutable docs) {
+    Iterator(int size, PagedGrowableWriter values, PagedMutable docs, long delGen) {
       this.size = size;
       this.values = values;
       this.docs = docs;
+      this.delGen = delGen;
     }
     
     @Override
@@ -61,6 +64,7 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
       doc = (int) docs.get(idx);
       ++idx;
       while (idx < size && docs.get(idx) == doc) {
+        // scan forward to last update to this doc
         ++idx;
       }
       // idx points to the "next" element
@@ -72,12 +76,10 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
     int doc() {
       return doc;
     }
-    
+
     @Override
-    void reset() {
-      doc = -1;
-      value = null;
-      idx = 0;
+    long delGen() {
+      return delGen;
     }
   }
 
@@ -86,16 +88,26 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
   private PagedGrowableWriter values;
   private int size;
   
-  public NumericDocValuesFieldUpdates(String field, int maxDoc) {
-    super(field, DocValuesType.NUMERIC);
+  public NumericDocValuesFieldUpdates(long delGen, String field, int maxDoc) {
+    super(maxDoc, delGen, field, DocValuesType.NUMERIC);
     bitsPerValue = PackedInts.bitsRequired(maxDoc - 1);
     docs = new PagedMutable(1, PAGE_SIZE, bitsPerValue, PackedInts.COMPACT);
     values = new PagedGrowableWriter(1, PAGE_SIZE, 1, PackedInts.FAST);
-    size = 0;
   }
-  
+
+  @Override
+  public int size() {
+    return size;
+  }
+
   @Override
-  public void add(int doc, Object value) {
+  public synchronized void add(int doc, Object value) {
+    if (finished) {
+      throw new IllegalStateException("already finished");
+    }
+
+    assert doc < maxDoc;
+    
     // TODO: if the Sorter interface changes to take long indexes, we can remove that limitation
     if (size == Integer.MAX_VALUE) {
       throw new IllegalStateException("cannot support more than Integer.MAX_VALUE doc/value entries");
@@ -113,11 +125,20 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
     values.set(size, val.longValue());
     ++size;
   }
-  
+
   @Override
-  public Iterator iterator() {
-    final PagedMutable docs = this.docs;
-    final PagedGrowableWriter values = this.values;
+  public void finish() {
+    if (finished) {
+      throw new IllegalStateException("already finished");
+    }
+    finished = true;
+
+    // shrink wrap
+    if (size < docs.size()) {
+      docs = docs.resize(size);
+      values = values.resize(size);
+    }
+
     new InPlaceMergeSorter() {
       @Override
       protected void swap(int i, int j) {
@@ -129,48 +150,36 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
         values.set(j, values.get(i));
         values.set(i, tmpVal);
       }
-      
+
       @Override
       protected int compare(int i, int j) {
-        int x = (int) docs.get(i);
-        int y = (int) docs.get(j);
-        return (x < y) ? -1 : ((x == y) ? 0 : 1);
+        // increasing docID order:
+        // NOTE: we can have ties here, when the same docID was updated in the same segment, in which case we rely on sort being
+        // stable and preserving original order so the last update to that docID wins
+        return Integer.compare((int) docs.get(i), (int) docs.get(j));
       }
     }.sort(0, size);
-    
-    return new Iterator(size, values, docs);
   }
-  
+
   @Override
-  public void merge(DocValuesFieldUpdates other) {
-    assert other instanceof NumericDocValuesFieldUpdates;
-    NumericDocValuesFieldUpdates otherUpdates = (NumericDocValuesFieldUpdates) other;
-    if (otherUpdates.size > Integer.MAX_VALUE - size) {
-      throw new IllegalStateException(
-          "cannot support more than Integer.MAX_VALUE doc/value entries; size="
-              + size + " other.size=" + otherUpdates.size);
-    }
-    docs = docs.grow(size + otherUpdates.size);
-    values = values.grow(size + otherUpdates.size);
-    for (int i = 0; i < otherUpdates.size; i++) {
-      int doc = (int) otherUpdates.docs.get(i);
-      docs.set(size, doc);
-      values.set(size, otherUpdates.values.get(i));
-      ++size;
+  public Iterator iterator() {
+    if (finished == false) {
+      throw new IllegalStateException("call finish first");
     }
+    return new Iterator(size, values, docs, delGen);
   }
-
+  
   @Override
   public boolean any() {
     return size > 0;
   }
 
   @Override
-  public long ramBytesPerDoc() {
-    long bytesPerDoc = (long) Math.ceil((double) (bitsPerValue) / 8);
-    final int capacity = estimateCapacity(size);
-    bytesPerDoc += (long) Math.ceil((double) values.ramBytesUsed() / capacity); // values
-    return bytesPerDoc;
+  public long ramBytesUsed() {
+    return values.ramBytesUsed()
+      + docs.ramBytesUsed()
+      + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER
+      + 2 * RamUsageEstimator.NUM_BYTES_INT
+      + 2 * RamUsageEstimator.NUM_BYTES_OBJECT_REF;
   }
-  
 }
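
The comparator comment above relies on the sort being stable: updates to the same docID keep their arrival order, so the last update wins. A stand-alone illustration of that stability argument, using plain collections rather than the packed-ints structures used here:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class LastUpdateWinsSketch {
  static final class Update {
    final int docID;
    final long value;
    Update(int docID, long value) { this.docID = docID; this.value = value; }
  }

  // Illustration only: List.sort is stable (TimSort), so two updates to the
  // same docID keep their arrival order and the later one remains last.
  static List<Update> sortByDocID(List<Update> updatesInArrivalOrder) {
    List<Update> sorted = new ArrayList<>(updatesInArrivalOrder);
    sorted.sort(Comparator.comparingInt((Update u) -> u.docID));
    return sorted;
  }
}
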

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/PrefixCodedTerms.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/PrefixCodedTerms.java b/lucene/core/src/java/org/apache/lucene/index/PrefixCodedTerms.java
index df1653b..ba56c2a 100644
--- a/lucene/core/src/java/org/apache/lucene/index/PrefixCodedTerms.java
+++ b/lucene/core/src/java/org/apache/lucene/index/PrefixCodedTerms.java
@@ -129,7 +129,7 @@ public class PrefixCodedTerms implements Accountable {
 
     private TermIterator(long delGen, RAMFile buffer) {
       try {
-        input = new RAMInputStream("MergedPrefixCodedTermsIterator", buffer);
+        input = new RAMInputStream("PrefixCodedTermsIterator", buffer);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }