Posted to commits@lucene.apache.org by sh...@apache.org on 2017/06/25 02:06:33 UTC

[11/47] lucene-solr:feature/autoscaling: LUCENE-7868: use multiple threads to concurrently resolve deletes and DV updates

LUCENE-7868: use multiple threads to concurrently resolve deletes and DV updates


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/58105a20
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/58105a20
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/58105a20

Branch: refs/heads/feature/autoscaling
Commit: 58105a203a19d18a56e09cf69dc0083c1b890315
Parents: 1737fce
Author: Mike McCandless <mi...@apache.org>
Authored: Wed Jun 21 13:47:15 2017 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Wed Jun 21 13:47:15 2017 -0400

----------------------------------------------------------------------
 lucene/CHANGES.txt                              |   7 +
 .../index/BinaryDocValuesFieldUpdates.java      | 101 +--
 .../apache/lucene/index/BufferedUpdates.java    |  53 +-
 .../lucene/index/BufferedUpdatesStream.java     | 698 ++++-----------
 .../apache/lucene/index/CoalescedUpdates.java   | 109 ---
 .../lucene/index/DocValuesFieldUpdates.java     | 190 +++--
 .../apache/lucene/index/DocValuesUpdate.java    |   3 -
 .../apache/lucene/index/DocumentsWriter.java    |  66 +-
 .../index/DocumentsWriterDeleteQueue.java       |  29 +-
 .../index/DocumentsWriterFlushControl.java      |  21 +-
 .../lucene/index/DocumentsWriterFlushQueue.java |  33 +-
 .../lucene/index/DocumentsWriterPerThread.java  |  35 +-
 .../index/DocumentsWriterPerThreadPool.java     |  11 +-
 .../lucene/index/FlushByRamOrCountsPolicy.java  |  29 +-
 .../org/apache/lucene/index/FlushPolicy.java    |   3 -
 .../lucene/index/FreqProxTermsWriter.java       |   5 +-
 .../lucene/index/FrozenBufferedUpdates.java     | 841 +++++++++++++++++--
 .../apache/lucene/index/IndexFileDeleter.java   |   7 +-
 .../org/apache/lucene/index/IndexWriter.java    | 776 +++++++++--------
 .../apache/lucene/index/IndexWriterConfig.java  |  19 +-
 .../lucene/index/LiveIndexWriterConfig.java     |  48 +-
 .../index/MergedPrefixCodedTermsIterator.java   | 132 ---
 .../index/NumericDocValuesFieldUpdates.java     |  97 ++-
 .../apache/lucene/index/PrefixCodedTerms.java   |   2 +-
 .../apache/lucene/index/ReadersAndUpdates.java  | 413 ++++++---
 .../apache/lucene/index/SegmentCommitInfo.java  |  16 +-
 .../apache/lucene/index/SegmentCoreReaders.java |   2 -
 .../org/apache/lucene/index/SegmentInfo.java    |   2 +-
 .../org/apache/lucene/index/SegmentReader.java  |  19 +-
 .../lucene/index/SerialMergeScheduler.java      |   4 +-
 .../apache/lucene/index/TieredMergePolicy.java  |  73 +-
 .../util/packed/AbstractPagedMutable.java       |   2 +-
 .../index/TestBinaryDocValuesUpdates.java       | 146 +++-
 .../index/TestDocumentsWriterDeleteQueue.java   |  39 +-
 .../index/TestFlushByRamOrCountsPolicy.java     |  12 +-
 .../lucene/index/TestForceMergeForever.java     |   7 +-
 .../apache/lucene/index/TestIndexWriter.java    |  61 --
 .../lucene/index/TestIndexWriterConfig.java     |   7 +-
 .../lucene/index/TestIndexWriterDelete.java     | 153 ++--
 .../lucene/index/TestIndexWriterExceptions.java | 253 ------
 .../lucene/index/TestIndexWriterReader.java     |   1 +
 .../lucene/index/TestNRTReaderWithThreads.java  |   8 +-
 .../index/TestNumericDocValuesUpdates.java      | 314 +++++--
 .../lucene/index/TestPerSegmentDeletes.java     |   7 +-
 .../lucene/index/TestPrefixCodedTerms.java      |  76 --
 .../TestControlledRealTimeReopenThread.java     |   4 +-
 .../apache/lucene/search/join/TestJoinUtil.java |  16 +-
 .../idversion/IDVersionPostingsWriter.java      |   4 +-
 .../idversion/VersionBlockTreeTermsWriter.java  |   4 +-
 .../index/BaseDocValuesFormatTestCase.java      |  24 +
 .../index/BaseIndexFileFormatTestCase.java      |   2 +-
 .../org/apache/lucene/util/LuceneTestCase.java  |  11 -
 52 files changed, 2616 insertions(+), 2379 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/CHANGES.txt
----------------------------------------------------------------------
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index d695310..56da726 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -24,6 +24,10 @@ New Features
   the term frequency to this value.  (Uwe Schindler, Robert Muir, Mike
   McCandless)
 
+* LUCENE-7868: Multiple threads can now resolve deletes and doc values
+  updates concurrently, giving sizable speedups in update-heavy
+  indexing use cases (Simon Willnauer, Mike McCandless)
+
 API Changes
 
 * LUCENE-2605: Classic QueryParser no longer splits on whitespace by default.
@@ -76,6 +80,9 @@ API Changes
 
 * LUCENE-7872: TopDocs.totalHits is now a long. (Adrien Grand, hossman)
 
+* LUCENE-7868: IndexWriterConfig.setMaxBufferedDeleteTerms is
+  removed. (Simon Willnauer, Mike McCandless)
+
 Bug Fixes
 
 * LUCENE-7626: IndexWriter will no longer accept broken token offsets

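For context, the update-heavy workload this change targets looks roughly like the sketch below. The field names, document counts and thread count are illustrative only; the IndexWriter calls used (addDocument, updateNumericDocValue, commit) are the existing public API. Note also that with IndexWriterConfig.setMaxBufferedDeleteTerms removed, flushing of buffered deletes and updates is driven by RAM accounting (e.g. setRAMBufferSizeMB) rather than a delete-term count.

    // Illustrative sketch (not part of this commit): an update-heavy pattern of the
    // kind LUCENE-7868 speeds up.  Field names, counts and thread count are made up.
    import java.nio.file.Paths;
    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.document.NumericDocValuesField;
    import org.apache.lucene.document.StringField;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.Term;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;

    public class ConcurrentDVUpdatesExample {
      public static void main(String[] args) throws Exception {
        Directory dir = FSDirectory.open(Paths.get("/tmp/lucene-7868-demo"));
        IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
        try (IndexWriter writer = new IndexWriter(dir, iwc)) {
          // Index documents carrying an updatable doc-values field:
          for (int i = 0; i < 100_000; i++) {
            Document doc = new Document();
            doc.add(new StringField("id", "doc" + i, Field.Store.NO));
            doc.add(new NumericDocValuesField("views", 0L));
            writer.addDocument(doc);
          }
          writer.commit();

          // Many threads issue doc-values updates; with this change the frozen
          // update packets are resolved concurrently by the indexing threads
          // instead of being applied single-threaded under a global lock:
          Thread[] threads = new Thread[8];
          for (int t = 0; t < threads.length; t++) {
            final int threadID = t;
            threads[t] = new Thread(() -> {
              try {
                for (int i = threadID; i < 100_000; i += threads.length) {
                  writer.updateNumericDocValue(new Term("id", "doc" + i), "views", i);
                }
              } catch (Exception e) {
                throw new RuntimeException(e);
              }
            });
            threads[t].start();
          }
          for (Thread thread : threads) {
            thread.join();
          }
          writer.commit();
        }
      }
    }
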
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/BinaryDocValuesFieldUpdates.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/BinaryDocValuesFieldUpdates.java b/lucene/core/src/java/org/apache/lucene/index/BinaryDocValuesFieldUpdates.java
index f8cece9..e2700ea 100644
--- a/lucene/core/src/java/org/apache/lucene/index/BinaryDocValuesFieldUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/BinaryDocValuesFieldUpdates.java
@@ -22,6 +22,7 @@ import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefBuilder;
 import org.apache.lucene.util.InPlaceMergeSorter;
+import org.apache.lucene.util.RamUsageEstimator;
 import org.apache.lucene.util.packed.PackedInts;
 import org.apache.lucene.util.packed.PagedGrowableWriter;
 import org.apache.lucene.util.packed.PagedMutable;
@@ -35,22 +36,24 @@ import org.apache.lucene.util.packed.PagedMutable;
 class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
   
   final static class Iterator extends DocValuesFieldUpdates.Iterator {
-    private final PagedGrowableWriter offsets;
     private final int size;
+    private final PagedGrowableWriter offsets;
     private final PagedGrowableWriter lengths;
     private final PagedMutable docs;
     private long idx = 0; // long so we don't overflow if size == Integer.MAX_VALUE
     private int doc = -1;
     private final BytesRef value;
     private int offset, length;
+    private final long delGen;
     
     Iterator(int size, PagedGrowableWriter offsets, PagedGrowableWriter lengths, 
-        PagedMutable docs, BytesRef values) {
+             PagedMutable docs, BytesRef values, long delGen) {
       this.offsets = offsets;
       this.size = size;
       this.lengths = lengths;
       this.docs = docs;
       value = values.clone();
+      this.delGen = delGen;
     }
     
     @Override
@@ -69,6 +72,7 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
       doc = (int) docs.get(idx);
       ++idx;
       while (idx < size && docs.get(idx) == doc) {
+        // scan forward to last update to this doc
         ++idx;
       }
       // idx points to the "next" element
@@ -87,10 +91,8 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
     }
     
     @Override
-    void reset() {
-      doc = -1;
-      offset = -1;
-      idx = 0;
+    long delGen() {
+      return delGen;
     }
   }
 
@@ -100,18 +102,29 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
   private int size;
   private final int bitsPerValue;
   
-  public BinaryDocValuesFieldUpdates(String field, int maxDoc) {
-    super(field, DocValuesType.BINARY);
+  public BinaryDocValuesFieldUpdates(long delGen, String field, int maxDoc) {
+    super(maxDoc, delGen, field, DocValuesType.BINARY);
     bitsPerValue = PackedInts.bitsRequired(maxDoc - 1);
     docs = new PagedMutable(1, PAGE_SIZE, bitsPerValue, PackedInts.COMPACT);
     offsets = new PagedGrowableWriter(1, PAGE_SIZE, 1, PackedInts.FAST);
     lengths = new PagedGrowableWriter(1, PAGE_SIZE, 1, PackedInts.FAST);
     values = new BytesRefBuilder();
-    size = 0;
   }
-  
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  // NOTE: we fully consume the incoming BytesRef so caller is free to reuse it after we return:
   @Override
-  public void add(int doc, Object value) {
+  synchronized public void add(int doc, Object value) {
+    if (finished) {
+      throw new IllegalStateException("already finished");
+    }
+
+    assert doc < maxDoc: "doc=" + doc + " maxDoc=" + maxDoc;
+
     // TODO: if the Sorter interface changes to take long indexes, we can remove that limitation
     if (size == Integer.MAX_VALUE) {
       throw new IllegalStateException("cannot support more than Integer.MAX_VALUE doc/value entries");
@@ -134,11 +147,19 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
   }
 
   @Override
-  public Iterator iterator() {
-    final PagedMutable docs = this.docs;
-    final PagedGrowableWriter offsets = this.offsets;
-    final PagedGrowableWriter lengths = this.lengths;
-    final BytesRef values = this.values.get();
+  public void finish() {
+    if (finished) {
+      throw new IllegalStateException("already finished");
+    }
+    finished = true;
+
+    // shrink wrap
+    if (size < docs.size()) {
+      docs = docs.resize(size);
+      offsets = offsets.resize(size);
+      lengths = lengths.resize(size);
+    }
+
     new InPlaceMergeSorter() {
       @Override
       protected void swap(int i, int j) {
@@ -157,36 +178,20 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
       
       @Override
       protected int compare(int i, int j) {
-        int x = (int) docs.get(i);
-        int y = (int) docs.get(j);
-        return (x < y) ? -1 : ((x == y) ? 0 : 1);
+        // increasing docID order:
+        // NOTE: we can have ties here, when the same docID was updated in the same segment, in which case we rely on sort being
+        // stable and preserving original order so the last update to that docID wins
+        return Integer.compare((int) docs.get(i), (int) docs.get(j));
       }
     }.sort(0, size);
-    
-    return new Iterator(size, offsets, lengths, docs, values);
   }
 
   @Override
-  public void merge(DocValuesFieldUpdates other) {
-    BinaryDocValuesFieldUpdates otherUpdates = (BinaryDocValuesFieldUpdates) other;
-    if (otherUpdates.size > Integer.MAX_VALUE - size) {
-      throw new IllegalStateException(
-          "cannot support more than Integer.MAX_VALUE doc/value entries; size="
-              + size + " other.size=" + otherUpdates.size);
-    }
-    final int newSize = size  + otherUpdates.size;
-    docs = docs.grow(newSize);
-    offsets = offsets.grow(newSize);
-    lengths = lengths.grow(newSize);
-    for (int i = 0; i < otherUpdates.size; i++) {
-      int doc = (int) otherUpdates.docs.get(i);
-      docs.set(size, doc);
-      offsets.set(size, values.length() + otherUpdates.offsets.get(i)); // correct relative offset
-      lengths.set(size, otherUpdates.lengths.get(i));
-      ++size;
+  public Iterator iterator() {
+    if (finished == false) {
+      throw new IllegalStateException("call finish first");
     }
-
-    values.append(otherUpdates.values);
+    return new Iterator(size, offsets, lengths, docs, values.get(), delGen);
   }
 
   @Override
@@ -195,13 +200,13 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
   }
 
   @Override
-  public long ramBytesPerDoc() {
-    long bytesPerDoc = (long) Math.ceil((double) (bitsPerValue) / 8); // docs
-    final int capacity = estimateCapacity(size);
-    bytesPerDoc += (long) Math.ceil((double) offsets.ramBytesUsed() / capacity); // offsets
-    bytesPerDoc += (long) Math.ceil((double) lengths.ramBytesUsed() / capacity); // lengths
-    bytesPerDoc += (long) Math.ceil((double) values.length() / size); // values
-    return bytesPerDoc;
+  public long ramBytesUsed() {
+    return offsets.ramBytesUsed()
+      + lengths.ramBytesUsed()
+      + docs.ramBytesUsed()
+      + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER
+      + 4 * RamUsageEstimator.NUM_BYTES_INT
+      + 5 * RamUsageEstimator.NUM_BYTES_OBJECT_REF
+      + values.bytes().length;
   }
-
 }

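The new add()/finish()/iterator() lifecycle above relies on the sort in finish() being stable: when the same docID is updated more than once within a segment, the entries keep arrival order, so the last-added update wins, and the Iterator then scans forward past the earlier entries for that docID. A stand-alone sketch of that invariant, using a hypothetical Update class rather than Lucene's packed structures:

    // Stand-alone illustration (hypothetical types, not Lucene code) of why a stable
    // sort by docID makes "last update wins" fall out naturally.
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class LastUpdateWinsSketch {

      static final class Update {
        final int docID;
        final String value;
        Update(int docID, String value) { this.docID = docID; this.value = value; }
      }

      public static void main(String[] args) {
        List<Update> updates = new ArrayList<>();
        updates.add(new Update(7, "first"));
        updates.add(new Update(3, "only"));
        updates.add(new Update(7, "second"));   // later update to the same docID

        // List.sort is a stable merge sort, as is the InPlaceMergeSorter used above:
        updates.sort(Comparator.comparingInt((Update u) -> u.docID));

        for (int i = 0; i < updates.size(); i++) {
          // scan forward to the last update for this docID, as the Iterator above does:
          while (i + 1 < updates.size() && updates.get(i + 1).docID == updates.get(i).docID) {
            i++;
          }
          System.out.println("doc " + updates.get(i).docID + " -> " + updates.get(i).value);
        }
        // prints:
        // doc 3 -> only
        // doc 7 -> second
      }
    }
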
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
index 1c3494f..a5a86e6 100644
--- a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
@@ -30,11 +30,12 @@ import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.util.RamUsageEstimator;
 
-/* Holds buffered deletes and updates, by docID, term or query for a
- * single segment. This is used to hold buffered pending
- * deletes and updates against the to-be-flushed segment.  Once the
- * deletes and updates are pushed (on flush in DocumentsWriter), they
- * are converted to a FrozenBufferedUpdates instance. */
+/** Holds buffered deletes and updates, by docID, term or query for a
+ *  single segment. This is used to hold buffered pending
+ *  deletes and updates against the to-be-flushed segment.  Once the
+ *  deletes and updates are pushed (on flush in DocumentsWriter), they
+ *  are converted to a {@link FrozenBufferedUpdates} instance and
+ *  pushed to the {@link BufferedUpdatesStream}. */
 
 // NOTE: instances of this class are accessed either via a private
 // instance on DocumentWriterPerThread, or via sync'd code by
@@ -128,10 +129,9 @@ class BufferedUpdates {
   final AtomicInteger numNumericUpdates = new AtomicInteger();
   final AtomicInteger numBinaryUpdates = new AtomicInteger();
 
-  // TODO: rename thes three: put "deleted" prefix in front:
-  final Map<Term,Integer> terms = new HashMap<>();
-  final Map<Query,Integer> queries = new HashMap<>();
-  final List<Integer> docIDs = new ArrayList<>();
+  final Map<Term,Integer> deleteTerms = new HashMap<>();
+  final Map<Query,Integer> deleteQueries = new HashMap<>();
+  final List<Integer> deleteDocIDs = new ArrayList<>();
 
   // Map<dvField,Map<updateTerm,NumericUpdate>>
   // For each field we keep an ordered list of NumericUpdates, key'd by the
@@ -169,19 +169,19 @@ class BufferedUpdates {
   @Override
   public String toString() {
     if (VERBOSE_DELETES) {
-      return "gen=" + gen + " numTerms=" + numTermDeletes + ", terms=" + terms
-        + ", queries=" + queries + ", docIDs=" + docIDs + ", numericUpdates=" + numericUpdates
+      return "gen=" + gen + " numTerms=" + numTermDeletes + ", deleteTerms=" + deleteTerms
+        + ", deleteQueries=" + deleteQueries + ", deleteDocIDs=" + deleteDocIDs + ", numericUpdates=" + numericUpdates
         + ", binaryUpdates=" + binaryUpdates + ", bytesUsed=" + bytesUsed;
     } else {
       String s = "gen=" + gen;
       if (numTermDeletes.get() != 0) {
-        s += " " + numTermDeletes.get() + " deleted terms (unique count=" + terms.size() + ")";
+        s += " " + numTermDeletes.get() + " deleted terms (unique count=" + deleteTerms.size() + ")";
       }
-      if (queries.size() != 0) {
-        s += " " + queries.size() + " deleted queries";
+      if (deleteQueries.size() != 0) {
+        s += " " + deleteQueries.size() + " deleted queries";
       }
-      if (docIDs.size() != 0) {
-        s += " " + docIDs.size() + " deleted docIDs";
+      if (deleteDocIDs.size() != 0) {
+        s += " " + deleteDocIDs.size() + " deleted docIDs";
       }
       if (numNumericUpdates.get() != 0) {
         s += " " + numNumericUpdates.get() + " numeric updates (unique count=" + numericUpdates.size() + ")";
@@ -198,7 +198,7 @@ class BufferedUpdates {
   }
 
   public void addQuery(Query query, int docIDUpto) {
-    Integer current = queries.put(query, docIDUpto);
+    Integer current = deleteQueries.put(query, docIDUpto);
     // increment bytes used only if the query wasn't added so far.
     if (current == null) {
       bytesUsed.addAndGet(BYTES_PER_DEL_QUERY);
@@ -206,12 +206,12 @@ class BufferedUpdates {
   }
 
   public void addDocID(int docID) {
-    docIDs.add(Integer.valueOf(docID));
+    deleteDocIDs.add(Integer.valueOf(docID));
     bytesUsed.addAndGet(BYTES_PER_DEL_DOCID);
   }
 
   public void addTerm(Term term, int docIDUpto) {
-    Integer current = terms.get(term);
+    Integer current = deleteTerms.get(term);
     if (current != null && docIDUpto < current) {
       // Only record the new number if it's greater than the
       // current one.  This is important because if multiple
@@ -223,7 +223,7 @@ class BufferedUpdates {
       return;
     }
 
-    terms.put(term, Integer.valueOf(docIDUpto));
+    deleteTerms.put(term, Integer.valueOf(docIDUpto));
     // note that if current != null then it means there's already a buffered
     // delete on that term, therefore we seem to over-count. this over-counting
     // is done to respect IndexWriterConfig.setMaxBufferedDeleteTerms.
@@ -290,11 +290,16 @@ class BufferedUpdates {
       bytesUsed.addAndGet(BYTES_PER_BINARY_UPDATE_ENTRY + update.sizeInBytes());
     }
   }
+
+  void clearDeleteTerms() {
+    deleteTerms.clear();
+    numTermDeletes.set(0);
+  }
   
   void clear() {
-    terms.clear();
-    queries.clear();
-    docIDs.clear();
+    deleteTerms.clear();
+    deleteQueries.clear();
+    deleteDocIDs.clear();
     numericUpdates.clear();
     binaryUpdates.clear();
     numTermDeletes.set(0);
@@ -304,6 +309,6 @@ class BufferedUpdates {
   }
   
   boolean any() {
-    return terms.size() > 0 || docIDs.size() > 0 || queries.size() > 0 || numericUpdates.size() > 0 || binaryUpdates.size() > 0;
+    return deleteTerms.size() > 0 || deleteDocIDs.size() > 0 || deleteQueries.size() > 0 || numericUpdates.size() > 0 || binaryUpdates.size() > 0;
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java
index 9955626..8c0a2be 100644
--- a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java
+++ b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java
@@ -14,40 +14,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.lucene.index;
 
+package org.apache.lucene.index;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.lucene.search.DocIdSetIterator;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.Scorer;
-import org.apache.lucene.search.Weight;
+import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.util.Accountable;
-import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.InfoStream;
-import org.apache.lucene.util.PriorityQueue;
 
-/* Tracks the stream of {@link BufferedDeletes}.
+/** Tracks the stream of {@link FrozenBufferedUpdates}.
  * When DocumentsWriterPerThread flushes, its buffered
- * deletes and updates are appended to this stream.  We later
- * apply them (resolve them to the actual
- * docIDs, per segment) when a merge is started
- * (only to the to-be-merged segments).  We
- * also apply to all segments when NRT reader is pulled,
- * commit/close is called, or when too many deletes or  updates are
+ * deletes and updates are appended to this stream and immediately
+ * resolved (to actual docIDs, per segment) using the indexing
+ * thread that triggered the flush for concurrency.  When a
+ * merge kicks off, we sync to ensure all resolving packets
+ * complete.  We also apply to all segments when NRT reader is pulled,
+ * commit/close is called, or when too many deletes or updates are
  * buffered and must be flushed (by RAM usage or by count).
  *
  * Each packet is assigned a generation, and each flushed or
@@ -57,23 +50,24 @@ import org.apache.lucene.util.PriorityQueue;
 
 class BufferedUpdatesStream implements Accountable {
 
-  // TODO: maybe linked list?
-  private final List<FrozenBufferedUpdates> updates = new ArrayList<>();
+  private final Set<FrozenBufferedUpdates> updates = new HashSet<>();
 
   // Starts at 1 so that SegmentInfos that have never had
   // deletes applied (whose bufferedDelGen defaults to 0)
   // will be correct:
   private long nextGen = 1;
 
-  // used only by assert
-  private BytesRef lastDeleteTerm;
-
+  private final FinishedSegments finishedSegments;
   private final InfoStream infoStream;
   private final AtomicLong bytesUsed = new AtomicLong();
   private final AtomicInteger numTerms = new AtomicInteger();
+  private final IndexWriter writer;
+  private boolean closed;
 
-  public BufferedUpdatesStream(InfoStream infoStream) {
-    this.infoStream = infoStream;
+  public BufferedUpdatesStream(IndexWriter writer) {
+    this.writer = writer;
+    this.infoStream = writer.infoStream;
+    this.finishedSegments = new FinishedSegments(infoStream);
   }
 
   // Appends a new packet of buffered deletes to the stream,
@@ -89,21 +83,27 @@ class BufferedUpdatesStream implements Accountable {
     packet.setDelGen(nextGen++);
     assert packet.any();
     assert checkDeleteStats();
-    assert packet.delGen() < nextGen;
-    assert updates.isEmpty() || updates.get(updates.size()-1).delGen() < packet.delGen() : "Delete packets must be in order";
+
     updates.add(packet);
     numTerms.addAndGet(packet.numTermDeletes);
     bytesUsed.addAndGet(packet.bytesUsed);
     if (infoStream.isEnabled("BD")) {
-      infoStream.message("BD", "push deletes " + packet + " segmentPrivate?=" + packet.isSegmentPrivate + " delGen=" + packet.delGen() + " packetCount=" + updates.size() + " totBytesUsed=" + bytesUsed.get());
+      infoStream.message("BD", String.format(Locale.ROOT, "push new packet (%s), packetCount=%d, bytesUsed=%.3f MB", packet, updates.size(), bytesUsed.get()/1024./1024.));
     }
     assert checkDeleteStats();
+
     return packet.delGen();
   }
 
+  public synchronized int getPendingUpdatesCount() {
+    return updates.size();
+  }
+
+  /** Only used by IW.rollback */
   public synchronized void clear() {
     updates.clear();
     nextGen = 1;
+    finishedSegments.clear();
     numTerms.set(0);
     bytesUsed.set(0);
   }
@@ -121,253 +121,148 @@ class BufferedUpdatesStream implements Accountable {
     return bytesUsed.get();
   }
 
+  private synchronized void ensureOpen() {
+    if (closed) {
+      throw new AlreadyClosedException("already closed");
+    }
+  }
+
   public static class ApplyDeletesResult {
     
     // True if any actual deletes took place:
     public final boolean anyDeletes;
 
-    // Current gen, for the merged segment:
-    public final long gen;
-
     // If non-null, contains segments that are 100% deleted
     public final List<SegmentCommitInfo> allDeleted;
 
-    ApplyDeletesResult(boolean anyDeletes, long gen, List<SegmentCommitInfo> allDeleted) {
+    ApplyDeletesResult(boolean anyDeletes, List<SegmentCommitInfo> allDeleted) {
       this.anyDeletes = anyDeletes;
-      this.gen = gen;
       this.allDeleted = allDeleted;
     }
   }
 
-  // Sorts SegmentInfos from smallest to biggest bufferedDelGen:
-  private static final Comparator<SegmentCommitInfo> sortSegInfoByDelGen = new Comparator<SegmentCommitInfo>() {
-    @Override
-    public int compare(SegmentCommitInfo si1, SegmentCommitInfo si2) {
-      return Long.compare(si1.getBufferedDeletesGen(), si2.getBufferedDeletesGen());
-    }
-  };
-  
-  /** Resolves the buffered deleted Term/Query/docIDs, into
-   *  actual deleted docIDs in the liveDocs MutableBits for
-   *  each SegmentReader. */
-  public synchronized ApplyDeletesResult applyDeletesAndUpdates(IndexWriter.ReaderPool pool, List<SegmentCommitInfo> infos) throws IOException {
-    final long t0 = System.currentTimeMillis();
+  /** Waits for all in-flight packets, which are already being resolved concurrently
+   *  by indexing threads, to finish.  Returns true if there were any 
+   *  new deletes or updates.  This is called for refresh, commit. */
+  public void waitApplyAll() throws IOException {
 
-    final long gen = nextGen++;
+    assert Thread.holdsLock(writer) == false;
+    
+    final long t0 = System.nanoTime();
 
-    if (infos.size() == 0) {
-      return new ApplyDeletesResult(false, gen, null);
+    Set<FrozenBufferedUpdates> waitFor;
+    synchronized (this) {
+      waitFor = new HashSet<>(updates);
     }
 
-    // We only init these on demand, when we find our first deletes that need to be applied:
-    SegmentState[] segStates = null;
-
-    long totDelCount = 0;
-    long totTermVisitedCount = 0;
-
-    boolean success = false;
-
-    ApplyDeletesResult result = null;
-
-    try {
-      if (infoStream.isEnabled("BD")) {
-        infoStream.message("BD", String.format(Locale.ROOT, "applyDeletes: open segment readers took %d msec", System.currentTimeMillis()-t0));
-      }
-
-      assert checkDeleteStats();
-
-      if (!any()) {
-        if (infoStream.isEnabled("BD")) {
-          infoStream.message("BD", "applyDeletes: no segments; skipping");
-        }
-        return new ApplyDeletesResult(false, gen, null);
-      }
-
-      if (infoStream.isEnabled("BD")) {
-        infoStream.message("BD", "applyDeletes: infos=" + infos + " packetCount=" + updates.size());
-      }
-
-      infos = sortByDelGen(infos);
-
-      CoalescedUpdates coalescedUpdates = null;
-      int infosIDX = infos.size()-1;
-      int delIDX = updates.size()-1;
-
-      // Backwards merge sort the segment delGens with the packet delGens in the buffered stream:
-      while (infosIDX >= 0) {
-        final FrozenBufferedUpdates packet = delIDX >= 0 ? updates.get(delIDX) : null;
-        final SegmentCommitInfo info = infos.get(infosIDX);
-        final long segGen = info.getBufferedDeletesGen();
-
-        if (packet != null && segGen < packet.delGen()) {
-          if (!packet.isSegmentPrivate && packet.any()) {
-            /*
-             * Only coalesce if we are NOT on a segment private del packet: the segment private del packet
-             * must only apply to segments with the same delGen.  Yet, if a segment is already deleted
-             * from the SI since it had no more documents remaining after some del packets younger than
-             * its segPrivate packet (higher delGen) have been applied, the segPrivate packet has not been
-             * removed.
-             */
-            if (coalescedUpdates == null) {
-              coalescedUpdates = new CoalescedUpdates();
-            }
-            coalescedUpdates.update(packet);
-          }
-
-          delIDX--;
-        } else if (packet != null && segGen == packet.delGen()) {
-          assert packet.isSegmentPrivate : "Packet and Segments deletegen can only match on a segment private del packet gen=" + segGen;
+    waitApply(waitFor);
+  }
 
-          if (segStates == null) {
-            segStates = openSegmentStates(pool, infos);
-          }
+  /** Returns true if this delGen is still running. */
+  public boolean stillRunning(long delGen) {
+    return finishedSegments.stillRunning(delGen);
+  }
 
-          SegmentState segState = segStates[infosIDX];
+  public void finishedSegment(long delGen) {
+    finishedSegments.finishedSegment(delGen);
+  }
+  
+  /** Called by indexing threads once they are fully done resolving all deletes for the provided
+   *  delGen.  We track the completed delGens and record the maximum delGen for which all prior
+   *  delGens, inclusive, are completed, so that it's safe for doc values updates to apply and write. */
 
-          // Lock order: IW -> BD -> RP
-          assert pool.infoIsLive(info);
-          int delCount = 0;
-          final DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container();
+  public synchronized void finished(FrozenBufferedUpdates packet) {
+    // TODO: would be a bit more memory efficient to track this per-segment, so when each segment writes it writes all packets finished for
+    // it, rather than only recording here, across all segments.  But, more complex code, and more CPU, and maybe not so much impact in
+    // practice?
 
-          // first apply segment-private deletes/updates
-          delCount += applyQueryDeletes(packet.queriesIterable(), segState);
-          applyDocValuesUpdates(Arrays.asList(packet.numericDVUpdates), segState, dvUpdates);
-          applyDocValuesUpdates(Arrays.asList(packet.binaryDVUpdates), segState, dvUpdates);
+    packet.applied.countDown();
 
-          // ... then coalesced deletes/updates, so that if there is an update that appears in both, the coalesced updates (carried from
-          // updates ahead of the segment-privates ones) win:
-          if (coalescedUpdates != null) {
-            delCount += applyQueryDeletes(coalescedUpdates.queriesIterable(), segState);
-            applyDocValuesUpdatesList(coalescedUpdates.numericDVUpdates, segState, dvUpdates);
-            applyDocValuesUpdatesList(coalescedUpdates.binaryDVUpdates, segState, dvUpdates);
-          }
-          if (dvUpdates.any()) {
-            segState.rld.writeFieldUpdates(info.info.dir, dvUpdates);
-          }
+    updates.remove(packet);
+    numTerms.addAndGet(-packet.numTermDeletes);
+    bytesUsed.addAndGet(-packet.bytesUsed);
 
-          totDelCount += delCount;
+    finishedSegment(packet.delGen());
+  }
 
-          /*
-           * Since we are on a segment private del packet we must not
-           * update the coalescedUpdates here! We can simply advance to the 
-           * next packet and seginfo.
-           */
-          delIDX--;
-          infosIDX--;
+  /** All frozen packets up to and including this del gen are guaranteed to be finished. */
+  public long getCompletedDelGen() {
+    return finishedSegments.getCompletedDelGen();
+  }   
 
-        } else {
-          if (coalescedUpdates != null) {
-            if (segStates == null) {
-              segStates = openSegmentStates(pool, infos);
-            }
-            SegmentState segState = segStates[infosIDX];
-            // Lock order: IW -> BD -> RP
-            assert pool.infoIsLive(info);
-            int delCount = 0;
-            delCount += applyQueryDeletes(coalescedUpdates.queriesIterable(), segState);
-            DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container();
-            applyDocValuesUpdatesList(coalescedUpdates.numericDVUpdates, segState, dvUpdates);
-            applyDocValuesUpdatesList(coalescedUpdates.binaryDVUpdates, segState, dvUpdates);
-            if (dvUpdates.any()) {
-              segState.rld.writeFieldUpdates(info.info.dir, dvUpdates);
-            }
+  /** Waits only for those in-flight packets that apply to these merge segments.  This is
+   *  called when a merge needs to finish and must ensure all deletes to the merging
+   *  segments are resolved. */
+  public void waitApplyForMerge(List<SegmentCommitInfo> mergeInfos) throws IOException {
+    assert Thread.holdsLock(writer) == false;
 
-            totDelCount += delCount;
-          }
+    final long t0 = System.nanoTime();
 
-          infosIDX--;
-        }
-      }
+    long maxDelGen = Long.MIN_VALUE;
+    for (SegmentCommitInfo info : mergeInfos) {
+      maxDelGen = Math.max(maxDelGen, info.getBufferedDeletesGen());
+    }
 
-      // Now apply all term deletes:
-      if (coalescedUpdates != null && coalescedUpdates.totalTermCount != 0) {
-        if (segStates == null) {
-          segStates = openSegmentStates(pool, infos);
+    Set<FrozenBufferedUpdates> waitFor = new HashSet<>();
+    synchronized (this) {
+      for (FrozenBufferedUpdates packet : updates) {
+        if (packet.delGen() <= maxDelGen) {
+          // We must wait for this packet before finishing the merge because its
+          // deletes apply to a subset of the segments being merged:
+          waitFor.add(packet);
         }
-        totTermVisitedCount += applyTermDeletes(coalescedUpdates, segStates);
-      }
-
-      assert checkDeleteStats();
-
-      success = true;
-
-    } finally {
-      if (segStates != null) {
-        result = closeSegmentStates(pool, segStates, success, gen);
       }
     }
 
-    if (result == null) {
-      result = new ApplyDeletesResult(false, gen, null);      
-    }
-
     if (infoStream.isEnabled("BD")) {
-      infoStream.message("BD",
-                         String.format(Locale.ROOT,
-                                       "applyDeletes took %d msec for %d segments, %d newly deleted docs (query deletes), %d visited terms, allDeleted=%s",
-                                       System.currentTimeMillis()-t0, infos.size(), totDelCount, totTermVisitedCount, result.allDeleted));
+      infoStream.message("BD", "waitApplyForMerge: " + waitFor.size() + " packets, " + mergeInfos.size() + " merging segments");
     }
-
-    return result;
+    
+    waitApply(waitFor);
   }
 
-  private List<SegmentCommitInfo> sortByDelGen(List<SegmentCommitInfo> infos) {
-    infos = new ArrayList<>(infos);
-    // Smaller delGens come first:
-    Collections.sort(infos, sortSegInfoByDelGen);
-    return infos;
-  }
+  private void waitApply(Set<FrozenBufferedUpdates> waitFor) throws IOException {
 
-  synchronized long getNextGen() {
-    return nextGen++;
-  }
+    long startNS = System.nanoTime();
 
-  // Lock order IW -> BD
-  /* Removes any BufferedDeletes that we no longer need to
-   * store because all segments in the index have had the
-   * deletes applied. */
-  public synchronized void prune(SegmentInfos segmentInfos) {
-    assert checkDeleteStats();
-    long minGen = Long.MAX_VALUE;
-    for(SegmentCommitInfo info : segmentInfos) {
-      minGen = Math.min(info.getBufferedDeletesGen(), minGen);
+    int packetCount = waitFor.size();
+
+    if (waitFor.isEmpty()) {
+      if (infoStream.isEnabled("BD")) {
+        infoStream.message("BD", "waitApply: no deletes to apply");
+      }
+      return;
     }
 
     if (infoStream.isEnabled("BD")) {
-      infoStream.message("BD", "prune sis=" + segmentInfos + " minGen=" + minGen + " packetCount=" + updates.size());
+      infoStream.message("BD", "waitApply: " + waitFor.size() + " packets: " + waitFor);
     }
-    final int limit = updates.size();
-    for(int delIDX=0;delIDX<limit;delIDX++) {
-      if (updates.get(delIDX).delGen() >= minGen) {
-        prune(delIDX);
-        assert checkDeleteStats();
-        return;
-      }
+
+    long totalDelCount = 0;
+    for (FrozenBufferedUpdates packet : waitFor) {
+      // Frozen packets are now resolved, concurrently, by the indexing threads that
+      // create them, by adding a DocumentsWriter.ResolveUpdatesEvent to the events queue,
+      // but if we get here and the packet is not yet resolved, we resolve it now ourselves:
+      packet.apply(writer);
+      totalDelCount += packet.totalDelCount;
     }
 
-    // All deletes pruned
-    prune(limit);
-    assert !any();
-    assert checkDeleteStats();
+    if (infoStream.isEnabled("BD")) {
+      infoStream.message("BD",
+                         String.format(Locale.ROOT, "waitApply: done %d packets; totalDelCount=%d; totBytesUsed=%d; took %.2f msec",
+                                       packetCount,
+                                       totalDelCount,
+                                       bytesUsed.get(),
+                                       (System.nanoTime() - startNS) / 1000000.));
+    }
   }
 
-  private synchronized void prune(int count) {
-    if (count > 0) {
-      if (infoStream.isEnabled("BD")) {
-        infoStream.message("BD", "pruneDeletes: prune " + count + " packets; " + (updates.size() - count) + " packets remain");
-      }
-      for(int delIDX=0;delIDX<count;delIDX++) {
-        final FrozenBufferedUpdates packet = updates.get(delIDX);
-        numTerms.addAndGet(-packet.numTermDeletes);
-        assert numTerms.get() >= 0;
-        bytesUsed.addAndGet(-packet.bytesUsed);
-        assert bytesUsed.get() >= 0;
-      }
-      updates.subList(0, count).clear();
-    }
+  synchronized long getNextGen() {
+    return nextGen++;
   }
 
-  static class SegmentState {
+  /** Holds all per-segment internal state used while resolving deletions. */
+  public static final class SegmentState {
     final long delGen;
     final ReadersAndUpdates rld;
     final SegmentReader reader;
@@ -376,7 +271,6 @@ class BufferedUpdatesStream implements Accountable {
     TermsEnum termsEnum;
     PostingsEnum postingsEnum;
     BytesRef term;
-    boolean any;
 
     public SegmentState(IndexWriter.ReaderPool pool, SegmentCommitInfo info) throws IOException {
       rld = pool.get(info, true);
@@ -392,58 +286,54 @@ class BufferedUpdatesStream implements Accountable {
         pool.release(rld);
       }
     }
-  }
-
-  /** Does a merge sort by current term across all segments. */
-  static class SegmentQueue extends PriorityQueue<SegmentState> {
-    public SegmentQueue(int size) {
-      super(size);
-    }
 
     @Override
-    protected boolean lessThan(SegmentState a, SegmentState b) {
-      return a.term.compareTo(b.term) < 0;
+    public String toString() {
+      return "SegmentState(" + rld.info + ")";
     }
   }
 
   /** Opens SegmentReader and inits SegmentState for each segment. */
-  private SegmentState[] openSegmentStates(IndexWriter.ReaderPool pool, List<SegmentCommitInfo> infos) throws IOException {
-    int numReaders = infos.size();
-    SegmentState[] segStates = new SegmentState[numReaders];
+  public SegmentState[] openSegmentStates(IndexWriter.ReaderPool pool, List<SegmentCommitInfo> infos,
+                                          Set<SegmentCommitInfo> alreadySeenSegments, long delGen) throws IOException {
+    ensureOpen();
+
+    List<SegmentState> segStates = new ArrayList<>();
     boolean success = false;
     try {
-      for(int i=0;i<numReaders;i++) {
-        segStates[i] = new SegmentState(pool, infos.get(i));
+      for (SegmentCommitInfo info : infos) {
+        if (info.getBufferedDeletesGen() <= delGen && alreadySeenSegments.contains(info) == false) {
+          segStates.add(new SegmentState(pool, info));
+          alreadySeenSegments.add(info);
+        }
       }
       success = true;
     } finally {
       if (success == false) {
-        for(int j=0;j<numReaders;j++) {
-          if (segStates[j] != null) {
+        for(SegmentState segState : segStates) {
             try {
-              segStates[j].finish(pool);
+              segState.finish(pool);
             } catch (Throwable th) {
               // suppress so we keep throwing original exc
             }
-          }
         }
       }
     }
-
-    return segStates;
+    
+    return segStates.toArray(new SegmentState[0]);
   }
 
   /** Close segment states previously opened with openSegmentStates. */
-  private ApplyDeletesResult closeSegmentStates(IndexWriter.ReaderPool pool, SegmentState[] segStates, boolean success, long gen) throws IOException {
-    int numReaders = segStates.length;
+  public ApplyDeletesResult closeSegmentStates(IndexWriter.ReaderPool pool, SegmentState[] segStates, boolean success) throws IOException {
+    int count = segStates.length;
     Throwable firstExc = null;
     List<SegmentCommitInfo> allDeleted = null;
     long totDelCount = 0;
-    for (int j=0;j<numReaders;j++) {
+
+    for (int j=0;j<count;j++) {
       SegmentState segState = segStates[j];
       if (success) {
         totDelCount += segState.rld.getPendingDeleteCount() - segState.startDelCount;
-        segState.reader.getSegmentInfo().setBufferedDeletesGen(gen);
         int fullDelCount = segState.rld.info.getDelCount() + segState.rld.getPendingDeleteCount();
         assert fullDelCount <= segState.rld.info.info.maxDoc();
         if (fullDelCount == segState.rld.info.info.maxDoc()) {
@@ -469,288 +359,70 @@ class BufferedUpdatesStream implements Accountable {
     }
 
     if (infoStream.isEnabled("BD")) {
-      infoStream.message("BD", "applyDeletes: " + totDelCount + " new deleted documents");
+      infoStream.message("BD", "closeSegmentStates: " + totDelCount + " new deleted documents; pool " + updates.size() + " packets; bytesUsed=" + pool.ramBytesUsed());
     }
 
-    return new ApplyDeletesResult(totDelCount > 0, gen, allDeleted);      
+    return new ApplyDeletesResult(totDelCount > 0, allDeleted);      
   }
 
-  /** Merge sorts the deleted terms and all segments to resolve terms to docIDs for deletion. */
-  private synchronized long applyTermDeletes(CoalescedUpdates updates, SegmentState[] segStates) throws IOException {
-
-    long startNS = System.nanoTime();
-
-    int numReaders = segStates.length;
-
-    long delTermVisitedCount = 0;
-    long segTermVisitedCount = 0;
-
-    FieldTermIterator iter = updates.termIterator();
-
-    String field = null;
-    SegmentQueue queue = null;
-
-    BytesRef term;
-
-    while ((term = iter.next()) != null) {
-
-      if (iter.field() != field) {
-        // field changed
-        field = iter.field();
-
-        queue = new SegmentQueue(numReaders);
-
-        long segTermCount = 0;
-        for (SegmentState state : segStates) {
-          Terms terms = state.reader.terms(field);
-          if (terms != null) {
-            segTermCount += terms.size();
-            state.termsEnum = terms.iterator();
-            state.term = state.termsEnum.next();
-            if (state.term != null) {
-              queue.add(state);
-            }
-          }
-        }
-
-        assert checkDeleteTerm(null);
-      }
-
-      assert checkDeleteTerm(term);
-
-      delTermVisitedCount++;
-
-      long delGen = iter.delGen();
-
-      while (queue.size() != 0) {
-
-        // Get next term merged across all segments
-        SegmentState state = queue.top();
-        segTermVisitedCount++;
-
-        int cmp = term.compareTo(state.term);
-
-        if (cmp < 0) {
-          break;
-        } else if (cmp == 0) {
-          // fall through
-        } else {
-          TermsEnum.SeekStatus status = state.termsEnum.seekCeil(term);
-          if (status == TermsEnum.SeekStatus.FOUND) {
-            // fallthrough
-          } else {
-            if (status == TermsEnum.SeekStatus.NOT_FOUND) {
-              state.term = state.termsEnum.term();
-              queue.updateTop();
-            } else {
-              // No more terms in this segment
-              queue.pop();
-            }
-
-            continue;
-          }
-        }
-
-        assert state.delGen != delGen;
-
-        if (state.delGen < delGen) {
+  // only for assert
+  private boolean checkDeleteStats() {
+    int numTerms2 = 0;
+    long bytesUsed2 = 0;
+    for(FrozenBufferedUpdates packet : updates) {
+      numTerms2 += packet.numTermDeletes;
+      bytesUsed2 += packet.bytesUsed;
+    }
+    assert numTerms2 == numTerms.get(): "numTerms2=" + numTerms2 + " vs " + numTerms.get();
+    assert bytesUsed2 == bytesUsed.get(): "bytesUsed2=" + bytesUsed2 + " vs " + bytesUsed;
+    return true;
+  }
 
-          // we don't need term frequencies for this
-          final Bits acceptDocs = state.rld.getLiveDocs();
-          state.postingsEnum = state.termsEnum.postings(state.postingsEnum, PostingsEnum.NONE);
+  /** Tracks the contiguous range of packets that have finished resolving.  We need this because the packets
+   *  are concurrently resolved, and we can only write to disk the contiguous completed
+   *  packets. */
+  private static class FinishedSegments {
 
-          assert state.postingsEnum != null;
+    /** Largest del gen, inclusive, for which all prior packets have finished applying. */
+    private long completedDelGen;
 
-          while (true) {
-            final int docID = state.postingsEnum.nextDoc();
-            if (docID == DocIdSetIterator.NO_MORE_DOCS) {
-              break;
-            }
-            if (acceptDocs != null && acceptDocs.get(docID) == false) {
-              continue;
-            }
-            if (!state.any) {
-              state.rld.initWritableLiveDocs();
-              state.any = true;
-            }
+    /** This lets us track the "holes" in the current frontier of applying del
+     *  gens; once the holes are filled in we can advance completedDelGen. */
+    private final Set<Long> finishedDelGens = new HashSet<>();
 
-            // NOTE: there is no limit check on the docID
-            // when deleting by Term (unlike by Query)
-            // because on flush we apply all Term deletes to
-            // each segment.  So all Term deleting here is
-            // against prior segments:
-            state.rld.delete(docID);
-          }
-        }
+    private final InfoStream infoStream;
 
-        state.term = state.termsEnum.next();
-        if (state.term == null) {
-          queue.pop();
-        } else {
-          queue.updateTop();
-        }
-      }
+    public FinishedSegments(InfoStream infoStream) {
+      this.infoStream = infoStream;
     }
 
-    if (infoStream.isEnabled("BD")) {
-      infoStream.message("BD",
-                         String.format(Locale.ROOT, "applyTermDeletes took %.1f msec for %d segments and %d packets; %d del terms visited; %d seg terms visited",
-                                       (System.nanoTime()-startNS)/1000000.,
-                                       numReaders,
-                                       updates.terms.size(),
-                                       delTermVisitedCount, segTermVisitedCount));
+    public synchronized void clear() {
+      finishedDelGens.clear();
+      completedDelGen = 0;
     }
 
-    return delTermVisitedCount;
-  }
+    public synchronized boolean stillRunning(long delGen) {
+      return delGen > completedDelGen && finishedDelGens.contains(delGen) == false;
+    }
 
-  private synchronized void applyDocValuesUpdatesList(List<List<DocValuesUpdate>> updates, 
-      SegmentState segState, DocValuesFieldUpdates.Container dvUpdatesContainer) throws IOException {
-    // we walk backwards through the segments, appending deletion packets to the coalesced updates, so we must apply the packets in reverse
-    // so that newer packets override older ones:
-    for(int idx=updates.size()-1;idx>=0;idx--) {
-      applyDocValuesUpdates(updates.get(idx), segState, dvUpdatesContainer);
+    public synchronized long getCompletedDelGen() {
+      return completedDelGen;
     }
-  }
 
-  // DocValues updates
-  private synchronized void applyDocValuesUpdates(List<DocValuesUpdate> updates, 
-      SegmentState segState, DocValuesFieldUpdates.Container dvUpdatesContainer) throws IOException {
-
-    // TODO: we can process the updates per DV field, from last to first so that
-    // if multiple terms affect same document for the same field, we add an update
-    // only once (that of the last term). To do that, we can keep a bitset which
-    // marks which documents have already been updated. So e.g. if term T1
-    // updates doc 7, and then we process term T2 and it updates doc 7 as well,
-    // we don't apply the update since we know T1 came last and therefore wins
-    // the update.
-    // We can also use that bitset as 'liveDocs' to pass to TermEnum.docs(), so
-    // that these documents aren't even returned.
-    
-    String currentField = null;
-    TermsEnum termsEnum = null;
-    PostingsEnum postingsEnum = null;
-    
-    for (DocValuesUpdate update : updates) {
-      Term term = update.term;
-      int limit = update.docIDUpto;
-      
-      // TODO: we traverse the terms in update order (not term order) so that we
-      // apply the updates in the correct order, i.e. if two terms udpate the
-      // same document, the last one that came in wins, irrespective of the
-      // terms lexical order.
-      // we can apply the updates in terms order if we keep an updatesGen (and
-      // increment it with every update) and attach it to each NumericUpdate. Note
-      // that we cannot rely only on docIDUpto because an app may send two updates
-      // which will get same docIDUpto, yet will still need to respect the order
-      // those updates arrived.
-      
-      if (!term.field().equals(currentField)) {
-        // if we change the code to process updates in terms order, enable this assert
-//        assert currentField == null || currentField.compareTo(term.field()) < 0;
-        currentField = term.field();
-        Terms terms = segState.reader.terms(currentField);
-        if (terms != null) {
-          termsEnum = terms.iterator();
+    public synchronized void finishedSegment(long delGen) {
+      finishedDelGens.add(delGen);
+      while (true) {
+        if (finishedDelGens.contains(completedDelGen + 1)) {
+          finishedDelGens.remove(completedDelGen + 1);
+          completedDelGen++;
         } else {
-          termsEnum = null;
-        }
-      }
-
-      if (termsEnum == null) {
-        // no terms in this field
-        continue;
-      }
-
-      if (termsEnum.seekExact(term.bytes())) {
-        // we don't need term frequencies for this
-        final Bits acceptDocs = segState.rld.getLiveDocs();
-        postingsEnum = termsEnum.postings(postingsEnum, PostingsEnum.NONE);
-
-        DocValuesFieldUpdates dvUpdates = dvUpdatesContainer.getUpdates(update.field, update.type);
-        if (dvUpdates == null) {
-          dvUpdates = dvUpdatesContainer.newUpdates(update.field, update.type, segState.reader.maxDoc());
-        }
-        int doc;
-        while ((doc = postingsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
-          if (doc >= limit) {
-            break; // no more docs that can be updated for this term
-          }
-          if (acceptDocs != null && acceptDocs.get(doc) == false) {
-            continue;
-          }
-          dvUpdates.add(doc, update.value);
+          break;
         }
       }
-    }
-  }
-  
-  public static class QueryAndLimit {
-    public final Query query;
-    public final int limit;
-    public QueryAndLimit(Query query, int limit) {
-      this.query = query;
-      this.limit = limit;
-    }
-  }
 
-  // Delete by query
-  private static long applyQueryDeletes(Iterable<QueryAndLimit> queriesIter, SegmentState segState) throws IOException {
-    long delCount = 0;
-    final LeafReaderContext readerContext = segState.reader.getContext();
-    for (QueryAndLimit ent : queriesIter) {
-      Query query = ent.query;
-      int limit = ent.limit;
-      final IndexSearcher searcher = new IndexSearcher(readerContext.reader());
-      searcher.setQueryCache(null);
-      final Weight weight = searcher.createNormalizedWeight(query, false);
-      final Scorer scorer = weight.scorer(readerContext);
-      if (scorer != null) {
-        final DocIdSetIterator it = scorer.iterator();
-        final Bits liveDocs = readerContext.reader().getLiveDocs();
-        while (true)  {
-          int doc = it.nextDoc();
-          if (doc >= limit) {
-            break;
-          }
-          if (liveDocs != null && liveDocs.get(doc) == false) {
-            continue;
-          }
-
-          if (!segState.any) {
-            segState.rld.initWritableLiveDocs();
-            segState.any = true;
-          }
-          if (segState.rld.delete(doc)) {
-            delCount++;
-          }
-        }
+      if (infoStream.isEnabled("BD")) {
+        infoStream.message("BD", "finished packet delGen=" + delGen + " now completedDelGen=" + completedDelGen);
       }
     }
-
-    return delCount;
-  }
-
-  // used only by assert
-  private boolean checkDeleteTerm(BytesRef term) {
-    if (term != null) {
-      assert lastDeleteTerm == null || term.compareTo(lastDeleteTerm) >= 0: "lastTerm=" + lastDeleteTerm + " vs term=" + term;
-    }
-    // TODO: we re-use term now in our merged iterable, but we shouldn't clone, instead copy for this assert
-    lastDeleteTerm = term == null ? null : BytesRef.deepCopyOf(term);
-    return true;
-  }
-
-  // only for assert
-  private boolean checkDeleteStats() {
-    int numTerms2 = 0;
-    long bytesUsed2 = 0;
-    for(FrozenBufferedUpdates packet : updates) {
-      numTerms2 += packet.numTermDeletes;
-      bytesUsed2 += packet.bytesUsed;
-    }
-    assert numTerms2 == numTerms.get(): "numTerms2=" + numTerms2 + " vs " + numTerms.get();
-    assert bytesUsed2 == bytesUsed.get(): "bytesUsed2=" + bytesUsed2 + " vs " + bytesUsed;
-    return true;
   }
 }

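Because packets are now resolved concurrently by indexing threads, they can finish out of order; the FinishedSegments helper above tracks the largest delGen for which every earlier delGen has also finished, since only that contiguous prefix is safe to treat as fully applied (e.g. before writing doc-values updates). A simplified stand-alone sketch of the same bookkeeping (not Lucene code):

    // Simplified sketch of the FinishedSegments bookkeeping: delGens may finish out
    // of order, and the watermark only advances once the gap is filled, so callers
    // can trust everything <= getCompletedDelGen().
    import java.util.HashSet;
    import java.util.Set;

    public class CompletedGenWatermark {
      private long completedDelGen;                              // all gens <= this are done
      private final Set<Long> finishedDelGens = new HashSet<>(); // out-of-order finishes

      public synchronized void finished(long delGen) {
        finishedDelGens.add(delGen);
        // advance the watermark across any now-contiguous run of finished gens:
        while (finishedDelGens.remove(completedDelGen + 1)) {
          completedDelGen++;
        }
      }

      public synchronized long getCompletedDelGen() {
        return completedDelGen;
      }

      public static void main(String[] args) {
        CompletedGenWatermark w = new CompletedGenWatermark();
        w.finished(2);                                 // gen 1 still running
        System.out.println(w.getCompletedDelGen());    // 0
        w.finished(1);                                 // hole filled
        System.out.println(w.getCompletedDelGen());    // 2
      }
    }
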
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java b/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java
deleted file mode 100644
index bf92ac1..0000000
--- a/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.lucene.index;
-
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.lucene.index.BufferedUpdatesStream.QueryAndLimit;
-import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
-import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.util.BytesRef;
-
-class CoalescedUpdates {
-  final Map<Query,Integer> queries = new HashMap<>();
-  final List<PrefixCodedTerms> terms = new ArrayList<>();
-  final List<List<DocValuesUpdate>> numericDVUpdates = new ArrayList<>();
-  final List<List<DocValuesUpdate>> binaryDVUpdates = new ArrayList<>();
-  long totalTermCount;
-  
-  @Override
-  public String toString() {
-    // note: we could add/collect more debugging information
-    return "CoalescedUpdates(termSets=" + terms.size()
-      + ",totalTermCount=" + totalTermCount
-      + ",queries=" + queries.size() + ",numericDVUpdates=" + numericDVUpdates.size()
-      + ",binaryDVUpdates=" + binaryDVUpdates.size() + ")";
-  }
-
-  void update(FrozenBufferedUpdates in) {
-    totalTermCount += in.terms.size();
-    terms.add(in.terms);
-
-    for (int queryIdx = 0; queryIdx < in.queries.length; queryIdx++) {
-      final Query query = in.queries[queryIdx];
-      queries.put(query, BufferedUpdates.MAX_INT);
-    }
-
-    List<DocValuesUpdate> numericPacket = new ArrayList<>();
-    numericDVUpdates.add(numericPacket);
-    for (NumericDocValuesUpdate nu : in.numericDVUpdates) {
-      NumericDocValuesUpdate clone = new NumericDocValuesUpdate(nu.term, nu.field, (Long) nu.value);
-      clone.docIDUpto = Integer.MAX_VALUE;
-      numericPacket.add(clone);
-    }
-    
-    List<DocValuesUpdate> binaryPacket = new ArrayList<>();
-    binaryDVUpdates.add(binaryPacket);
-    for (BinaryDocValuesUpdate bu : in.binaryDVUpdates) {
-      BinaryDocValuesUpdate clone = new BinaryDocValuesUpdate(bu.term, bu.field, (BytesRef) bu.value);
-      clone.docIDUpto = Integer.MAX_VALUE;
-      binaryPacket.add(clone);
-    }
-  }
-
-  public FieldTermIterator termIterator() {
-    if (terms.size() == 1) {
-      return terms.get(0).iterator();
-    } else {
-      return new MergedPrefixCodedTermsIterator(terms);
-    }
-  }
-
-  public Iterable<QueryAndLimit> queriesIterable() {
-    return new Iterable<QueryAndLimit>() {
-      
-      @Override
-      public Iterator<QueryAndLimit> iterator() {
-        return new Iterator<QueryAndLimit>() {
-          private final Iterator<Map.Entry<Query,Integer>> iter = queries.entrySet().iterator();
-
-          @Override
-          public boolean hasNext() {
-            return iter.hasNext();
-          }
-
-          @Override
-          public QueryAndLimit next() {
-            final Map.Entry<Query,Integer> ent = iter.next();
-            return new QueryAndLimit(ent.getKey(), ent.getValue());
-          }
-
-          @Override
-          public void remove() {
-            throw new UnsupportedOperationException();
-          }
-        };
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/DocValuesFieldUpdates.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocValuesFieldUpdates.java b/lucene/core/src/java/org/apache/lucene/index/DocValuesFieldUpdates.java
index 528d4bf..a54bbe9 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocValuesFieldUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocValuesFieldUpdates.java
@@ -16,15 +16,13 @@
  */
 package org.apache.lucene.index;
 
-
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.lucene.search.DocIdSetIterator;
-import org.apache.lucene.util.packed.PagedGrowableWriter;
+import org.apache.lucene.util.PriorityQueue;
+
+import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
 
 /**
- * Holds updates of a single DocValues field, for a set of documents.
+ * Holds updates of a single DocValues field, for a set of documents within one segment.
  * 
  * @lucene.experimental
  */
@@ -54,100 +52,114 @@ abstract class DocValuesFieldUpdates {
      * {@code null} value means that it was unset for this document.
      */
     abstract Object value();
-    
-    /**
-     * Reset the iterator's state. Should be called before {@link #nextDoc()}
-     * and {@link #value()}.
-     */
-    abstract void reset();
-    
+
+    /** Returns delGen for this packet. */
+    abstract long delGen();
   }
 
-  static class Container {
-  
-    final Map<String,NumericDocValuesFieldUpdates> numericDVUpdates = new HashMap<>();
-    final Map<String,BinaryDocValuesFieldUpdates> binaryDVUpdates = new HashMap<>();
-    
-    boolean any() {
-      for (NumericDocValuesFieldUpdates updates : numericDVUpdates.values()) {
-        if (updates.any()) {
-          return true;
-        }
-      }
-      for (BinaryDocValuesFieldUpdates updates : binaryDVUpdates.values()) {
-        if (updates.any()) {
-          return true;
+  /** Merge-sorts multiple iterators, one per delGen, favoring the largest delGen that has updates for a given docID. */
+  public static Iterator mergedIterator(Iterator[] subs) {
+
+    if (subs.length == 1) {
+      return subs[0];
+    }
+
+    PriorityQueue<Iterator> queue = new PriorityQueue<Iterator>(subs.length) {
+        @Override
+        protected boolean lessThan(Iterator a, Iterator b) {
+          // sort by smaller docID
+          int cmp = Integer.compare(a.doc(), b.doc());
+          if (cmp == 0) {
+            // then by larger delGen
+            cmp = Long.compare(b.delGen(), a.delGen());
+
+            // delGens are unique across our subs:
+            assert cmp != 0;
+          }
+
+          return cmp < 0;
         }
+      };
+
+    for (Iterator sub : subs) {
+      if (sub.nextDoc() != NO_MORE_DOCS) {
+        queue.add(sub);
       }
-      return false;
     }
-    
-    int size() {
-      return numericDVUpdates.size() + binaryDVUpdates.size();
+
+    if (queue.size() == 0) {
+      return null;
     }
-    
-    long ramBytesPerDoc() {
-      long ramBytesPerDoc = 0;
-      for (NumericDocValuesFieldUpdates updates : numericDVUpdates.values()) {
-        ramBytesPerDoc += updates.ramBytesPerDoc();
+
+    return new Iterator() {
+      private int doc;
+
+      private boolean first = true;
+      
+      @Override
+      public int nextDoc() {
+        // TODO: can we do away with this first boolean?
+        if (first == false) {
+          // Advance all sub iterators past current doc
+          while (true) {
+            if (queue.size() == 0) {
+              doc = NO_MORE_DOCS;
+              break;
+            }
+            int newDoc = queue.top().doc();
+            if (newDoc != doc) {
+              assert newDoc > doc: "doc=" + doc + " newDoc=" + newDoc;
+              doc = newDoc;
+              break;
+            }
+            if (queue.top().nextDoc() == NO_MORE_DOCS) {
+              queue.pop();
+            } else {
+              queue.updateTop();
+            }
+          }
+        } else {
+          doc = queue.top().doc();
+          first = false;
+        }
+        return doc;
       }
-      for (BinaryDocValuesFieldUpdates updates : binaryDVUpdates.values()) {
-        ramBytesPerDoc += updates.ramBytesPerDoc();
+        
+      @Override
+      public int doc() {
+        return doc;
       }
-      return ramBytesPerDoc;
-    }
-    
-    DocValuesFieldUpdates getUpdates(String field, DocValuesType type) {
-      switch (type) {
-        case NUMERIC:
-          return numericDVUpdates.get(field);
-        case BINARY:
-          return binaryDVUpdates.get(field);
-        default:
-          throw new IllegalArgumentException("unsupported type: " + type);
+
+      @Override
+      public Object value() {
+        return queue.top().value();
       }
-    }
-    
-    DocValuesFieldUpdates newUpdates(String field, DocValuesType type, int maxDoc) {
-      switch (type) {
-        case NUMERIC:
-          assert numericDVUpdates.get(field) == null;
-          NumericDocValuesFieldUpdates numericUpdates = new NumericDocValuesFieldUpdates(field, maxDoc);
-          numericDVUpdates.put(field, numericUpdates);
-          return numericUpdates;
-        case BINARY:
-          assert binaryDVUpdates.get(field) == null;
-          BinaryDocValuesFieldUpdates binaryUpdates = new BinaryDocValuesFieldUpdates(field, maxDoc);
-          binaryDVUpdates.put(field, binaryUpdates);
-          return binaryUpdates;
-        default:
-          throw new IllegalArgumentException("unsupported type: " + type);
+
+      @Override
+      public long delGen() {
+        throw new UnsupportedOperationException();
       }
-    }
-    
-    @Override
-    public String toString() {
-      return "numericDVUpdates=" + numericDVUpdates + " binaryDVUpdates=" + binaryDVUpdates;
-    }
+    };
   }
-  
+
   final String field;
   final DocValuesType type;
-  
-  protected DocValuesFieldUpdates(String field, DocValuesType type) {
+  final long delGen;
+  protected boolean finished;
+  protected final int maxDoc;
+    
+  protected DocValuesFieldUpdates(int maxDoc, long delGen, String field, DocValuesType type) {
+    this.maxDoc = maxDoc;
+    this.delGen = delGen;
     this.field = field;
     if (type == null) {
       throw new NullPointerException("DocValuesType must not be null");
     }
     this.type = type;
   }
-  
-  /**
-   * Returns the estimated capacity of a {@link PagedGrowableWriter} given the
-   * actual number of stored elements.
-   */
-  protected static int estimateCapacity(int size) {
-    return (int) Math.ceil((double) size / PAGE_SIZE) * PAGE_SIZE;
+
+  public boolean getFinished() {
+    return finished;
   }
   
   /**
@@ -160,19 +172,17 @@ abstract class DocValuesFieldUpdates {
    * Returns an {@link Iterator} over the updated documents and their
    * values.
    */
+  // TODO: also use this for merging, instead of having to write through to disk first
   public abstract Iterator iterator();
-  
-  /**
-   * Merge with another {@link DocValuesFieldUpdates}. This is called for a
-   * segment which received updates while it was being merged. The given updates
-   * should override whatever updates are in that instance.
-   */
-  public abstract void merge(DocValuesFieldUpdates other);
 
+  /** Freezes internal data structures and sorts updates by docID for efficient iteration. */
+  public abstract void finish();
+  
   /** Returns true if this instance contains any updates. */
   public abstract boolean any();
   
-  /** Returns approximate RAM bytes used per document. */
-  public abstract long ramBytesPerDoc();
+  /** Returns approximate RAM bytes used. */
+  public abstract long ramBytesUsed();
 
+  public abstract int size();
 }

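The new DocValuesFieldUpdates.mergedIterator above merge-sorts one iterator per update packet, ordering by docID and breaking ties in favor of the larger (more recent) delGen. The following is a minimal, self-contained sketch of that ordering rule only, using java.util.PriorityQueue and a hypothetical Sub class instead of the Lucene iterator types; it illustrates the comparator logic, not the actual implementation.

import java.util.Comparator;
import java.util.PriorityQueue;

// Sketch of the merge order used by mergedIterator: pop entries by ascending docID;
// when two packets update the same docID, the larger delGen (newer packet) comes first,
// so older values for that doc are skipped.
public class MergedUpdateOrderSketch {

  // Hypothetical stand-in for one pending update coming from one packet.
  static final class Sub {
    final int doc;
    final long delGen;
    final long value;
    Sub(int doc, long delGen, long value) {
      this.doc = doc;
      this.delGen = delGen;
      this.value = value;
    }
  }

  public static void main(String[] args) {
    Comparator<Sub> byDoc = Comparator.comparingInt(s -> s.doc);                              // smaller docID first
    Comparator<Sub> byDelGenDesc = Comparator.comparingLong((Sub s) -> s.delGen).reversed();  // then larger delGen first
    PriorityQueue<Sub> queue = new PriorityQueue<>(byDoc.thenComparing(byDelGenDesc));

    queue.add(new Sub(3, 5L, 17));  // older packet (delGen=5) updates doc 3
    queue.add(new Sub(3, 9L, 42));  // newer packet (delGen=9) also updates doc 3
    queue.add(new Sub(1, 9L, 7));

    int lastDoc = -1;
    while (!queue.isEmpty()) {
      Sub top = queue.poll();
      if (top.doc == lastDoc) {
        continue;  // an older delGen for a doc we already emitted: ignore it
      }
      lastDoc = top.doc;
      System.out.println("doc=" + top.doc + " value=" + top.value + " (delGen=" + top.delGen + ")");
    }
    // Prints doc=1 value=7, then doc=3 value=42: the delGen=9 update wins over delGen=5.
  }
}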
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/DocValuesUpdate.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocValuesUpdate.java b/lucene/core/src/java/org/apache/lucene/index/DocValuesUpdate.java
index 1c85f33..a66f930 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocValuesUpdate.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocValuesUpdate.java
@@ -85,7 +85,6 @@ abstract class DocValuesUpdate {
     long valueSizeInBytes() {
       return RAW_VALUE_SIZE_IN_BYTES + ((BytesRef) value).bytes.length;
     }
-    
   }
 
   /** An in-place update to a numeric DocValues field */
@@ -99,7 +98,5 @@ abstract class DocValuesUpdate {
     long valueSizeInBytes() {
       return Long.BYTES;
     }
-    
   }
-
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index 2807517..27e28c0 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -107,7 +107,7 @@ final class DocumentsWriter implements Closeable, Accountable {
   private final AtomicInteger numDocsInRAM = new AtomicInteger(0);
 
   // TODO: cut over to BytesRefHash in BufferedDeletes
-  volatile DocumentsWriterDeleteQueue deleteQueue = new DocumentsWriterDeleteQueue();
+  volatile DocumentsWriterDeleteQueue deleteQueue;
   private final DocumentsWriterFlushQueue ticketQueue = new DocumentsWriterFlushQueue();
   /*
    * we preserve changes during a full flush since IW might not checkout before
@@ -129,6 +129,7 @@ final class DocumentsWriter implements Closeable, Accountable {
     this.directory = directory;
     this.config = config;
     this.infoStream = config.getInfoStream();
+    this.deleteQueue = new DocumentsWriterDeleteQueue(infoStream);
     this.perThreadPool = config.getIndexerThreadPool();
     flushPolicy = config.getFlushPolicy();
     this.writer = writer;
@@ -141,10 +142,10 @@ final class DocumentsWriter implements Closeable, Accountable {
     final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
     long seqNo = deleteQueue.addDelete(queries);
     flushControl.doOnDelete();
+    lastSeqNo = Math.max(lastSeqNo, seqNo);
     if (applyAllDeletes(deleteQueue)) {
       seqNo = -seqNo;
     }
-    lastSeqNo = Math.max(lastSeqNo, seqNo);
     return seqNo;
   }
 
@@ -160,10 +161,10 @@ final class DocumentsWriter implements Closeable, Accountable {
     final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
     long seqNo = deleteQueue.addDelete(terms);
     flushControl.doOnDelete();
+    lastSeqNo = Math.max(lastSeqNo, seqNo);
     if (applyAllDeletes(deleteQueue)) {
       seqNo = -seqNo;
     }
-    lastSeqNo = Math.max(lastSeqNo, seqNo);
     return seqNo;
   }
 
@@ -171,20 +172,21 @@ final class DocumentsWriter implements Closeable, Accountable {
     final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
     long seqNo = deleteQueue.addDocValuesUpdates(updates);
     flushControl.doOnDelete();
+    lastSeqNo = Math.max(lastSeqNo, seqNo);
     if (applyAllDeletes(deleteQueue)) {
       seqNo = -seqNo;
     }
-    lastSeqNo = Math.max(lastSeqNo, seqNo);
     return seqNo;
   }
   
   DocumentsWriterDeleteQueue currentDeleteSession() {
     return deleteQueue;
   }
-  
+
+  /** If buffered deletes are using too much heap, resolve them, write them to disk, and return true. */
   private boolean applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
     if (flushControl.getAndResetApplyAllDeletes()) {
-      if (deleteQueue != null && !flushControl.isFullFlush()) {
+      if (deleteQueue != null) {
         ticketQueue.addDeletes(deleteQueue);
       }
       putEvent(ApplyDeletesEvent.INSTANCE); // apply deletes event forces a purge
@@ -200,7 +202,6 @@ final class DocumentsWriter implements Closeable, Accountable {
       return ticketQueue.tryPurge(writer);
     }
   }
-  
 
   /** Returns how many docs are currently buffered in RAM. */
   int getNumDocs() {
@@ -246,11 +247,13 @@ final class DocumentsWriter implements Closeable, Accountable {
   }
 
   /** Returns how many documents were aborted. */
-  synchronized long lockAndAbortAll(IndexWriter indexWriter) {
+  synchronized long lockAndAbortAll(IndexWriter indexWriter) throws IOException {
     assert indexWriter.holdsFullFlushLock();
     if (infoStream.isEnabled("DW")) {
       infoStream.message("DW", "lockAndAbortAll");
     }
+    // Make sure we move all pending tickets into the flush queue:
+    ticketQueue.forcePurge(indexWriter);
     long abortedDocCount = 0;
     boolean success = false;
     try {
@@ -578,9 +581,7 @@ final class DocumentsWriter implements Closeable, Accountable {
      
       flushingDWPT = flushControl.nextPendingFlush();
     }
-    if (hasEvents) {
-      putEvent(MergePendingEvent.INSTANCE);
-    }
+
     // If deletes alone are consuming > 1/2 our RAM
     // buffer, force them all to apply now. This is to
     // prevent too-frequent flushing of a long tail of
@@ -589,9 +590,9 @@ final class DocumentsWriter implements Closeable, Accountable {
     if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
         flushControl.getDeleteBytesUsed() > (1024*1024*ramBufferSizeMB/2)) {
       hasEvents = true;
-      if (!this.applyAllDeletes(deleteQueue)) {
+      if (applyAllDeletes(deleteQueue) == false) {
         if (infoStream.isEnabled("DW")) {
-          infoStream.message("DW", String.format(Locale.ROOT, "force apply deletes bytesUsed=%.1f MB vs ramBuffer=%.1f MB",
+          infoStream.message("DW", String.format(Locale.ROOT, "force apply deletes after flush bytesUsed=%.1f MB vs ramBuffer=%.1f MB",
                                                  flushControl.getDeleteBytesUsed()/(1024.*1024.),
                                                  ramBufferSizeMB));
         }
@@ -654,7 +655,7 @@ final class DocumentsWriter implements Closeable, Accountable {
       }
       // If a concurrent flush is still in flight wait for it
       flushControl.waitForFlush();  
-      if (!anythingFlushed && flushingDeleteQueue.anyChanges()) { // apply deletes if we did not flush any document
+      if (anythingFlushed == false && flushingDeleteQueue.anyChanges()) { // apply deletes if we did not flush any document
         if (infoStream.isEnabled("DW")) {
           infoStream.message("DW", Thread.currentThread().getName() + ": flush naked frozen global deletes");
         }
@@ -695,7 +696,7 @@ final class DocumentsWriter implements Closeable, Accountable {
     return config;
   }
   
-  private void putEvent(Event event) {
+  void putEvent(Event event) {
     events.add(event);
   }
 
@@ -704,6 +705,25 @@ final class DocumentsWriter implements Closeable, Accountable {
     return flushControl.ramBytesUsed();
   }
 
+  static final class ResolveUpdatesEvent implements Event {
+
+    private final FrozenBufferedUpdates packet;
+    
+    ResolveUpdatesEvent(FrozenBufferedUpdates packet) {
+      this.packet = packet;
+    }
+
+    @Override
+    public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
+      try {
+        packet.apply(writer);
+      } catch (Throwable t) {
+        writer.tragicEvent(t, "applyUpdatesPacket");
+      }
+      writer.flushDeletesCount.incrementAndGet();
+    }
+  }
+
   static final class ApplyDeletesEvent implements Event {
     static final Event INSTANCE = new ApplyDeletesEvent();
     private int instCount = 0;
@@ -717,21 +737,7 @@ final class DocumentsWriter implements Closeable, Accountable {
       writer.applyDeletesAndPurge(true); // we always purge!
     }
   }
-  
-  static final class MergePendingEvent implements Event {
-    static final Event INSTANCE = new MergePendingEvent();
-    private int instCount = 0; 
-    private MergePendingEvent() {
-      assert instCount == 0;
-      instCount++;
-    }
-   
-    @Override
-    public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
-      writer.doAfterSegmentFlushed(triggerMerge, forcePurge);
-    }
-  }
-  
+
   static final class ForcedPurgeEvent implements Event {
     static final Event INSTANCE = new ForcedPurgeEvent();
     private int instCount = 0;

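Two pieces of the DocumentsWriter diff above lend themselves to a small illustration: applyAllDeletes resolves buffered deletes when they use too much heap, and the flush loop forces that resolution once buffered deletes alone exceed half of the configured RAM buffer. The sketch below shows only that threshold arithmetic; the parameters ramBufferSizeMB and deleteBytesUsed stand in for the values of IndexWriterConfig.getRAMBufferSizeMB() and flushControl.getDeleteBytesUsed(), and the DISABLE_AUTO_FLUSH constant here is an assumption mirroring the IndexWriterConfig sentinel.

final class DeleteRamThresholdSketch {

  // Assumption of this sketch: mirrors IndexWriterConfig.DISABLE_AUTO_FLUSH.
  static final double DISABLE_AUTO_FLUSH = -1;

  /** True if buffered deletes/updates alone consume more than half the RAM buffer. */
  static boolean shouldForceApplyDeletes(double ramBufferSizeMB, long deleteBytesUsed) {
    if (ramBufferSizeMB == DISABLE_AUTO_FLUSH) {
      return false;  // no RAM-based flushing configured
    }
    long halfBufferBytes = (long) (1024 * 1024 * ramBufferSizeMB / 2);
    return deleteBytesUsed > halfBufferBytes;
  }

  public static void main(String[] args) {
    // With a 16 MB buffer, 9 MB of buffered deletes trips the check (9 MB > 8 MB) ...
    System.out.println(shouldForceApplyDeletes(16.0, 9L * 1024 * 1024));  // true
    // ... while 4 MB does not.
    System.out.println(shouldForceApplyDeletes(16.0, 4L * 1024 * 1024));  // false
  }
}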
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
index db0e571..c4a0845 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
@@ -16,6 +16,7 @@
  */
 package org.apache.lucene.index;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
@@ -25,6 +26,7 @@ import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.util.Accountable;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.InfoStream;
 
 /**
  * {@link DocumentsWriterDeleteQueue} is a non-blocking linked pending deletes
@@ -85,19 +87,22 @@ final class DocumentsWriterDeleteQueue implements Accountable {
   /** Generates the sequence number that IW returns to callers changing the index, showing the effective serialization of all operations. */
   private final AtomicLong nextSeqNo;
 
+  private final InfoStream infoStream;
+
   // for asserts
   long maxSeqNo = Long.MAX_VALUE;
   
-  DocumentsWriterDeleteQueue() {
+  DocumentsWriterDeleteQueue(InfoStream infoStream) {
     // seqNo must start at 1 because some APIs negate this to also return a boolean
-    this(0, 1);
+    this(infoStream, 0, 1);
   }
   
-  DocumentsWriterDeleteQueue(long generation, long startSeqNo) {
-    this(new BufferedUpdates("global"), generation, startSeqNo);
+  DocumentsWriterDeleteQueue(InfoStream infoStream, long generation, long startSeqNo) {
+    this(infoStream, new BufferedUpdates("global"), generation, startSeqNo);
   }
 
-  DocumentsWriterDeleteQueue(BufferedUpdates globalBufferedUpdates, long generation, long startSeqNo) {
+  DocumentsWriterDeleteQueue(InfoStream infoStream, BufferedUpdates globalBufferedUpdates, long generation, long startSeqNo) {
+    this.infoStream = infoStream;
     this.globalBufferedUpdates = globalBufferedUpdates;
     this.generation = generation;
     this.nextSeqNo = new AtomicLong(startSeqNo);
@@ -189,7 +194,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
     }
   }
 
-  FrozenBufferedUpdates freezeGlobalBuffer(DeleteSlice callerSlice) {
+  FrozenBufferedUpdates freezeGlobalBuffer(DeleteSlice callerSlice) throws IOException {
     globalBufferLock.lock();
     /*
      * Here we freeze the global buffer so we need to lock it, apply all
@@ -209,9 +214,13 @@ final class DocumentsWriterDeleteQueue implements Accountable {
         globalSlice.apply(globalBufferedUpdates, BufferedUpdates.MAX_INT);
       }
 
-      final FrozenBufferedUpdates packet = new FrozenBufferedUpdates(globalBufferedUpdates, false);
-      globalBufferedUpdates.clear();
-      return packet;
+      if (globalBufferedUpdates.any()) {
+        final FrozenBufferedUpdates packet = new FrozenBufferedUpdates(infoStream, globalBufferedUpdates, null);
+        globalBufferedUpdates.clear();
+        return packet;
+      } else {
+        return null;
+      }
     } finally {
       globalBufferLock.unlock();
     }
@@ -426,7 +435,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
     globalBufferLock.lock();
     try {
       forceApplyGlobalSlice();
-      return globalBufferedUpdates.terms.size();
+      return globalBufferedUpdates.deleteTerms.size();
     } finally {
       globalBufferLock.unlock();
     }

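freezeGlobalBuffer above now returns null when the global buffer holds no changes, instead of building an empty FrozenBufferedUpdates packet. A minimal sketch of that lock, snapshot, clear pattern follows, using placeholder types (a List<String> in place of BufferedUpdates and of the frozen packet) rather than the real delete-queue classes.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of the freeze-under-lock pattern: take the lock, snapshot buffered changes into
// an immutable "frozen" packet, clear the live buffer, and return null when nothing was buffered.
final class FreezeBufferSketch {
  private final ReentrantLock globalBufferLock = new ReentrantLock();
  private final List<String> buffered = new ArrayList<>();  // placeholder for BufferedUpdates

  void add(String deleteTerm) {
    globalBufferLock.lock();
    try {
      buffered.add(deleteTerm);
    } finally {
      globalBufferLock.unlock();
    }
  }

  /** Returns a frozen snapshot of the buffered changes, or null if there is nothing to push. */
  List<String> freeze() {
    globalBufferLock.lock();
    try {
      if (buffered.isEmpty()) {
        return null;  // callers skip publishing an empty packet
      }
      List<String> frozen = Collections.unmodifiableList(new ArrayList<>(buffered));
      buffered.clear();
      return frozen;
    } finally {
      globalBufferLock.unlock();
    }
  }
}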
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index a5b4b7c..047fb9c 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -424,22 +424,16 @@ final class DocumentsWriterFlushControl implements Accountable {
     };
   }
 
-  
-
   synchronized void doOnDelete() {
     // pass null: this is a global delete, not an update
     flushPolicy.onDelete(this, null);
   }
 
-  /**
-   * Returns the number of delete terms in the global pool
-   */
-  public int getNumGlobalTermDeletes() {
-    return documentsWriter.deleteQueue.numGlobalTermDeletes() + bufferedUpdatesStream.numTerms();
-  }
-  
+  /** Returns heap bytes currently consumed by buffered deletes/updates that would be
+   *  freed if we pushed all deletes.  This does not include bytes consumed by
+   *  already pushed delete/update packets. */
   public long getDeleteBytesUsed() {
-    return documentsWriter.deleteQueue.ramBytesUsed() + bufferedUpdatesStream.ramBytesUsed();
+    return documentsWriter.deleteQueue.ramBytesUsed();
   }
 
   @Override
@@ -501,7 +495,7 @@ final class DocumentsWriterFlushControl implements Accountable {
       seqNo = documentsWriter.deleteQueue.getLastSequenceNumber() + perThreadPool.getActiveThreadStateCount() + 2;
       flushingQueue.maxSeqNo = seqNo+1;
 
-      DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1, seqNo+1);
+      DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(infoStream, flushingQueue.generation+1, seqNo+1);
 
       documentsWriter.deleteQueue = newQueue;
     }
@@ -648,8 +642,7 @@ final class DocumentsWriterFlushControl implements Accountable {
       }
       for (BlockedFlush blockedFlush : blockedFlushes) {
         try {
-          flushingWriters
-              .put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes));
+          flushingWriters.put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes));
           documentsWriter.subtractFlushedNumDocs(blockedFlush.dwpt.getNumDocsInRAM());
           blockedFlush.dwpt.abort();
         } catch (Throwable ex) {
@@ -720,6 +713,4 @@ final class DocumentsWriterFlushControl implements Accountable {
   public InfoStream getInfoStream() {
     return infoStream;
   }
-  
-  
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java
index 2c62487..df1b38c 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java
@@ -60,7 +60,7 @@ class DocumentsWriterFlushQueue {
     assert numTickets >= 0;
   }
 
-  synchronized SegmentFlushTicket addFlushTicket(DocumentsWriterPerThread dwpt) {
+  synchronized SegmentFlushTicket addFlushTicket(DocumentsWriterPerThread dwpt) throws IOException {
     // Each flush is assigned a ticket in the order they acquire the ticketQueue
     // lock
     incTickets();
@@ -168,11 +168,11 @@ class DocumentsWriterFlushQueue {
     protected boolean published = false;
 
     protected FlushTicket(FrozenBufferedUpdates frozenUpdates) {
-      assert frozenUpdates != null;
       this.frozenUpdates = frozenUpdates;
     }
 
     protected abstract void publish(IndexWriter writer) throws IOException;
+
     protected abstract boolean canPublish();
     
     /**
@@ -186,33 +186,31 @@ class DocumentsWriterFlushQueue {
       assert newSegment != null;
       assert newSegment.segmentInfo != null;
       final FrozenBufferedUpdates segmentUpdates = newSegment.segmentUpdates;
-      //System.out.println("FLUSH: " + newSegment.segmentInfo.info.name);
       if (indexWriter.infoStream.isEnabled("DW")) {
-          indexWriter.infoStream.message("DW", "publishFlushedSegment seg-private updates=" + segmentUpdates);  
+        indexWriter.infoStream.message("DW", "publishFlushedSegment seg-private updates=" + segmentUpdates);  
       }
       
       if (segmentUpdates != null && indexWriter.infoStream.isEnabled("DW")) {
-          indexWriter.infoStream.message("DW", "flush: push buffered seg private updates: " + segmentUpdates);
+        indexWriter.infoStream.message("DW", "flush: push buffered seg private updates: " + segmentUpdates);
       }
       // now publish!
-      indexWriter.publishFlushedSegment(newSegment.segmentInfo, segmentUpdates, globalPacket);
+      indexWriter.publishFlushedSegment(newSegment.segmentInfo, segmentUpdates, globalPacket, newSegment.sortMap);
     }
     
     protected final void finishFlush(IndexWriter indexWriter, FlushedSegment newSegment, FrozenBufferedUpdates bufferedUpdates)
             throws IOException {
-        // Finish the flushed segment and publish it to IndexWriter
-        if (newSegment == null) {
-          assert bufferedUpdates != null;
-          if (bufferedUpdates != null && bufferedUpdates.any()) {
-            indexWriter.publishFrozenUpdates(bufferedUpdates);
-            if (indexWriter.infoStream.isEnabled("DW")) {
-                indexWriter.infoStream.message("DW", "flush: push buffered updates: " + bufferedUpdates);
-            }
+      // Finish the flushed segment and publish it to IndexWriter
+      if (newSegment == null) {
+        if (bufferedUpdates != null && bufferedUpdates.any()) {
+          indexWriter.publishFrozenUpdates(bufferedUpdates);
+          if (indexWriter.infoStream.isEnabled("DW")) {
+            indexWriter.infoStream.message("DW", "flush: push buffered updates: " + bufferedUpdates);
           }
-        } else {
-            publishFlushedSegment(indexWriter, newSegment, bufferedUpdates);  
         }
+      } else {
+        publishFlushedSegment(indexWriter, newSegment, bufferedUpdates);  
       }
+    }
   }
   
   static final class GlobalDeletesTicket extends FlushTicket {
@@ -220,6 +218,7 @@ class DocumentsWriterFlushQueue {
     protected GlobalDeletesTicket(FrozenBufferedUpdates frozenUpdates) {
       super(frozenUpdates);
     }
+
     @Override
     protected void publish(IndexWriter writer) throws IOException {
       assert !published : "ticket was already published - cannot publish twice";
@@ -264,4 +263,4 @@ class DocumentsWriterFlushQueue {
       return segment != null || failed;
     }
   }
-}
\ No newline at end of file
+}
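The finishFlush block reindented above makes one decision: a flush that produced no segment but still carries frozen global deletes/updates publishes just that packet, while a flush that did produce a segment publishes the segment along with its segment-private updates. A compact sketch of that branching follows; Publisher is a hypothetical stand-in for the IndexWriter publish methods, and the Object parameters simplify the real signatures.

// Sketch only: not the DocumentsWriterFlushQueue API.
final class FinishFlushSketch {

  interface Publisher {
    void publishFrozenUpdates(Object frozenUpdates);
    void publishFlushedSegment(Object newSegment, Object segmentPrivateUpdates);
  }

  static void finishFlush(Publisher writer, Object newSegment, Object frozenUpdates) {
    if (newSegment == null) {
      // "Naked" flush: no new segment, but the frozen global deletes/updates still need publishing.
      if (frozenUpdates != null) {
        writer.publishFrozenUpdates(frozenUpdates);
      }
    } else {
      // Normal flush: the new segment is published and carries its updates with it.
      writer.publishFlushedSegment(newSegment, frozenUpdates);
    }
  }
}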