Posted to commits@lucene.apache.org by rm...@apache.org on 2013/10/21 20:58:44 UTC

svn commit: r1534320 [11/39] - in /lucene/dev/branches/lucene4956: ./ dev-tools/ dev-tools/idea/.idea/ dev-tools/idea/lucene/expressions/ dev-tools/idea/solr/contrib/velocity/ dev-tools/maven/ dev-tools/maven/lucene/ dev-tools/maven/lucene/expressions/...

Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/CheckIndex.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/CheckIndex.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/CheckIndex.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/CheckIndex.java Mon Oct 21 18:58:24 2013
@@ -410,7 +410,7 @@ public class CheckIndex {
     // note: we only read the format byte (required preamble) here!
     IndexInput input = null;
     try {
-      input = dir.openInput(segmentsFileName, IOContext.DEFAULT);
+      input = dir.openInput(segmentsFileName, IOContext.READONCE);
     } catch (Throwable t) {
       msg(infoStream, "ERROR: could not open segments file in directory");
       if (infoStream != null)
@@ -496,6 +496,11 @@ public class CheckIndex {
       msg(infoStream, "  " + (1+i) + " of " + numSegments + ": name=" + info.info.name + " docCount=" + info.info.getDocCount());
       segInfoStat.name = info.info.name;
       segInfoStat.docCount = info.info.getDocCount();
+      
+      final String version = info.info.getVersion();
+      if (info.info.getDocCount() <= 0 && version != null && versionComparator.compare(version, "4.5") >= 0) {
+        throw new RuntimeException("illegal number of documents: maxDoc=" + info.info.getDocCount());
+      }
 
       int toLoseDocCount = info.info.getDocCount();
 
@@ -517,11 +522,6 @@ public class CheckIndex {
           msg(infoStream, "    diagnostics = " + diagnostics);
         }
 
-        Map<String,String> atts = info.info.attributes();
-        if (atts != null && !atts.isEmpty()) {
-          msg(infoStream, "    attributes = " + atts);
-        }
-
         if (!info.hasDeletions()) {
           msg(infoStream, "    no deletions");
           segInfoStat.hasDeletions = false;
@@ -744,10 +744,40 @@ public class CheckIndex {
         continue;
       }
       
+      final boolean hasFreqs = terms.hasFreqs();
       final boolean hasPositions = terms.hasPositions();
+      final boolean hasPayloads = terms.hasPayloads();
       final boolean hasOffsets = terms.hasOffsets();
-      // term vectors cannot omit TF
-      final boolean hasFreqs = isVectors || fieldInfo.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS) >= 0;
+
+      // term vectors cannot omit TF:
+      final boolean expectedHasFreqs = (isVectors || fieldInfo.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS) >= 0);
+
+      if (hasFreqs != expectedHasFreqs) {
+        throw new RuntimeException("field \"" + field + "\" should have hasFreqs=" + expectedHasFreqs + " but got " + hasFreqs);
+      }
+
+      if (hasFreqs == false) {
+        if (terms.getSumTotalTermFreq() != -1) {
+          throw new RuntimeException("field \"" + field + "\" hasFreqs is false, but Terms.getSumTotalTermFreq()=" + terms.getSumTotalTermFreq() + " (should be -1)");
+        }
+      }
+
+      if (!isVectors) {
+        final boolean expectedHasPositions = fieldInfo.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) >= 0;
+        if (hasPositions != expectedHasPositions) {
+          throw new RuntimeException("field \"" + field + "\" should have hasPositions=" + expectedHasPositions + " but got " + hasPositions);
+        }
+
+        final boolean expectedHasPayloads = fieldInfo.hasPayloads();
+        if (hasPayloads != expectedHasPayloads) {
+          throw new RuntimeException("field \"" + field + "\" should have hasPayloads=" + expectedHasPayloads + " but got " + hasPayloads);
+        }
+
+        final boolean expectedHasOffsets = fieldInfo.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS) >= 0;
+        if (hasOffsets != expectedHasOffsets) {
+          throw new RuntimeException("field \"" + field + "\" should have hasOffsets=" + expectedHasOffsets + " but got " + hasOffsets);
+        }
+      }
 
       final TermsEnum termsEnum = terms.iterator(null);
       
@@ -756,8 +786,6 @@ public class CheckIndex {
       
       BytesRef lastTerm = null;
       
-      Comparator<BytesRef> termComp = terms.getComparator();
-      
       long sumTotalTermFreq = 0;
       long sumDocFreq = 0;
       FixedBitSet visitedDocs = new FixedBitSet(maxDoc);
@@ -775,7 +803,7 @@ public class CheckIndex {
         if (lastTerm == null) {
           lastTerm = BytesRef.deepCopyOf(term);
         } else {
-          if (termComp.compare(lastTerm, term) >= 0) {
+          if (lastTerm.compareTo(term) >= 0) {
             throw new RuntimeException("terms out of order: lastTerm=" + lastTerm + " term=" + term);
           }
           lastTerm.copyBytes(term);
@@ -789,6 +817,12 @@ public class CheckIndex {
         
         docs = termsEnum.docs(liveDocs, docs);
         postings = termsEnum.docsAndPositions(liveDocs, postings);
+
+        if (hasFreqs == false) {
+          if (termsEnum.totalTermFreq() != -1) {
+            throw new RuntimeException("field \"" + field + "\" hasFreqs is false, but TermsEnum.totalTermFreq()=" + termsEnum.totalTermFreq() + " (should be -1)");   
+          }
+        }
         
         if (hasOrd) {
           long ord = -1;
@@ -831,6 +865,13 @@ public class CheckIndex {
             }
             status.totPos += freq;
             totalTermFreq += freq;
+          } else {
+            // When a field didn't index freq, it must
+            // consistently "lie" and pretend that freq was
+            // 1:
+            if (docs2.freq() != 1) {
+              throw new RuntimeException("term " + term + ": doc " + doc + ": freq " + freq + " != 1 when Terms.hasFreqs() is false");
+            }
           }
           docCount++;
           
@@ -1275,7 +1316,8 @@ public class CheckIndex {
           if (reader.getBinaryDocValues(fieldInfo.name) != null ||
               reader.getNumericDocValues(fieldInfo.name) != null ||
               reader.getSortedDocValues(fieldInfo.name) != null || 
-              reader.getSortedSetDocValues(fieldInfo.name) != null) {
+              reader.getSortedSetDocValues(fieldInfo.name) != null || 
+              reader.getDocsWithField(fieldInfo.name) != null) {
             throw new RuntimeException("field: " + fieldInfo.name + " has docvalues but should omit them!");
           }
         }
@@ -1296,26 +1338,37 @@ public class CheckIndex {
     return status;
   }
   
-  private static void checkBinaryDocValues(String fieldName, AtomicReader reader, BinaryDocValues dv) {
+  private static void checkBinaryDocValues(String fieldName, AtomicReader reader, BinaryDocValues dv, Bits docsWithField) {
     BytesRef scratch = new BytesRef();
     for (int i = 0; i < reader.maxDoc(); i++) {
       dv.get(i, scratch);
       assert scratch.isValid();
+      if (docsWithField.get(i) == false && scratch.length > 0) {
+        throw new RuntimeException("dv for field: " + fieldName + " is missing but has value=" + scratch + " for doc: " + i);
+      }
     }
   }
   
-  private static void checkSortedDocValues(String fieldName, AtomicReader reader, SortedDocValues dv) {
-    checkBinaryDocValues(fieldName, reader, dv);
+  private static void checkSortedDocValues(String fieldName, AtomicReader reader, SortedDocValues dv, Bits docsWithField) {
+    checkBinaryDocValues(fieldName, reader, dv, docsWithField);
     final int maxOrd = dv.getValueCount()-1;
     FixedBitSet seenOrds = new FixedBitSet(dv.getValueCount());
     int maxOrd2 = -1;
     for (int i = 0; i < reader.maxDoc(); i++) {
       int ord = dv.getOrd(i);
-      if (ord < 0 || ord > maxOrd) {
+      if (ord == -1) {
+        if (docsWithField.get(i)) {
+          throw new RuntimeException("dv for field: " + fieldName + " has -1 ord but is not marked missing for doc: " + i);
+        }
+      } else if (ord < -1 || ord > maxOrd) {
         throw new RuntimeException("ord out of bounds: " + ord);
+      } else {
+        if (!docsWithField.get(i)) {
+          throw new RuntimeException("dv for field: " + fieldName + " is missing but has ord=" + ord + " for doc: " + i);
+        }
+        maxOrd2 = Math.max(maxOrd2, ord);
+        seenOrds.set(ord);
       }
-      maxOrd2 = Math.max(maxOrd2, ord);
-      seenOrds.set(ord);
     }
     if (maxOrd != maxOrd2) {
       throw new RuntimeException("dv for field: " + fieldName + " reports wrong maxOrd=" + maxOrd + " but this is not the case: " + maxOrd2);
@@ -1337,7 +1390,7 @@ public class CheckIndex {
     }
   }
   
-  private static void checkSortedSetDocValues(String fieldName, AtomicReader reader, SortedSetDocValues dv) {
+  private static void checkSortedSetDocValues(String fieldName, AtomicReader reader, SortedSetDocValues dv, Bits docsWithField) {
     final long maxOrd = dv.getValueCount()-1;
     OpenBitSet seenOrds = new OpenBitSet(dv.getValueCount());
     long maxOrd2 = -1;
@@ -1345,16 +1398,28 @@ public class CheckIndex {
       dv.setDocument(i);
       long lastOrd = -1;
       long ord;
-      while ((ord = dv.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
-        if (ord <= lastOrd) {
-          throw new RuntimeException("ords out of order: " + ord + " <= " + lastOrd + " for doc: " + i);
+      if (docsWithField.get(i)) {
+        int ordCount = 0;
+        while ((ord = dv.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
+          ordCount++;
+          if (ord <= lastOrd) {
+            throw new RuntimeException("ords out of order: " + ord + " <= " + lastOrd + " for doc: " + i);
+          }
+          if (ord < 0 || ord > maxOrd) {
+            throw new RuntimeException("ord out of bounds: " + ord);
+          }
+          lastOrd = ord;
+          maxOrd2 = Math.max(maxOrd2, ord);
+          seenOrds.set(ord);
         }
-        if (ord < 0 || ord > maxOrd) {
-          throw new RuntimeException("ord out of bounds: " + ord);
+        if (ordCount == 0) {
+          throw new RuntimeException("dv for field: " + fieldName + " has no ordinals but is not marked missing for doc: " + i);
+        }
+      } else {
+        long o = dv.nextOrd();
+        if (o != SortedSetDocValues.NO_MORE_ORDS) {
+          throw new RuntimeException("dv for field: " + fieldName + " is marked missing but has ord=" + o + " for doc: " + i);
         }
-        lastOrd = ord;
-        maxOrd2 = Math.max(maxOrd2, ord);
-        seenOrds.set(ord);
       }
     }
     if (maxOrd != maxOrd2) {
@@ -1378,17 +1443,26 @@ public class CheckIndex {
     }
   }
 
-  private static void checkNumericDocValues(String fieldName, AtomicReader reader, NumericDocValues ndv) {
+  private static void checkNumericDocValues(String fieldName, AtomicReader reader, NumericDocValues ndv, Bits docsWithField) {
     for (int i = 0; i < reader.maxDoc(); i++) {
-      ndv.get(i);
+      long value = ndv.get(i);
+      if (docsWithField.get(i) == false && value != 0) {
+        throw new RuntimeException("dv for field: " + fieldName + " is marked missing but has value=" + value + " for doc: " + i);
+      }
     }
   }
   
   private static void checkDocValues(FieldInfo fi, AtomicReader reader, PrintStream infoStream, DocValuesStatus status) throws Exception {
+    Bits docsWithField = reader.getDocsWithField(fi.name);
+    if (docsWithField == null) {
+      throw new RuntimeException(fi.name + " docsWithField does not exist");
+    } else if (docsWithField.length() != reader.maxDoc()) {
+      throw new RuntimeException(fi.name + " docsWithField has incorrect length: " + docsWithField.length() + ",expected: " + reader.maxDoc());
+    }
     switch(fi.getDocValuesType()) {
       case SORTED:
         status.totalSortedFields++;
-        checkSortedDocValues(fi.name, reader, reader.getSortedDocValues(fi.name));
+        checkSortedDocValues(fi.name, reader, reader.getSortedDocValues(fi.name), docsWithField);
         if (reader.getBinaryDocValues(fi.name) != null ||
             reader.getNumericDocValues(fi.name) != null ||
             reader.getSortedSetDocValues(fi.name) != null) {
@@ -1397,7 +1471,7 @@ public class CheckIndex {
         break;
       case SORTED_SET:
         status.totalSortedSetFields++;
-        checkSortedSetDocValues(fi.name, reader, reader.getSortedSetDocValues(fi.name));
+        checkSortedSetDocValues(fi.name, reader, reader.getSortedSetDocValues(fi.name), docsWithField);
         if (reader.getBinaryDocValues(fi.name) != null ||
             reader.getNumericDocValues(fi.name) != null ||
             reader.getSortedDocValues(fi.name) != null) {
@@ -1406,7 +1480,7 @@ public class CheckIndex {
         break;
       case BINARY:
         status.totalBinaryFields++;
-        checkBinaryDocValues(fi.name, reader, reader.getBinaryDocValues(fi.name));
+        checkBinaryDocValues(fi.name, reader, reader.getBinaryDocValues(fi.name), docsWithField);
         if (reader.getNumericDocValues(fi.name) != null ||
             reader.getSortedDocValues(fi.name) != null ||
             reader.getSortedSetDocValues(fi.name) != null) {
@@ -1415,7 +1489,7 @@ public class CheckIndex {
         break;
       case NUMERIC:
         status.totalNumericFields++;
-        checkNumericDocValues(fi.name, reader, reader.getNumericDocValues(fi.name));
+        checkNumericDocValues(fi.name, reader, reader.getNumericDocValues(fi.name), docsWithField);
         if (reader.getBinaryDocValues(fi.name) != null ||
             reader.getSortedDocValues(fi.name) != null ||
             reader.getSortedSetDocValues(fi.name) != null) {
@@ -1430,7 +1504,7 @@ public class CheckIndex {
   private static void checkNorms(FieldInfo fi, AtomicReader reader, PrintStream infoStream) throws IOException {
     switch(fi.getNormType()) {
       case NUMERIC:
-        checkNumericDocValues(fi.name, reader, reader.getNormValues(fi.name));
+        checkNumericDocValues(fi.name, reader, reader.getNormValues(fi.name), new Bits.MatchAllBits(reader.maxDoc()));
         break;
       default:
         throw new AssertionError("wtf: " + fi.getNormType());

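The CheckIndex hunks above do three things: segments written by 4.5 or later must report a positive maxDoc, the per-field postings flags (hasFreqs, hasPositions, hasPayloads, hasOffsets) are now cross-checked against what FieldInfo declares, and every doc-values check additionally validates the new getDocsWithField() bits. For readers who have not driven the checker directly, here is a rough usage sketch; CheckIndex and Directory are the classes touched above, but the constructor and Status field names outside this diff are recalled from the 4.x API and should be treated as assumptions.

    import java.io.File;
    import org.apache.lucene.index.CheckIndex;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;

    public class CheckIndexExample {
      public static void main(String[] args) throws Exception {
        try (Directory dir = FSDirectory.open(new File(args[0]))) {
          CheckIndex checker = new CheckIndex(dir);
          checker.setInfoStream(System.out);            // print the per-segment diagnostics that msg() writes
          CheckIndex.Status status = checker.checkIndex();
          if (!status.clean) {
            // every RuntimeException thrown by the checks above is reported here as a
            // corrupt segment; checker.fixIndex(status) would drop those segments
            System.out.println("index is corrupt; docs that would be lost: " + status.totLoseDocCount);
          }
        }
      }
    }
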
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/CoalescedDeletes.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/CoalescedDeletes.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/CoalescedDeletes.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/CoalescedDeletes.java Mon Oct 21 18:58:24 2013
@@ -25,24 +25,32 @@ import java.util.Map;
 
 import org.apache.lucene.search.Query;
 import org.apache.lucene.index.BufferedDeletesStream.QueryAndLimit;
+import org.apache.lucene.util.MergedIterator;
 
 class CoalescedDeletes {
   final Map<Query,Integer> queries = new HashMap<Query,Integer>();
   final List<Iterable<Term>> iterables = new ArrayList<Iterable<Term>>();
-
+  final List<NumericUpdate> numericDVUpdates = new ArrayList<NumericUpdate>();
+  
   @Override
   public String toString() {
     // note: we could add/collect more debugging information
-    return "CoalescedDeletes(termSets=" + iterables.size() + ",queries=" + queries.size() + ")";
+    return "CoalescedDeletes(termSets=" + iterables.size() + ",queries=" + queries.size() + ",numericUpdates=" + numericDVUpdates.size() + ")";
   }
 
   void update(FrozenBufferedDeletes in) {
     iterables.add(in.termsIterable());
 
-    for(int queryIdx=0;queryIdx<in.queries.length;queryIdx++) {
+    for (int queryIdx = 0; queryIdx < in.queries.length; queryIdx++) {
       final Query query = in.queries[queryIdx];
       queries.put(query, BufferedDeletes.MAX_INT);
     }
+    
+    for (NumericUpdate nu : in.updates) {
+      NumericUpdate clone = new NumericUpdate(nu.term, nu.field, nu.value);
+      clone.docIDUpto = Integer.MAX_VALUE;
+      numericDVUpdates.add(clone);
+    }
   }
 
  public Iterable<Term> termsIterable() {

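The CoalescedDeletes change carries buffered numeric doc-values updates alongside coalesced term and query deletes. Each NumericUpdate is cloned and its docIDUpto raised to Integer.MAX_VALUE, because a coalesced packet applies to whole already-flushed segments rather than only to documents indexed before the update was buffered. A minimal illustration of why that limit matters on the apply side; the method below is an assumption for illustration only, not code from this commit.

    // Hypothetical apply-side check: a segment-private update only touches documents
    // that existed when it was buffered (docID < docIDUpto); the coalesced clone uses
    // Integer.MAX_VALUE so it applies to every document matching update.term.
    void applyNumericUpdate(NumericUpdate update, int docID) {
      if (docID < update.docIDUpto) {
        // write update.value into the numeric doc-values field update.field for docID
      }
    }
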
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocFieldProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocFieldProcessor.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocFieldProcessor.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocFieldProcessor.java Mon Oct 21 18:58:24 2013
@@ -85,7 +85,7 @@ final class DocFieldProcessor extends Do
     // FreqProxTermsWriter does this with
     // FieldInfo.storePayload.
     FieldInfosWriter infosWriter = codec.fieldInfosFormat().getFieldInfosWriter();
-    infosWriter.write(state.directory, state.segmentInfo.name, state.fieldInfos, IOContext.DEFAULT);
+    infosWriter.write(state.directory, state.segmentInfo.name, "", state.fieldInfos, IOContext.DEFAULT);
   }
 
   @Override

Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocInverterPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocInverterPerField.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocInverterPerField.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocInverterPerField.java Mon Oct 21 18:58:24 2013
@@ -92,13 +92,9 @@ final class DocInverterPerField extends 
           fieldState.position += analyzed ? docState.analyzer.getPositionIncrementGap(fieldInfo.name) : 0;
         }
 
-        final TokenStream stream = field.tokenStream(docState.analyzer);
-        // reset the TokenStream to the first token
-        stream.reset();
-
-        boolean success2 = false;
-
-        try {
+        try (TokenStream stream = field.tokenStream(docState.analyzer)) {
+          // reset the TokenStream to the first token
+          stream.reset();
           boolean hasMoreTokens = stream.incrementToken();
 
           fieldState.attributeSource = stream;
@@ -175,15 +171,10 @@ final class DocInverterPerField extends 
           }
           // trigger streams to perform end-of-stream operations
           stream.end();
-
+          // TODO: maybe add some safety? then again, its already checked 
+          // when we come back around to the field...
+          fieldState.position += posIncrAttribute.getPositionIncrement();
           fieldState.offset += offsetAttribute.endOffset();
-          success2 = true;
-        } finally {
-          if (!success2) {
-            IOUtils.closeWhileHandlingException(stream);
-          } else {
-            stream.close();
-          }
         }
 
         fieldState.offset += analyzed ? docState.analyzer.getOffsetGap(fieldInfo.name) : 0;

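The DocInverterPerField hunks replace the manual success flag plus IOUtils.closeWhileHandlingException dance with try-with-resources, relying on TokenStream being Closeable, and now also advance fieldState.position by the final position increment reported at end(). The same consume contract applies outside the indexing chain; a small self-contained sketch, where StandardAnalyzer and CharTermAttribute are pulled in only for illustration (neither is referenced in this diff).

    import java.io.StringReader;
    import org.apache.lucene.analysis.Analyzer;
    import org.apache.lucene.analysis.TokenStream;
    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
    import org.apache.lucene.util.Version;

    public class TokenStreamExample {
      public static void main(String[] args) throws Exception {
        Analyzer analyzer = new StandardAnalyzer(Version.LUCENE_45);
        try (TokenStream stream = analyzer.tokenStream("body", new StringReader("some text to tokenize"))) {
          CharTermAttribute termAtt = stream.addAttribute(CharTermAttribute.class);
          stream.reset();                       // required before the first incrementToken()
          while (stream.incrementToken()) {
            System.out.println(termAtt.toString());
          }
          stream.end();                         // end-of-stream offsets/position increment, as consumed in the diff
        }                                       // close() runs automatically, even if an exception is thrown
      }
    }
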
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocTermOrds.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocTermOrds.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocTermOrds.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocTermOrds.java Mon Oct 21 18:58:24 2013
@@ -20,7 +20,6 @@ package org.apache.lucene.index;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Comparator;
 import java.util.List;
 
 import org.apache.lucene.codecs.PostingsFormat; // javadocs
@@ -611,11 +610,6 @@ public class DocTermOrds {
       termsEnum = reader.fields().terms(field).iterator(null);
     }
 
-    @Override
-    public Comparator<BytesRef> getComparator() {
-      return termsEnum.getComparator();
-    }
-
     @Override    
     public DocsEnum docs(Bits liveDocs, DocsEnum reuse, int flags) throws IOException {
       return termsEnum.docs(liveDocs, reuse, flags);

Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocValuesProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocValuesProcessor.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocValuesProcessor.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocValuesProcessor.java Mon Oct 21 18:58:24 2013
@@ -143,7 +143,7 @@ final class DocValuesProcessor extends S
     DocValuesWriter writer = writers.get(fieldInfo.name);
     NumericDocValuesWriter numericWriter;
     if (writer == null) {
-      numericWriter = new NumericDocValuesWriter(fieldInfo, bytesUsed);
+      numericWriter = new NumericDocValuesWriter(fieldInfo, bytesUsed, true);
       writers.put(fieldInfo.name, numericWriter);
     } else if (!(writer instanceof NumericDocValuesWriter)) {
       throw new IllegalArgumentException("Incompatible DocValues type: field \"" + fieldInfo.name + "\" changed from " + getTypeDesc(writer) + " to numeric");

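DocValuesProcessor now passes a third argument (true) when it creates a NumericDocValuesWriter, presumably telling the writer to track which documents actually received a value; that per-field docsWithField information is what the CheckIndex changes above validate via getDocsWithField(). A loose sketch of what such tracking could look like; the class and field names below are hypothetical, not the real NumericDocValuesWriter internals.

    import org.apache.lucene.util.FixedBitSet;

    // Hypothetical sketch: remember which documents got a value so readers can later
    // distinguish "missing" from "present with value 0".
    class DocsWithFieldTracker {
      private final FixedBitSet docsWithField;

      DocsWithFieldTracker(int maxDoc) {
        docsWithField = new FixedBitSet(maxDoc);
      }

      void addValue(int docID, long value) {
        docsWithField.set(docID);    // eventually surfaced through AtomicReader.getDocsWithField()
        // ... buffer the value itself ...
      }

      boolean hasValue(int docID) {
        return docsWithField.get(docID);
      }
    }
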
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java Mon Oct 21 18:58:24 2013
@@ -19,18 +19,18 @@ package org.apache.lucene.index;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.List;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.index.DocumentsWriterFlushQueue.SegmentFlushTicket;
 import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
-import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain;
 import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
-import org.apache.lucene.index.FieldInfos.FieldNumbers;
+import org.apache.lucene.index.IndexWriter.Event;
 import org.apache.lucene.search.Query;
-import org.apache.lucene.search.similarities.Similarity;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.InfoStream;
@@ -100,19 +100,15 @@ import org.apache.lucene.util.InfoStream
  */
 
 final class DocumentsWriter {
-  Directory directory;
+  private final Directory directory;
 
   private volatile boolean closed;
 
-  final InfoStream infoStream;
-  Similarity similarity;
+  private final InfoStream infoStream;
 
-  List<String> newFiles;
+  private final LiveIndexWriterConfig config;
 
-  final IndexWriter indexWriter;
-  final LiveIndexWriterConfig indexWriterConfig;
-
-  private AtomicInteger numDocsInRAM = new AtomicInteger(0);
+  private final AtomicInteger numDocsInRAM = new AtomicInteger(0);
 
   // TODO: cut over to BytesRefHash in BufferedDeletes
   volatile DocumentsWriterDeleteQueue deleteQueue = new DocumentsWriterDeleteQueue();
@@ -125,73 +121,79 @@ final class DocumentsWriter {
    */
   private volatile boolean pendingChangesInCurrentFullFlush;
 
-  private Collection<String> abortedFiles;               // List of files that were written before last abort()
-
-  final IndexingChain chain;
-
   final DocumentsWriterPerThreadPool perThreadPool;
   final FlushPolicy flushPolicy;
   final DocumentsWriterFlushControl flushControl;
+  private final IndexWriter writer;
+  private final Queue<Event> events;
+
   
-  final Codec codec;
-  DocumentsWriter(Codec codec, LiveIndexWriterConfig config, Directory directory, IndexWriter writer, FieldNumbers globalFieldNumbers,
-      BufferedDeletesStream bufferedDeletesStream) {
-    this.codec = codec;
+  DocumentsWriter(IndexWriter writer, LiveIndexWriterConfig config, Directory directory) {
     this.directory = directory;
-    this.indexWriter = writer;
+    this.config = config;
     this.infoStream = config.getInfoStream();
-    this.similarity = config.getSimilarity();
-    this.indexWriterConfig = writer.getConfig();
     this.perThreadPool = config.getIndexerThreadPool();
-    this.chain = config.getIndexingChain();
-    this.perThreadPool.initialize(this, globalFieldNumbers, config);
     flushPolicy = config.getFlushPolicy();
-    assert flushPolicy != null;
-    flushPolicy.init(this);
-    flushControl = new DocumentsWriterFlushControl(this, config);
+    this.writer = writer;
+    this.events = new ConcurrentLinkedQueue<Event>();
+    flushControl = new DocumentsWriterFlushControl(this, config, writer.bufferedDeletesStream);
   }
-
-  synchronized void deleteQueries(final Query... queries) throws IOException {
+  
+  synchronized boolean deleteQueries(final Query... queries) throws IOException {
+    // TODO why is this synchronized?
+    final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
     deleteQueue.addDelete(queries);
     flushControl.doOnDelete();
-    if (flushControl.doApplyAllDeletes()) {
-      applyAllDeletes(deleteQueue);
-    }
+    return applyAllDeletes(deleteQueue);
   }
 
   // TODO: we could check w/ FreqProxTermsWriter: if the
   // term doesn't exist, don't bother buffering into the
   // per-DWPT map (but still must go into the global map)
-  synchronized void deleteTerms(final Term... terms) throws IOException {
+  synchronized boolean deleteTerms(final Term... terms) throws IOException {
+    // TODO why is this synchronized?
     final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
     deleteQueue.addDelete(terms);
     flushControl.doOnDelete();
-    if (flushControl.doApplyAllDeletes()) {
-      applyAllDeletes(deleteQueue);
-    }
+    return applyAllDeletes( deleteQueue);
+  }
+
+  synchronized boolean updateNumericDocValue(Term term, String field, Long value) throws IOException {
+    final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
+    deleteQueue.addNumericUpdate(new NumericUpdate(term, field, value));
+    flushControl.doOnDelete();
+    return applyAllDeletes(deleteQueue);
   }
 
   DocumentsWriterDeleteQueue currentDeleteSession() {
     return deleteQueue;
   }
   
-  private void applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
-    if (deleteQueue != null && !flushControl.isFullFlush()) {
-      ticketQueue.addDeletesAndPurge(this, deleteQueue);
+  private final boolean applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
+    if (flushControl.getAndResetApplyAllDeletes()) {
+      if (deleteQueue != null && !flushControl.isFullFlush()) {
+        ticketQueue.addDeletes(deleteQueue);
+      }
+      putEvent(ApplyDeletesEvent.INSTANCE); // apply deletes event forces a purge
+      return true;
     }
-    indexWriter.applyAllDeletes();
-    indexWriter.flushCount.incrementAndGet();
+    return false;
   }
+  
+  final int purgeBuffer(IndexWriter writer, boolean forced) throws IOException {
+    if (forced) {
+      return ticketQueue.forcePurge(writer);
+    } else {
+      return ticketQueue.tryPurge(writer);
+    }
+  }
+  
 
   /** Returns how many docs are currently buffered in RAM. */
   int getNumDocs() {
     return numDocsInRAM.get();
   }
 
-  Collection<String> abortedFiles() {
-    return abortedFiles;
-  }
-
   private void ensureOpen() throws AlreadyClosedException {
     if (closed) {
       throw new AlreadyClosedException("this IndexWriter is closed");
@@ -202,45 +204,37 @@ final class DocumentsWriter {
    *  updating the index files) and must discard all
    *  currently buffered docs.  This resets our state,
    *  discarding any docs added since last flush. */
-  synchronized void abort() {
+  synchronized void abort(IndexWriter writer) {
+    assert !Thread.holdsLock(writer) : "IndexWriter lock should never be hold when aborting";
     boolean success = false;
-
+    final Set<String> newFilesSet = new HashSet<String>();
     try {
       deleteQueue.clear();
       if (infoStream.isEnabled("DW")) {
         infoStream.message("DW", "abort");
       }
-
       final int limit = perThreadPool.getActiveThreadState();
       for (int i = 0; i < limit; i++) {
         final ThreadState perThread = perThreadPool.getThreadState(i);
         perThread.lock();
         try {
-          if (perThread.isActive()) { // we might be closed
-            try {
-              perThread.dwpt.abort();
-            } finally {
-              perThread.dwpt.checkAndResetHasAborted();
-              flushControl.doOnAbort(perThread);
-            }
-          } else {
-            assert closed;
-          }
+          abortThreadState(perThread, newFilesSet);
         } finally {
           perThread.unlock();
         }
       }
-      flushControl.abortPendingFlushes();
+      flushControl.abortPendingFlushes(newFilesSet);
+      putEvent(new DeleteNewFilesEvent(newFilesSet));
       flushControl.waitForFlush();
       success = true;
     } finally {
       if (infoStream.isEnabled("DW")) {
-        infoStream.message("DW", "done abort; abortedFiles=" + abortedFiles + " success=" + success);
+        infoStream.message("DW", "done abort; abortedFiles=" + newFilesSet + " success=" + success);
       }
     }
   }
   
-  synchronized void lockAndAbortAll() {
+  synchronized void lockAndAbortAll(IndexWriter indexWriter) {
     assert indexWriter.holdsFullFlushLock();
     if (infoStream.isEnabled("DW")) {
       infoStream.message("DW", "lockAndAbortAll");
@@ -249,20 +243,15 @@ final class DocumentsWriter {
     try {
       deleteQueue.clear();
       final int limit = perThreadPool.getMaxThreadStates();
+      final Set<String> newFilesSet = new HashSet<String>();
       for (int i = 0; i < limit; i++) {
         final ThreadState perThread = perThreadPool.getThreadState(i);
         perThread.lock();
-        if (perThread.isActive()) { // we might be closed or 
-          try {
-            perThread.dwpt.abort();
-          } finally {
-            perThread.dwpt.checkAndResetHasAborted();
-            flushControl.doOnAbort(perThread);
-          }
-        }
+        abortThreadState(perThread, newFilesSet);
       }
       deleteQueue.clear();
-      flushControl.abortPendingFlushes();
+      flushControl.abortPendingFlushes(newFilesSet);
+      putEvent(new DeleteNewFilesEvent(newFilesSet));
       flushControl.waitForFlush();
       success = true;
     } finally {
@@ -271,12 +260,31 @@ final class DocumentsWriter {
       }
       if (!success) {
         // if something happens here we unlock all states again
-        unlockAllAfterAbortAll();
+        unlockAllAfterAbortAll(indexWriter);
+      }
+    }
+  }
+
+  private final void abortThreadState(final ThreadState perThread, Set<String> newFiles) {
+    assert perThread.isHeldByCurrentThread();
+    if (perThread.isActive()) { // we might be closed
+      if (perThread.isInitialized()) { 
+        try {
+          subtractFlushedNumDocs(perThread.dwpt.getNumDocsInRAM());
+          perThread.dwpt.abort(newFiles);
+        } finally {
+          perThread.dwpt.checkAndResetHasAborted();
+          flushControl.doOnAbort(perThread);
+        }
+      } else {
+        flushControl.doOnAbort(perThread);
       }
+    } else {
+      assert closed;
     }
   }
   
-  final synchronized void unlockAllAfterAbortAll() {
+  final synchronized void unlockAllAfterAbortAll(IndexWriter indexWriter) {
     assert indexWriter.holdsFullFlushLock();
     if (infoStream.isEnabled("DW")) {
       infoStream.message("DW", "unlockAll");
@@ -334,7 +342,7 @@ final class DocumentsWriter {
 
   private boolean preUpdate() throws IOException {
     ensureOpen();
-    boolean maybeMerge = false;
+    boolean hasEvents = false;
     if (flushControl.anyStalledThreads() || flushControl.numQueuedFlushes() > 0) {
       // Help out flushing any queued DWPTs so we can un-stall:
       if (infoStream.isEnabled("DW")) {
@@ -345,7 +353,7 @@ final class DocumentsWriter {
         DocumentsWriterPerThread flushingDWPT;
         while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
           // Don't push the delete here since the update could fail!
-          maybeMerge |= doFlush(flushingDWPT);
+          hasEvents |= doFlush(flushingDWPT);
         }
   
         if (infoStream.isEnabled("DW")) {
@@ -361,28 +369,35 @@ final class DocumentsWriter {
         infoStream.message("DW", "continue indexing after helping out flushing DocumentsWriter is healthy");
       }
     }
-    return maybeMerge;
+    return hasEvents;
   }
 
-  private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean maybeMerge) throws IOException {
-    if (flushControl.doApplyAllDeletes()) {
-      applyAllDeletes(deleteQueue);
-    }
+  private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean hasEvents) throws IOException {
+    hasEvents |= applyAllDeletes(deleteQueue);
     if (flushingDWPT != null) {
-      maybeMerge |= doFlush(flushingDWPT);
+      hasEvents |= doFlush(flushingDWPT);
     } else {
       final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush();
       if (nextPendingFlush != null) {
-        maybeMerge |= doFlush(nextPendingFlush);
+        hasEvents |= doFlush(nextPendingFlush);
       }
     }
 
-    return maybeMerge;
+    return hasEvents;
+  }
+  
+  private final void ensureInitialized(ThreadState state) {
+    if (state.isActive() && state.dwpt == null) {
+      final FieldInfos.Builder infos = new FieldInfos.Builder(
+          writer.globalFieldNumberMap);
+      state.dwpt = new DocumentsWriterPerThread(writer.newSegmentName(),
+          directory, config, infoStream, deleteQueue, infos);
+    }
   }
 
   boolean updateDocuments(final Iterable<? extends IndexDocument> docs, final Analyzer analyzer,
                           final Term delTerm) throws IOException {
-    boolean maybeMerge = preUpdate();
+    boolean hasEvents = preUpdate();
 
     final ThreadState perThread = flushControl.obtainAndLock();
     final DocumentsWriterPerThread flushingDWPT;
@@ -392,13 +407,19 @@ final class DocumentsWriter {
         ensureOpen();
         assert false: "perThread is not active but we are still open";
       }
-       
+      ensureInitialized(perThread);
+      assert perThread.isInitialized();
       final DocumentsWriterPerThread dwpt = perThread.dwpt;
+      final int dwptNumDocs = dwpt.getNumDocsInRAM();
       try {
         final int docCount = dwpt.updateDocuments(docs, analyzer, delTerm);
         numDocsInRAM.addAndGet(docCount);
       } finally {
         if (dwpt.checkAndResetHasAborted()) {
+          if (!dwpt.pendingFilesToDelete().isEmpty()) {
+            putEvent(new DeleteNewFilesEvent(dwpt.pendingFilesToDelete()));
+          }
+          subtractFlushedNumDocs(dwptNumDocs);
           flushControl.doOnAbort(perThread);
         }
       }
@@ -408,31 +429,35 @@ final class DocumentsWriter {
       perThread.unlock();
     }
 
-    return postUpdate(flushingDWPT, maybeMerge);
+    return postUpdate(flushingDWPT, hasEvents);
   }
 
   boolean updateDocument(final IndexDocument doc, final Analyzer analyzer,
       final Term delTerm) throws IOException {
 
-    boolean maybeMerge = preUpdate();
+    boolean hasEvents = preUpdate();
 
     final ThreadState perThread = flushControl.obtainAndLock();
 
     final DocumentsWriterPerThread flushingDWPT;
-    
     try {
-
       if (!perThread.isActive()) {
         ensureOpen();
-        throw new IllegalStateException("perThread is not active but we are still open");
+        assert false: "perThread is not active but we are still open";
       }
-       
+      ensureInitialized(perThread);
+      assert perThread.isInitialized();
       final DocumentsWriterPerThread dwpt = perThread.dwpt;
+      final int dwptNumDocs = dwpt.getNumDocsInRAM();
       try {
         dwpt.updateDocument(doc, analyzer, delTerm); 
         numDocsInRAM.incrementAndGet();
       } finally {
         if (dwpt.checkAndResetHasAborted()) {
+          if (!dwpt.pendingFilesToDelete().isEmpty()) {
+            putEvent(new DeleteNewFilesEvent(dwpt.pendingFilesToDelete()));
+          }
+          subtractFlushedNumDocs(dwptNumDocs);
           flushControl.doOnAbort(perThread);
         }
       }
@@ -442,13 +467,13 @@ final class DocumentsWriter {
       perThread.unlock();
     }
 
-    return postUpdate(flushingDWPT, maybeMerge);
+    return postUpdate(flushingDWPT, hasEvents);
   }
 
   private  boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
-    boolean maybeMerge = false;
+    boolean hasEvents = false;
     while (flushingDWPT != null) {
-      maybeMerge = true;
+      hasEvents = true;
       boolean success = false;
       SegmentFlushTicket ticket = null;
       try {
@@ -474,9 +499,24 @@ final class DocumentsWriter {
           // Each flush is assigned a ticket in the order they acquire the ticketQueue lock
           ticket = ticketQueue.addFlushTicket(flushingDWPT);
   
-          // flush concurrently without locking
-          final FlushedSegment newSegment = flushingDWPT.flush();
-          ticketQueue.addSegment(ticket, newSegment);
+          final int flushingDocsInRam = flushingDWPT.getNumDocsInRAM();
+          boolean dwptSuccess = false;
+          try {
+            // flush concurrently without locking
+            final FlushedSegment newSegment = flushingDWPT.flush();
+            ticketQueue.addSegment(ticket, newSegment);
+            dwptSuccess = true;
+          } finally {
+            subtractFlushedNumDocs(flushingDocsInRam);
+            if (!flushingDWPT.pendingFilesToDelete().isEmpty()) {
+              putEvent(new DeleteNewFilesEvent(flushingDWPT.pendingFilesToDelete()));
+              hasEvents = true;
+            }
+            if (!dwptSuccess) {
+              putEvent(new FlushFailedEvent(flushingDWPT.getSegmentInfo()));
+              hasEvents = true;
+            }
+          }
           // flush was successful once we reached this point - new seg. has been assigned to the ticket!
           success = true;
         } finally {
@@ -496,54 +536,38 @@ final class DocumentsWriter {
           // thread in innerPurge can't keep up with all
           // other threads flushing segments.  In this case
           // we forcefully stall the producers.
-          ticketQueue.forcePurge(this);
-        } else {
-          ticketQueue.tryPurge(this);
+          putEvent(ForcedPurgeEvent.INSTANCE);
+          break;
         }
-
       } finally {
         flushControl.doAfterFlush(flushingDWPT);
         flushingDWPT.checkAndResetHasAborted();
-        indexWriter.flushCount.incrementAndGet();
-        indexWriter.doAfterFlush();
       }
      
       flushingDWPT = flushControl.nextPendingFlush();
     }
-
+    if (hasEvents) {
+      putEvent(MergePendingEvent.INSTANCE);
+    }
     // If deletes alone are consuming > 1/2 our RAM
     // buffer, force them all to apply now. This is to
     // prevent too-frequent flushing of a long tail of
     // tiny segments:
-    final double ramBufferSizeMB = indexWriterConfig.getRAMBufferSizeMB();
+    final double ramBufferSizeMB = config.getRAMBufferSizeMB();
     if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
         flushControl.getDeleteBytesUsed() > (1024*1024*ramBufferSizeMB/2)) {
       if (infoStream.isEnabled("DW")) {
         infoStream.message("DW", "force apply deletes bytesUsed=" + flushControl.getDeleteBytesUsed() + " vs ramBuffer=" + (1024*1024*ramBufferSizeMB));
       }
-      applyAllDeletes(deleteQueue);
+      hasEvents = true;
+      if (!this.applyAllDeletes(deleteQueue)) {
+        putEvent(ApplyDeletesEvent.INSTANCE);
+      }
     }
 
-    return maybeMerge;
+    return hasEvents;
   }
   
-
-  void finishFlush(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
-      throws IOException {
-    // Finish the flushed segment and publish it to IndexWriter
-    if (newSegment == null) {
-      assert bufferedDeletes != null;
-      if (bufferedDeletes != null && bufferedDeletes.any()) {
-        indexWriter.publishFrozenDeletes(bufferedDeletes);
-        if (infoStream.isEnabled("DW")) {
-          infoStream.message("DW", "flush: push buffered deletes: " + bufferedDeletes);
-        }
-      }
-    } else {
-      publishFlushedSegment(newSegment, bufferedDeletes);  
-    }
-  }
-
   final void subtractFlushedNumDocs(int numFlushed) {
     int oldValue = numDocsInRAM.get();
     while (!numDocsInRAM.compareAndSet(oldValue, oldValue - numFlushed)) {
@@ -551,29 +575,6 @@ final class DocumentsWriter {
     }
   }
   
-  /**
-   * Publishes the flushed segment, segment private deletes (if any) and its
-   * associated global delete (if present) to IndexWriter.  The actual
-   * publishing operation is synced on IW -> BDS so that the {@link SegmentInfo}'s
-   * delete generation is always GlobalPacket_deleteGeneration + 1
-   */
-  private void publishFlushedSegment(FlushedSegment newSegment, FrozenBufferedDeletes globalPacket)
-      throws IOException {
-    assert newSegment != null;
-    assert newSegment.segmentInfo != null;
-    final FrozenBufferedDeletes segmentDeletes = newSegment.segmentDeletes;
-    //System.out.println("FLUSH: " + newSegment.segmentInfo.info.name);
-    if (infoStream.isEnabled("DW")) {
-      infoStream.message("DW", "publishFlushedSegment seg-private deletes=" + segmentDeletes);  
-    }
-    
-    if (segmentDeletes != null && infoStream.isEnabled("DW")) {
-      infoStream.message("DW", "flush: push buffered seg private deletes: " + segmentDeletes);
-    }
-    // now publish!
-    indexWriter.publishFlushedSegment(newSegment.segmentInfo, segmentDeletes, globalPacket);
-  }
-  
   // for asserts
   private volatile DocumentsWriterDeleteQueue currentFullFlushDelQueue = null;
 
@@ -588,7 +589,7 @@ final class DocumentsWriter {
    * two stage operation; the caller must ensure (in try/finally) that finishFlush
    * is called after this method, to release the flush lock in DWFlushControl
    */
-  final boolean flushAllThreads()
+  final boolean flushAllThreads(final IndexWriter indexWriter)
     throws IOException {
     final DocumentsWriterDeleteQueue flushingDeleteQueue;
     if (infoStream.isEnabled("DW")) {
@@ -620,10 +621,9 @@ final class DocumentsWriter {
         if (infoStream.isEnabled("DW")) {
           infoStream.message("DW", Thread.currentThread().getName() + ": flush naked frozen global deletes");
         }
-        ticketQueue.addDeletesAndPurge(this, flushingDeleteQueue);
-      } else {
-        ticketQueue.forcePurge(this);
-      }
+        ticketQueue.addDeletes(flushingDeleteQueue);
+      } 
+      ticketQueue.forcePurge(indexWriter);
       assert !flushingDeleteQueue.anyChanges() && !ticketQueue.hasTickets();
     } finally {
       assert flushingDeleteQueue == currentFullFlushDelQueue;
@@ -641,11 +641,94 @@ final class DocumentsWriter {
         // Release the flush lock
         flushControl.finishFullFlush();
       } else {
-        flushControl.abortFullFlushes();
+        Set<String> newFilesSet = new HashSet<>();
+        flushControl.abortFullFlushes(newFilesSet);
+        putEvent(new DeleteNewFilesEvent(newFilesSet));
+
       }
     } finally {
       pendingChangesInCurrentFullFlush = false;
     }
     
   }
+
+  public LiveIndexWriterConfig getIndexWriterConfig() {
+    return config;
+  }
+  
+  private void putEvent(Event event) {
+    events.add(event);
+  }
+  
+  static final class ApplyDeletesEvent implements Event {
+    static final Event INSTANCE = new ApplyDeletesEvent();
+    private int instCount = 0;
+    private ApplyDeletesEvent() {
+      assert instCount == 0;
+      instCount++;
+    }
+    
+    @Override
+    public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
+      writer.applyDeletesAndPurge(true); // we always purge!
+    }
+  }
+  
+  static final class MergePendingEvent implements Event {
+    static final Event INSTANCE = new MergePendingEvent();
+    private int instCount = 0; 
+    private MergePendingEvent() {
+      assert instCount == 0;
+      instCount++;
+    }
+   
+    @Override
+    public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
+      writer.doAfterSegmentFlushed(triggerMerge, forcePurge);
+    }
+  }
+  
+  static final class ForcedPurgeEvent implements Event {
+    static final Event INSTANCE = new ForcedPurgeEvent();
+    private int instCount = 0;
+    private ForcedPurgeEvent() {
+      assert instCount == 0;
+      instCount++;
+    }
+    
+    @Override
+    public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
+      writer.purge(true);
+    }
+  }
+  
+  static class FlushFailedEvent implements Event {
+    private final SegmentInfo info;
+    
+    public FlushFailedEvent(SegmentInfo info) {
+      this.info = info;
+    }
+    
+    @Override
+    public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
+      writer.flushFailed(info);
+    }
+  }
+  
+  static class DeleteNewFilesEvent implements Event {
+    private final Collection<String>  files;
+    
+    public DeleteNewFilesEvent(Collection<String>  files) {
+      this.files = files;
+    }
+    
+    @Override
+    public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
+      writer.deleteNewFiles(files);
+    }
+  }
+
+  public Queue<Event> eventQueue() {
+    return events;
+  }
 }

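The DocumentsWriter rewrite above removes the direct back-references into IndexWriter (indexWriter.applyAllDeletes(), publishFlushedSegment(), abortedFiles and friends) and instead enqueues Event objects; IndexWriter drains that queue and runs each event outside DocumentsWriter's locks. Only Event.process(...) and eventQueue() are visible in this diff, so the draining helper below, including its name, is an assumption sketched for illustration.

    // Hedged sketch of the consumer side of the new event queue. In the real code this
    // would live inside IndexWriter (Event is an IndexWriter-internal interface); the
    // method name and call site here are illustrative only.
    static void processEvents(IndexWriter writer, DocumentsWriter docWriter,
                              boolean triggerMerge, boolean forcePurge) throws IOException {
      final Queue<IndexWriter.Event> events = docWriter.eventQueue();
      IndexWriter.Event event;
      while ((event = events.poll()) != null) {
        // e.g. the ApplyDeletesEvent, ForcedPurgeEvent, MergePendingEvent,
        // FlushFailedEvent and DeleteNewFilesEvent implementations defined above
        event.process(writer, triggerMerge, forcePurge);
      }
    }
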
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java Mon Oct 21 18:58:24 2013
@@ -107,6 +107,11 @@ final class DocumentsWriterDeleteQueue {
     tryApplyGlobalSlice();
   }
 
+  void addNumericUpdate(NumericUpdate update) {
+    add(new NumericUpdateNode(update));
+    tryApplyGlobalSlice();
+  }
+
   /**
    * invariant for document update
    */
@@ -380,6 +385,22 @@ final class DocumentsWriterDeleteQueue {
     }
   }
 
+  private static final class NumericUpdateNode extends Node<NumericUpdate> {
+
+    NumericUpdateNode(NumericUpdate update) {
+      super(update);
+    }
+
+    @Override
+    void apply(BufferedDeletes bufferedDeletes, int docIDUpto) {
+      bufferedDeletes.addNumericUpdate(item, docIDUpto);
+    }
+
+    @Override
+    public String toString() {
+      return "update=" + item;
+    }
+  }
 
   private boolean forceApplyGlobalSlice() {
     globalBufferLock.lock();

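The NumericUpdateNode added above routes numeric doc-values updates through the same delete-queue slice machinery as term and query deletes, which is what lets DocumentsWriter.updateNumericDocValue(Term, String, Long) (earlier in this commit) buffer an update per thread and apply it globally. How the feature is surfaced publicly is not part of this diff; the IndexWriter call below is an assumption about the intended API, shown only as a usage sketch.

    // Hypothetical usage: change a numeric doc-values field in place for every document
    // matching a term, without re-indexing those documents. The IndexWriter method name
    // is assumed; dir is any Directory and the analyzer choice is illustrative.
    IndexWriterConfig iwc = new IndexWriterConfig(Version.LUCENE_45, new StandardAnalyzer(Version.LUCENE_45));
    try (IndexWriter writer = new IndexWriter(dir, iwc)) {
      writer.updateNumericDocValue(new Term("id", "doc-42"), "price", 1299L);   // assumed entry point
      writer.commit();
    }
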
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Mon Oct 21 18:58:24 2013
@@ -23,9 +23,11 @@ import java.util.List;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
+import org.apache.lucene.util.InfoStream;
 import org.apache.lucene.util.ThreadInterruptedException;
 
 /**
@@ -66,14 +68,18 @@ final class DocumentsWriterFlushControl 
   private boolean closed = false;
   private final DocumentsWriter documentsWriter;
   private final LiveIndexWriterConfig config;
+  private final BufferedDeletesStream bufferedDeletesStream;
+  private final InfoStream infoStream;
 
-  DocumentsWriterFlushControl(DocumentsWriter documentsWriter, LiveIndexWriterConfig config) {
+  DocumentsWriterFlushControl(DocumentsWriter documentsWriter, LiveIndexWriterConfig config, BufferedDeletesStream bufferedDeletesStream) {
+    this.infoStream = config.getInfoStream();
     this.stallControl = new DocumentsWriterStallControl();
     this.perThreadPool = documentsWriter.perThreadPool;
     this.flushPolicy = documentsWriter.flushPolicy;
-    this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;
     this.config = config;
+    this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;
     this.documentsWriter = documentsWriter;
+    this.bufferedDeletesStream = bufferedDeletesStream;
   }
 
   public synchronized long activeBytes() {
@@ -240,7 +246,6 @@ final class DocumentsWriterFlushControl 
   }
   
   public synchronized void waitForFlush() {
-    assert !Thread.holdsLock(this.documentsWriter.indexWriter) : "IW lock should never be hold when waiting on flush";
     while (flushingWriters.size() != 0) {
       try {
         this.wait();
@@ -277,7 +282,7 @@ final class DocumentsWriterFlushControl 
       }
       assert assertMemory();
       // Take it out of the loop this DWPT is stale
-      perThreadPool.replaceForFlush(state, closed);
+      perThreadPool.reset(state, closed);
     } finally {
       updateStallState();
     }
@@ -295,7 +300,7 @@ final class DocumentsWriterFlushControl 
       assert fullFlush : "can not block if fullFlush == false";
       final DocumentsWriterPerThread dwpt;
       final long bytes = perThread.bytesUsed;
-      dwpt = perThreadPool.replaceForFlush(perThread, closed);
+      dwpt = perThreadPool.reset(perThread, closed);
       numPending--;
       blockedFlushes.add(new BlockedFlush(dwpt, bytes));
     }finally {
@@ -311,12 +316,12 @@ final class DocumentsWriterFlushControl 
       // We are pending so all memory is already moved to flushBytes
       if (perThread.tryLock()) {
         try {
-          if (perThread.isActive()) {
+          if (perThread.isInitialized()) {
             assert perThread.isHeldByCurrentThread();
             final DocumentsWriterPerThread dwpt;
             final long bytes = perThread.bytesUsed; // do that before
                                                          // replace!
-            dwpt = perThreadPool.replaceForFlush(perThread, closed);
+            dwpt = perThreadPool.reset(perThread, closed);
             assert !flushingWriters.containsKey(dwpt) : "DWPT is already flushing";
             // Record the flushing DWPT to reduce flushBytes in doAfterFlush
             flushingWriters.put(dwpt, Long.valueOf(bytes));
@@ -413,18 +418,18 @@ final class DocumentsWriterFlushControl 
    * Returns the number of delete terms in the global pool
    */
   public int getNumGlobalTermDeletes() {
-    return documentsWriter.deleteQueue.numGlobalTermDeletes() + documentsWriter.indexWriter.bufferedDeletesStream.numTerms();
+    return documentsWriter.deleteQueue.numGlobalTermDeletes() + bufferedDeletesStream.numTerms();
   }
   
   public long getDeleteBytesUsed() {
-    return documentsWriter.deleteQueue.bytesUsed() + documentsWriter.indexWriter.bufferedDeletesStream.bytesUsed();
+    return documentsWriter.deleteQueue.bytesUsed() + bufferedDeletesStream.bytesUsed();
   }
 
   synchronized int numFlushingDWPT() {
     return flushingWriters.size();
   }
   
-  public boolean doApplyAllDeletes() {
+  public boolean getAndResetApplyAllDeletes() {
     return flushDeletes.getAndSet(false);
   }
 
@@ -441,7 +446,7 @@ final class DocumentsWriterFlushControl 
         .currentThread(), documentsWriter);
     boolean success = false;
     try {
-      if (perThread.isActive()
+      if (perThread.isInitialized()
           && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
         // There is a flush-all in process and this DWPT is
         // now stale -- enroll it for flush and try for
@@ -475,7 +480,10 @@ final class DocumentsWriterFlushControl 
       final ThreadState next = perThreadPool.getThreadState(i);
       next.lock();
       try {
-        if (!next.isActive()) {
+        if (!next.isInitialized()) {
+          if (closed && next.isActive()) {
+            perThreadPool.deactivateThreadState(next);
+          }
           continue; 
         }
         assert next.dwpt.deleteQueue == flushingQueue
@@ -515,7 +523,7 @@ final class DocumentsWriterFlushControl 
       final ThreadState next = perThreadPool.getThreadState(i);
       next.lock();
       try {
-        assert !next.isActive() || next.dwpt.deleteQueue == queue;
+        assert !next.isInitialized() || next.dwpt.deleteQueue == queue : "isInitialized: " + next.isInitialized() + " numDocs: " + (next.isInitialized() ? next.dwpt.getNumDocsInRAM() : 0) ;
       } finally {
         next.unlock();
       }
@@ -526,12 +534,12 @@ final class DocumentsWriterFlushControl 
   private final List<DocumentsWriterPerThread> fullFlushBuffer = new ArrayList<DocumentsWriterPerThread>();
 
   void addFlushableState(ThreadState perThread) {
-    if (documentsWriter.infoStream.isEnabled("DWFC")) {
-      documentsWriter.infoStream.message("DWFC", "addFlushableState " + perThread.dwpt);
+    if (infoStream.isEnabled("DWFC")) {
+      infoStream.message("DWFC", "addFlushableState " + perThread.dwpt);
     }
     final DocumentsWriterPerThread dwpt = perThread.dwpt;
     assert perThread.isHeldByCurrentThread();
-    assert perThread.isActive();
+    assert perThread.isInitialized();
     assert fullFlush;
     assert dwpt.deleteQueue != documentsWriter.deleteQueue;
     if (dwpt.getNumDocsInRAM() > 0) {
@@ -545,11 +553,7 @@ final class DocumentsWriterFlushControl 
         fullFlushBuffer.add(flushingDWPT);
       }
     } else {
-      if (closed) {
-        perThreadPool.deactivateThreadState(perThread); // make this state inactive
-      } else {
-        perThreadPool.reinitThreadState(perThread);
-      }
+        perThreadPool.reset(perThread, closed); // make this state inactive
     }
   }
   
@@ -594,19 +598,20 @@ final class DocumentsWriterFlushControl 
     return true;
   }
 
-  synchronized void abortFullFlushes() {
+  synchronized void abortFullFlushes(Set<String> newFiles) {
    try {
-     abortPendingFlushes();
+     abortPendingFlushes(newFiles);
    } finally {
      fullFlush = false;
    }
   }
   
-  synchronized void abortPendingFlushes() {
+  synchronized void abortPendingFlushes(Set<String> newFiles) {
     try {
       for (DocumentsWriterPerThread dwpt : flushQueue) {
         try {
-          dwpt.abort();
+          documentsWriter.subtractFlushedNumDocs(dwpt.getNumDocsInRAM());
+          dwpt.abort(newFiles);
         } catch (Throwable ex) {
           // ignore - keep on aborting the flush queue
         } finally {
@@ -617,7 +622,8 @@ final class DocumentsWriterFlushControl 
         try {
           flushingWriters
               .put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes));
-          blockedFlush.dwpt.abort();
+          documentsWriter.subtractFlushedNumDocs(blockedFlush.dwpt.getNumDocsInRAM());
+          blockedFlush.dwpt.abort(newFiles);
         } catch (Throwable ex) {
           // ignore - keep on aborting the blocked queue
         } finally {
@@ -670,8 +676,8 @@ final class DocumentsWriterFlushControl 
    * checked out DWPT are available
    */
   void waitIfStalled() {
-    if (documentsWriter.infoStream.isEnabled("DWFC")) {
-      documentsWriter.infoStream.message("DWFC",
+    if (infoStream.isEnabled("DWFC")) {
+      infoStream.message("DWFC",
           "waitIfStalled: numFlushesPending: " + flushQueue.size()
               + " netBytes: " + netBytes() + " flushBytes: " + flushBytes()
               + " fullFlush: " + fullFlush);
@@ -686,5 +692,12 @@ final class DocumentsWriterFlushControl 
     return stallControl.anyStalledThreads();
   }
   
+  /**
+   * Returns the {@link IndexWriter} {@link InfoStream}
+   */
+  public InfoStream getInfoStream() {
+    return infoStream;
+  }
+  
   
 }

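The rename of doApplyAllDeletes() to getAndResetApplyAllDeletes() above makes the read-and-clear semantics explicit: the pending "apply all deletes" flag is observed and cleared in one atomic step, so a request can neither be lost nor honored twice. A minimal standalone sketch of that pattern (the class and the requestApplyAllDeletes() name are illustrative, not Lucene API):

    import java.util.concurrent.atomic.AtomicBoolean;

    class ApplyDeletesFlag {
      private final AtomicBoolean flushDeletes = new AtomicBoolean();

      // An indexing thread asks for all buffered deletes to be applied.
      void requestApplyAllDeletes() {
        flushDeletes.set(true);
      }

      // The flushing thread sees the request at most once; the flag is
      // cleared in the same atomic step as the read.
      boolean getAndResetApplyAllDeletes() {
        return flushDeletes.getAndSet(false);
      }
    }
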
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java Mon Oct 21 18:58:24 2013
@@ -34,8 +34,7 @@ class DocumentsWriterFlushQueue {
   private final AtomicInteger ticketCount = new AtomicInteger();
   private final ReentrantLock purgeLock = new ReentrantLock();
 
-  void addDeletesAndPurge(DocumentsWriter writer,
-      DocumentsWriterDeleteQueue deleteQueue) throws IOException {
+  void addDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
     synchronized (this) {
       incTickets();// first inc the ticket count - freeze opens
                    // a window for #anyChanges to fail
@@ -49,9 +48,6 @@ class DocumentsWriterFlushQueue {
         }
       }
     }
-    // don't hold the lock on the FlushQueue when forcing the purge - this blocks and deadlocks 
-    // if we hold the lock.
-    forcePurge(writer);
   }
   
   private void incTickets() {
@@ -98,8 +94,9 @@ class DocumentsWriterFlushQueue {
     return ticketCount.get() != 0;
   }
 
-  private void innerPurge(DocumentsWriter writer) throws IOException {
+  private int innerPurge(IndexWriter writer) throws IOException {
     assert purgeLock.isHeldByCurrentThread();
+    int numPurged = 0;
     while (true) {
       final FlushTicket head;
       final boolean canPublish;
@@ -108,6 +105,7 @@ class DocumentsWriterFlushQueue {
         canPublish = head != null && head.canPublish(); // do this synced 
       }
       if (canPublish) {
+        numPurged++;
         try {
           /*
            * if we block on publish -> lock IW -> lock BufferedDeletes we don't block
@@ -116,6 +114,7 @@ class DocumentsWriterFlushQueue {
            * be a ticket still in the queue. 
            */
           head.publish(writer);
+          
         } finally {
           synchronized (this) {
             // finally remove the published ticket from the queue
@@ -128,27 +127,31 @@ class DocumentsWriterFlushQueue {
         break;
       }
     }
+    return numPurged;
   }
 
-  void forcePurge(DocumentsWriter writer) throws IOException {
+  int forcePurge(IndexWriter writer) throws IOException {
     assert !Thread.holdsLock(this);
+    assert !Thread.holdsLock(writer);
     purgeLock.lock();
     try {
-      innerPurge(writer);
+      return innerPurge(writer);
     } finally {
       purgeLock.unlock();
     }
   }
 
-  void tryPurge(DocumentsWriter writer) throws IOException {
+  int tryPurge(IndexWriter writer) throws IOException {
     assert !Thread.holdsLock(this);
+    assert !Thread.holdsLock(writer);
     if (purgeLock.tryLock()) {
       try {
-        innerPurge(writer);
+        return innerPurge(writer);
       } finally {
         purgeLock.unlock();
       }
     }
+    return 0;
   }
 
   public int getTicketCount() {
@@ -169,8 +172,47 @@ class DocumentsWriterFlushQueue {
       this.frozenDeletes = frozenDeletes;
     }
 
-    protected abstract void publish(DocumentsWriter writer) throws IOException;
+    protected abstract void publish(IndexWriter writer) throws IOException;
     protected abstract boolean canPublish();
+    
+    /**
+     * Publishes the flushed segment, segment private deletes (if any) and its
+     * associated global delete (if present) to IndexWriter.  The actual
+     * publishing operation is synced on IW -> BDS so that the {@link SegmentInfo}'s
+     * delete generation is always GlobalPacket_deleteGeneration + 1
+     */
+    protected final void publishFlushedSegment(IndexWriter indexWriter, FlushedSegment newSegment, FrozenBufferedDeletes globalPacket)
+        throws IOException {
+      assert newSegment != null;
+      assert newSegment.segmentInfo != null;
+      final FrozenBufferedDeletes segmentDeletes = newSegment.segmentDeletes;
+      //System.out.println("FLUSH: " + newSegment.segmentInfo.info.name);
+      if (indexWriter.infoStream.isEnabled("DW")) {
+          indexWriter.infoStream.message("DW", "publishFlushedSegment seg-private deletes=" + segmentDeletes);  
+      }
+      
+      if (segmentDeletes != null && indexWriter.infoStream.isEnabled("DW")) {
+          indexWriter.infoStream.message("DW", "flush: push buffered seg private deletes: " + segmentDeletes);
+      }
+      // now publish!
+      indexWriter.publishFlushedSegment(newSegment.segmentInfo, segmentDeletes, globalPacket);
+    }
+    
+    protected final void finishFlush(IndexWriter indexWriter, FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
+            throws IOException {
+        // Finish the flushed segment and publish it to IndexWriter
+        if (newSegment == null) {
+          assert bufferedDeletes != null;
+          if (bufferedDeletes != null && bufferedDeletes.any()) {
+            indexWriter.publishFrozenDeletes(bufferedDeletes);
+            if (indexWriter.infoStream.isEnabled("DW")) {
+                indexWriter.infoStream.message("DW", "flush: push buffered deletes: " + bufferedDeletes);
+            }
+          }
+        } else {
+            publishFlushedSegment(indexWriter, newSegment, bufferedDeletes);  
+        }
+      }
   }
   
   static final class GlobalDeletesTicket extends FlushTicket {
@@ -179,11 +221,11 @@ class DocumentsWriterFlushQueue {
       super(frozenDeletes);
     }
     @Override
-    protected void publish(DocumentsWriter writer) throws IOException {
+    protected void publish(IndexWriter writer) throws IOException {
       assert !published : "ticket was already published - can not publish twice";
       published = true;
       // its a global ticket - no segment to publish
-      writer.finishFlush(null, frozenDeletes);
+      finishFlush(writer, null, frozenDeletes);
     }
 
     @Override
@@ -201,10 +243,10 @@ class DocumentsWriterFlushQueue {
     }
     
     @Override
-    protected void publish(DocumentsWriter writer) throws IOException {
+    protected void publish(IndexWriter writer) throws IOException {
       assert !published : "ticket was already published - can not publish twice";
       published = true;
-      writer.finishFlush(segment, frozenDeletes);
+      finishFlush(writer, segment, frozenDeletes);
     }
     
     protected void setSegment(FlushedSegment segment) {

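In the changes above, forcePurge and tryPurge now report how many tickets were published, and both funnel through innerPurge under a dedicated ReentrantLock so that tryPurge can back off when another thread is already purging. A simplified, self-contained sketch of that locking pattern (PurgeQueueSketch and its reduced innerPurge body are assumptions for illustration; the real innerPurge publishes each ticket to the IndexWriter):

    import java.util.ArrayDeque;
    import java.util.Queue;
    import java.util.concurrent.locks.ReentrantLock;

    class PurgeQueueSketch<T> {
      private final Queue<T> queue = new ArrayDeque<T>();
      private final ReentrantLock purgeLock = new ReentrantLock();

      synchronized void add(T ticket) {
        queue.add(ticket);
      }

      // Caller must hold purgeLock; drains publishable heads and reports
      // how many were handled.
      private int innerPurge() {
        assert purgeLock.isHeldByCurrentThread();
        int numPurged = 0;
        while (true) {
          final T head;
          synchronized (this) {
            head = queue.poll();
          }
          if (head == null) {
            break;
          }
          numPurged++;
        }
        return numPurged;
      }

      // Blocks until the purge lock is free, then purges everything publishable.
      int forcePurge() {
        purgeLock.lock();
        try {
          return innerPurge();
        } finally {
          purgeLock.unlock();
        }
      }

      // Purges only if no other thread holds the purge lock right now.
      int tryPurge() {
        if (purgeLock.tryLock()) {
          try {
            return innerPurge();
          } finally {
            purgeLock.unlock();
          }
        }
        return 0;
      }
    }
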
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Mon Oct 21 18:58:24 2013
@@ -17,11 +17,14 @@ package org.apache.lucene.index;
  * limitations under the License.
  */
 
+import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_MASK;
+import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE;
+
 import java.io.IOException;
 import java.text.NumberFormat;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.Locale;
+import java.util.Set;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.codecs.Codec;
@@ -40,9 +43,6 @@ import org.apache.lucene.util.IntBlockPo
 import org.apache.lucene.util.MutableBits;
 import org.apache.lucene.util.RamUsageEstimator;
 
-import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_MASK;
-import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE;
-
 class DocumentsWriterPerThread {
 
   /**
@@ -144,7 +144,7 @@ class DocumentsWriterPerThread {
    *  updating the index files) and must discard all
    *  currently buffered docs.  This resets our state,
    *  discarding any docs added since last flush. */
-  void abort() {
+  void abort(Set<String> createdFiles) {
     //System.out.println(Thread.currentThread().getName() + ": now abort seg=" + segmentInfo.name);
     hasAborted = aborting = true;
     try {
@@ -157,10 +157,7 @@ class DocumentsWriterPerThread {
       }
 
       pendingDeletes.clear();
-      deleteSlice = deleteQueue.newSlice();
-      // Reset all postings data
-      doAfterFlush();
-
+      createdFiles.addAll(directory.getCreatedFiles());
     } finally {
       aborting = false;
       if (infoStream.isEnabled("DWPT")) {
@@ -169,9 +166,7 @@ class DocumentsWriterPerThread {
     }
   }
   private final static boolean INFO_VERBOSE = false;
-  final DocumentsWriter parent;
   final Codec codec;
-  final IndexWriter writer;
   final TrackingDirectoryWrapper directory;
   final Directory directoryOrig;
   final DocState docState;
@@ -179,60 +174,62 @@ class DocumentsWriterPerThread {
   final Counter bytesUsed;
   
   SegmentWriteState flushState;
-  //Deletes for our still-in-RAM (to be flushed next) segment
-  BufferedDeletes pendingDeletes;  
-  SegmentInfo segmentInfo;     // Current segment we are working on
+  // Deletes for our still-in-RAM (to be flushed next) segment
+  final BufferedDeletes pendingDeletes;
+  private final SegmentInfo segmentInfo;     // Current segment we are working on
   boolean aborting = false;   // True if an abort is pending
   boolean hasAborted = false; // True if the last exception thrown by #updateDocument was aborting
 
   private FieldInfos.Builder fieldInfos;
   private final InfoStream infoStream;
   private int numDocsInRAM;
-  private int flushedDocCount;
-  DocumentsWriterDeleteQueue deleteQueue;
-  DeleteSlice deleteSlice;
+  final DocumentsWriterDeleteQueue deleteQueue;
+  private final DeleteSlice deleteSlice;
   private final NumberFormat nf = NumberFormat.getInstance(Locale.ROOT);
   final Allocator byteBlockAllocator;
   final IntBlockPool.Allocator intBlockAllocator;
   private final LiveIndexWriterConfig indexWriterConfig;
 
   
-  public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent,
-      FieldInfos.Builder fieldInfos, IndexingChain indexingChain) {
+  public DocumentsWriterPerThread(String segmentName, Directory directory, LiveIndexWriterConfig indexWriterConfig, InfoStream infoStream, DocumentsWriterDeleteQueue deleteQueue,
+      FieldInfos.Builder fieldInfos) {
     this.directoryOrig = directory;
     this.directory = new TrackingDirectoryWrapper(directory);
-    this.parent = parent;
     this.fieldInfos = fieldInfos;
-    this.writer = parent.indexWriter;
-    this.indexWriterConfig = parent.indexWriterConfig;
-    this.infoStream = parent.infoStream;
-    this.codec = parent.codec;
+    this.indexWriterConfig = indexWriterConfig;
+    this.infoStream = infoStream;
+    this.codec = indexWriterConfig.getCodec();
     this.docState = new DocState(this, infoStream);
-    this.docState.similarity = parent.indexWriter.getConfig().getSimilarity();
+    this.docState.similarity = indexWriterConfig.getSimilarity();
     bytesUsed = Counter.newCounter();
     byteBlockAllocator = new DirectTrackingAllocator(bytesUsed);
     pendingDeletes = new BufferedDeletes();
     intBlockAllocator = new IntBlockAllocator(bytesUsed);
-    initialize();
+    this.deleteQueue = deleteQueue;
+    assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
+    pendingDeletes.clear();
+    deleteSlice = deleteQueue.newSlice();
+   
+    segmentInfo = new SegmentInfo(directoryOrig, Constants.LUCENE_MAIN_VERSION, segmentName, -1, false, codec, null);
+    assert numDocsInRAM == 0;
+    if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
+      infoStream.message("DWPT", Thread.currentThread().getName() + " init seg=" + segmentName + " delQueue=" + deleteQueue);  
+    }
     // this should be the last call in the ctor 
     // it really sucks that we need to pull this within the ctor and pass this ref to the chain!
-    consumer = indexingChain.getChain(this);
-  }
-  
-  public DocumentsWriterPerThread(DocumentsWriterPerThread other, FieldInfos.Builder fieldInfos) {
-    this(other.directoryOrig, other.parent, fieldInfos, other.parent.chain);
+    consumer = indexWriterConfig.getIndexingChain().getChain(this);
+
   }
   
-  void initialize() {
-    deleteQueue = parent.deleteQueue;
-    assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
-    pendingDeletes.clear();
-    deleteSlice = null;
-  }
-
   void setAborting() {
     aborting = true;
   }
+
+  boolean checkAndResetHasAborted() {
+    final boolean retval = hasAborted;
+    hasAborted = false;
+    return retval;
+  }
   
   final boolean testPoint(String message) {
     if (infoStream.isEnabled("TP")) {
@@ -240,12 +237,6 @@ class DocumentsWriterPerThread {
     }
     return true;
   }
-  
-  boolean checkAndResetHasAborted() {
-    final boolean retval = hasAborted;
-    hasAborted = false;
-    return retval;
-  }
 
   public void updateDocument(IndexDocument doc, Analyzer analyzer, Term delTerm) throws IOException {
     assert testPoint("DocumentsWriterPerThread addDocument start");
@@ -253,9 +244,6 @@ class DocumentsWriterPerThread {
     docState.doc = doc;
     docState.analyzer = analyzer;
     docState.docID = numDocsInRAM;
-    if (segmentInfo == null) {
-      initSegmentInfo();
-    }
     if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
       infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name);
     }
@@ -274,7 +262,7 @@ class DocumentsWriterPerThread {
           deleteDocID(docState.docID);
           numDocsInRAM++;
         } else {
-          abort();
+          abort(filesToDelete);
         }
       }
     }
@@ -284,29 +272,16 @@ class DocumentsWriterPerThread {
       success = true;
     } finally {
       if (!success) {
-        abort();
+        abort(filesToDelete);
       }
     }
     finishDocument(delTerm);
   }
 
-  private void initSegmentInfo() {
-    String segment = writer.newSegmentName();
-    segmentInfo = new SegmentInfo(directoryOrig, Constants.LUCENE_MAIN_VERSION, segment, -1,
-                                  false, codec, null, null);
-    assert numDocsInRAM == 0;
-    if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
-      infoStream.message("DWPT", Thread.currentThread().getName() + " init seg=" + segment + " delQueue=" + deleteQueue);  
-    }
-  }
-  
   public int updateDocuments(Iterable<? extends IndexDocument> docs, Analyzer analyzer, Term delTerm) throws IOException {
     assert testPoint("DocumentsWriterPerThread addDocuments start");
     assert deleteQueue != null;
     docState.analyzer = analyzer;
-    if (segmentInfo == null) {
-      initSegmentInfo();
-    }
     if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
       infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name);
     }
@@ -331,7 +306,7 @@ class DocumentsWriterPerThread {
               // be called (because an exc is being thrown):
               numDocsInRAM++;
             } else {
-              abort();
+              abort(filesToDelete);
             }
           }
         }
@@ -341,7 +316,7 @@ class DocumentsWriterPerThread {
           success = true;
         } finally {
           if (!success) {
-            abort();
+            abort(filesToDelete);
           }
         }
 
@@ -384,21 +359,18 @@ class DocumentsWriterPerThread {
      * the updated slice we get from 1. holds all the deletes that have occurred
      * since we updated the slice the last time.
      */
-    if (deleteSlice == null) {
-      deleteSlice = deleteQueue.newSlice();
-      if (delTerm != null) {
-        deleteQueue.add(delTerm, deleteSlice);
-        deleteSlice.reset();
-      }
-      
-    } else {
-      if (delTerm != null) {
-        deleteQueue.add(delTerm, deleteSlice);
-        assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
-        deleteSlice.apply(pendingDeletes, numDocsInRAM);
-      } else if (deleteQueue.updateSlice(deleteSlice)) {
-        deleteSlice.apply(pendingDeletes, numDocsInRAM);
-      }
+    boolean applySlice = numDocsInRAM != 0;
+    if (delTerm != null) {
+      deleteQueue.add(delTerm, deleteSlice);
+      assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
+    } else  {
+      applySlice &= deleteQueue.updateSlice(deleteSlice);
+    }
+    
+    if (applySlice) {
+      deleteSlice.apply(pendingDeletes, numDocsInRAM);
+    } else { // if we don't need to apply we must reset!
+      deleteSlice.reset();
     }
     ++numDocsInRAM;
   }
@@ -434,14 +406,6 @@ class DocumentsWriterPerThread {
     return numDocsInRAM;
   }
 
-  /** Reset after a flush */
-  private void doAfterFlush() {
-    segmentInfo = null;
-    directory.getCreatedFiles().clear();
-    fieldInfos = new FieldInfos.Builder(fieldInfos.globalFieldNumbers);
-    parent.subtractFlushedNumDocs(numDocsInRAM);
-    numDocsInRAM = 0;
-  }
   
   /**
    * Prepares this DWPT for flushing. This method will freeze and return the
@@ -457,7 +421,7 @@ class DocumentsWriterPerThread {
       // apply all deletes before we flush and release the delete slice
       deleteSlice.apply(pendingDeletes, numDocsInRAM);
       assert deleteSlice.isEmpty();
-      deleteSlice = null;
+      deleteSlice.reset();
     }
     return globalDeletes;
   }
@@ -465,11 +429,11 @@ class DocumentsWriterPerThread {
   /** Flush all pending docs to a new segment */
   FlushedSegment flush() throws IOException {
     assert numDocsInRAM > 0;
-    assert deleteSlice == null : "all deletes must be applied in prepareFlush";
+    assert deleteSlice.isEmpty() : "all deletes must be applied in prepareFlush";
     segmentInfo.setDocCount(numDocsInRAM);
-    flushState = new SegmentWriteState(infoStream, directory, segmentInfo, fieldInfos.finish(),
+    final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segmentInfo, fieldInfos.finish(),
         pendingDeletes, new IOContext(new FlushInfo(numDocsInRAM, bytesUsed())));
-    final double startMBUsed = parent.flushControl.netBytes() / 1024. / 1024.;
+    final double startMBUsed = bytesUsed() / 1024. / 1024.;
 
     // Apply delete-by-docID now (delete-byDocID only
     // happens when an exception is hit processing that
@@ -502,7 +466,7 @@ class DocumentsWriterPerThread {
       pendingDeletes.terms.clear();
       segmentInfo.setFiles(new HashSet<String>(directory.getCreatedFiles()));
 
-      final SegmentInfoPerCommit segmentInfoPerCommit = new SegmentInfoPerCommit(segmentInfo, 0, -1L);
+      final SegmentInfoPerCommit segmentInfoPerCommit = new SegmentInfoPerCommit(segmentInfo, 0, -1L, -1L);
       if (infoStream.isEnabled("DWPT")) {
         infoStream.message("DWPT", "new segment has " + (flushState.liveDocs == null ? 0 : (flushState.segmentInfo.getDocCount() - flushState.delCountOnFlush)) + " deleted docs");
         infoStream.message("DWPT", "new segment has " +
@@ -515,15 +479,12 @@ class DocumentsWriterPerThread {
         infoStream.message("DWPT", "flushed codec=" + codec);
       }
 
-      flushedDocCount += flushState.segmentInfo.getDocCount();
-
       final BufferedDeletes segmentDeletes;
-      if (pendingDeletes.queries.isEmpty()) {
+      if (pendingDeletes.queries.isEmpty() && pendingDeletes.numericUpdates.isEmpty()) {
         pendingDeletes.clear();
         segmentDeletes = null;
       } else {
         segmentDeletes = pendingDeletes;
-        pendingDeletes = new BufferedDeletes();
       }
 
       if (infoStream.isEnabled("DWPT")) {
@@ -531,7 +492,7 @@ class DocumentsWriterPerThread {
         infoStream.message("DWPT", "flushed: segment=" + segmentInfo.name + 
                 " ramUsed=" + nf.format(startMBUsed) + " MB" +
                 " newFlushedSize(includes docstores)=" + nf.format(newSegmentSize) + " MB" +
-                " docs/MB=" + nf.format(flushedDocCount / newSegmentSize));
+                " docs/MB=" + nf.format(flushState.segmentInfo.getDocCount() / newSegmentSize));
       }
 
       assert segmentInfo != null;
@@ -539,20 +500,21 @@ class DocumentsWriterPerThread {
       FlushedSegment fs = new FlushedSegment(segmentInfoPerCommit, flushState.fieldInfos,
                                              segmentDeletes, flushState.liveDocs, flushState.delCountOnFlush);
       sealFlushedSegment(fs);
-      doAfterFlush();
       success = true;
 
       return fs;
     } finally {
       if (!success) {
-        if (segmentInfo != null) {
-          writer.flushFailed(segmentInfo);
-        }
-        abort();
+        abort(filesToDelete);
       }
     }
   }
   
+  private final Set<String> filesToDelete = new HashSet<String>(); 
+  
+  public Set<String> pendingFilesToDelete() {
+    return filesToDelete;
+  }
   /**
    * Seals the {@link SegmentInfo} for the new flushed segment and persists
    * the deleted documents {@link MutableBits}.
@@ -568,12 +530,10 @@ class DocumentsWriterPerThread {
 
     boolean success = false;
     try {
+      
       if (indexWriterConfig.getUseCompoundFile()) {
-
-        // Now build compound file
-        Collection<String> oldFiles = IndexWriter.createCompoundFile(infoStream, directory, MergeState.CheckAbort.NONE, newSegment.info, context);
+        filesToDelete.addAll(IndexWriter.createCompoundFile(infoStream, directory, MergeState.CheckAbort.NONE, newSegment.info, context));
         newSegment.info.setUseCompoundFile(true);
-        writer.deleteNewFiles(oldFiles);
       }
 
       // Have codec write SegmentInfo.  Must do this after
@@ -615,10 +575,9 @@ class DocumentsWriterPerThread {
     } finally {
       if (!success) {
         if (infoStream.isEnabled("DWPT")) {
-          infoStream.message("DWPT", "hit exception " +
-              "reating compound file for newly flushed segment " + newSegment.info.name);
+          infoStream.message("DWPT",
+                             "hit exception creating compound file for newly flushed segment " + newSegment.info.name);
         }
-        writer.flushFailed(newSegment.info);
       }
     }
   }
@@ -671,4 +630,5 @@ class DocumentsWriterPerThread {
       + ", segment=" + (segmentInfo != null ? segmentInfo.name : "null") + ", aborting=" + aborting + ", numDocsInRAM="
         + numDocsInRAM + ", deleteQueue=" + deleteQueue + "]";
   }
+  
 }

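With the changes above, DocumentsWriterPerThread.abort(Set<String>) no longer resets its postings state or deletes files itself; it only adds the files it created (as recorded by its TrackingDirectoryWrapper) to a caller-supplied set, and the caller deletes them. A rough sketch of that contract, using hypothetical names (AbortSketch, onFileCreated):

    import java.util.HashSet;
    import java.util.Set;

    class AbortSketch {
      // Files this writer created; the real class gets these from
      // TrackingDirectoryWrapper.getCreatedFiles().
      private final Set<String> createdFiles = new HashSet<String>();

      void onFileCreated(String fileName) {
        createdFiles.add(fileName);
      }

      // Mirrors the new abort(Set<String>) contract: report the created
      // files to the caller instead of deleting them here.
      void abort(Set<String> callerFilesToDelete) {
        callerFilesToDelete.addAll(createdFiles);
      }
    }
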
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java Mon Oct 21 18:58:24 2013
@@ -71,12 +71,16 @@ abstract class DocumentsWriterPerThreadP
      * for indexing anymore.
      * @see #isActive()  
      */
-    private void resetWriter(DocumentsWriterPerThread dwpt) {
+  
+    private void deactivate() {
       assert this.isHeldByCurrentThread();
-      if (dwpt == null) {
-        isActive = false;
-      }
-      this.dwpt = dwpt;
+      isActive = false;
+      reset();
+    }
+    
+    private void reset() {
+      assert this.isHeldByCurrentThread();
+      this.dwpt = null;
       this.bytesUsed = 0;
       this.flushPending = false;
     }
@@ -91,6 +95,11 @@ abstract class DocumentsWriterPerThreadP
       return isActive;
     }
     
+    boolean isInitialized() {
+      assert this.isHeldByCurrentThread();
+      return isActive() && dwpt != null;
+    }
+    
     /**
      * Returns the number of currently active bytes in this ThreadState's
      * {@link DocumentsWriterPerThread}
@@ -121,9 +130,7 @@ abstract class DocumentsWriterPerThreadP
 
   private ThreadState[] threadStates;
   private volatile int numThreadStatesActive;
-  private SetOnce<FieldNumbers> globalFieldMap = new SetOnce<FieldNumbers>();
-  private SetOnce<DocumentsWriter> documentsWriter = new SetOnce<DocumentsWriter>();
-  
+
   /**
    * Creates a new {@link DocumentsWriterPerThreadPool} with a given maximum of {@link ThreadState}s.
    */
@@ -133,14 +140,8 @@ abstract class DocumentsWriterPerThreadP
     }
     threadStates = new ThreadState[maxNumThreadStates];
     numThreadStatesActive = 0;
-  }
-
-  void initialize(DocumentsWriter documentsWriter, FieldNumbers globalFieldMap, LiveIndexWriterConfig config) {
-    this.documentsWriter.set(documentsWriter); // thread pool is bound to DW
-    this.globalFieldMap.set(globalFieldMap);
     for (int i = 0; i < threadStates.length; i++) {
-      final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldMap);
-      threadStates[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, infos, documentsWriter.chain));
+      threadStates[i] = new ThreadState(null);
     }
   }
 
@@ -158,9 +159,10 @@ abstract class DocumentsWriterPerThreadP
       // should not happen
       throw new RuntimeException(e);
     }
-    clone.documentsWriter = new SetOnce<DocumentsWriter>();
-    clone.globalFieldMap = new SetOnce<FieldNumbers>();
     clone.threadStates = new ThreadState[threadStates.length];
+    for (int i = 0; i < threadStates.length; i++) {
+      clone.threadStates[i] = new ThreadState(null);
+    }
     return clone;
   }
   
@@ -178,6 +180,7 @@ abstract class DocumentsWriterPerThreadP
   int getActiveThreadState() {
     return numThreadStatesActive;
   }
+  
 
   /**
    * Returns a new {@link ThreadState} iff any new state is available otherwise
@@ -198,8 +201,7 @@ abstract class DocumentsWriterPerThreadP
         if (threadState.isActive()) {
           // unreleased thread states are deactivated during DW#close()
           numThreadStatesActive++; // increment will publish the ThreadState
-          assert threadState.dwpt != null;
-          threadState.dwpt.initialize();
+          assert threadState.dwpt == null;
           unlock = false;
           return threadState;
         }
@@ -220,7 +222,7 @@ abstract class DocumentsWriterPerThreadP
     for (int i = numThreadStatesActive; i < threadStates.length; i++) {
       assert threadStates[i].tryLock() : "unreleased threadstate should not be locked";
       try {
-        assert !threadStates[i].isActive() : "expected unreleased thread state to be inactive";
+        assert !threadStates[i].isInitialized() : "expected unreleased thread state to be inactive";
       } finally {
         threadStates[i].unlock();
       }
@@ -236,24 +238,20 @@ abstract class DocumentsWriterPerThreadP
       final ThreadState threadState = threadStates[i];
       threadState.lock();
       try {
-        threadState.resetWriter(null);
+        threadState.deactivate();
       } finally {
         threadState.unlock();
       }
     }
   }
   
-  DocumentsWriterPerThread replaceForFlush(ThreadState threadState, boolean closed) {
+  DocumentsWriterPerThread reset(ThreadState threadState, boolean closed) {
     assert threadState.isHeldByCurrentThread();
-    assert globalFieldMap.get() != null;
     final DocumentsWriterPerThread dwpt = threadState.dwpt;
     if (!closed) {
-      final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldMap.get());
-      final DocumentsWriterPerThread newDwpt = new DocumentsWriterPerThread(dwpt, infos);
-      newDwpt.initialize();
-      threadState.resetWriter(newDwpt);
+      threadState.reset();
     } else {
-      threadState.resetWriter(null);
+      threadState.deactivate();
     }
     return dwpt;
   }
@@ -328,18 +326,6 @@ abstract class DocumentsWriterPerThreadP
    */
   void deactivateThreadState(ThreadState threadState) {
     assert threadState.isActive();
-    threadState.resetWriter(null);
-  }
-
-  /**
-   * Reinitialized an active {@link ThreadState}. A {@link ThreadState} should
-   * only be reinitialized if it is active without any pending documents.
-   * 
-   * @param threadState the state to reinitialize
-   */
-  void reinitThreadState(ThreadState threadState) {
-    assert threadState.isActive;
-    assert threadState.dwpt.getNumDocsInRAM() == 0;
-    threadState.dwpt.initialize();
+    threadState.deactivate();
   }
 }

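The pool changes above separate two notions: a ThreadState can be active (still usable by the pool) while not yet initialized (no DWPT assigned), and the old resetWriter(null) is split into reset(), which only detaches the current DWPT, and deactivate(), which retires the slot for good. A simplified sketch of that state machine (an Object stands in for DocumentsWriterPerThread; this is not the actual ThreadState class):

    import java.util.concurrent.locks.ReentrantLock;

    class ThreadStateSketch extends ReentrantLock {
      private Object dwpt;              // stands in for DocumentsWriterPerThread
      private boolean isActive = true;

      boolean isActive() {
        assert isHeldByCurrentThread();
        return isActive;
      }

      // Active *and* currently holding a writer; a freshly obtained state
      // may be active while dwpt is still null.
      boolean isInitialized() {
        assert isHeldByCurrentThread();
        return isActive() && dwpt != null;
      }

      // Detach the current writer but keep the slot usable for a new one.
      void reset() {
        assert isHeldByCurrentThread();
        dwpt = null;
      }

      // Retire the slot entirely (e.g. once the writer is closed).
      void deactivate() {
        assert isHeldByCurrentThread();
        isActive = false;
        reset();
      }
    }
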
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/FieldInfo.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/FieldInfo.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/FieldInfo.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/FieldInfo.java Mon Oct 21 18:58:24 2013
@@ -47,6 +47,8 @@ public final class FieldInfo {
 
   private Map<String,String> attributes;
 
+  private long dvGen = -1; // the DocValues generation of this field
+  
   /**
    * Controls how much information is stored in the postings lists.
    * @lucene.experimental
@@ -79,7 +81,7 @@ public final class FieldInfo {
      * Character offsets are encoded alongside the positions. 
      */
     DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS,
-  };
+  }
   
   /**
    * DocValues types.
@@ -92,32 +94,34 @@ public final class FieldInfo {
      */
     NUMERIC,
     /**
-     * A per-document byte[].
+     * A per-document byte[].  Values may be larger than
+     * 32766 bytes, but different codecs may enforce their own limits.
      */
     BINARY,
     /** 
      * A pre-sorted byte[]. Fields with this type only store distinct byte values 
      * and store an additional offset pointer per document to dereference the shared 
      * byte[]. The stored byte[] is presorted and allows access via document id, 
-     * ordinal and by-value.
+     * ordinal and by-value.  Values must be <= 32766 bytes.
      */
     SORTED,
     /** 
      * A pre-sorted Set&lt;byte[]&gt;. Fields with this type only store distinct byte values 
      * and store additional offset pointers per document to dereference the shared 
      * byte[]s. The stored byte[] is presorted and allows access via document id, 
-     * ordinal and by-value.
+     * ordinal and by-value.  Values must be <= 32766 bytes.
      */
     SORTED_SET
-  };
+  }
 
   /**
    * Sole Constructor.
    *
    * @lucene.experimental
    */
-  public FieldInfo(String name, boolean indexed, int number, boolean storeTermVector, 
-            boolean omitNorms, boolean storePayloads, IndexOptions indexOptions, DocValuesType docValues, DocValuesType normsType, Map<String,String> attributes) {
+  public FieldInfo(String name, boolean indexed, int number, boolean storeTermVector, boolean omitNorms, 
+      boolean storePayloads, IndexOptions indexOptions, DocValuesType docValues, DocValuesType normsType, 
+      Map<String,String> attributes) {
     this.name = name;
     this.indexed = indexed;
     this.number = number;
@@ -222,6 +226,19 @@ public final class FieldInfo {
     return docValueType;
   }
   
+  /** Sets the docValues generation of this field. */
+  public void setDocValuesGen(long dvGen) {
+    this.dvGen = dvGen;
+  }
+  
+  /**
+   * Returns the docValues generation of this field, or -1 if no docValues
+   * updates exist for it.
+   */
+  public long getDocValuesGen() {
+    return dvGen;
+  }
+  
   /**
    * Returns {@link DocValuesType} of the norm. this may be null if the field has no norms.
    */