Posted to commits@lucene.apache.org by rm...@apache.org on 2011/05/14 15:51:59 UTC

svn commit: r1103112 [5/24] - in /lucene/dev/branches/flexscoring: ./ dev-tools/eclipse/ dev-tools/idea/.idea/ dev-tools/idea/lucene/contrib/ant/ dev-tools/idea/lucene/contrib/db/bdb-je/ dev-tools/idea/lucene/contrib/db/bdb/ dev-tools/idea/lucene/contr...

Modified: lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java?rev=1103112&r1=1103111&r2=1103112&view=diff
==============================================================================
--- lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java (original)
+++ lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java Sat May 14 13:51:35 2011
@@ -19,8 +19,15 @@ package org.apache.lucene.index;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Map;
+import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Fieldable;
+import org.apache.lucene.util.ArrayUtil;
 
 
 /**
@@ -33,26 +40,39 @@ import java.util.HashMap;
 
 final class DocFieldProcessor extends DocConsumer {
 
-  final DocumentsWriter docWriter;
   final DocFieldConsumer consumer;
   final StoredFieldsWriter fieldsWriter;
 
-  public DocFieldProcessor(DocumentsWriter docWriter, DocFieldConsumer consumer) {
-    this.docWriter = docWriter;
+  // Holds all fields seen in current doc
+  DocFieldProcessorPerField[] fields = new DocFieldProcessorPerField[1];
+  int fieldCount;
+
+  // Hash table for all fields ever seen
+  DocFieldProcessorPerField[] fieldHash = new DocFieldProcessorPerField[2];
+  int hashMask = 1;
+  int totalFieldCount;
+
+  float docBoost;
+  int fieldGen;
+  final DocumentsWriterPerThread.DocState docState;
+
+  public DocFieldProcessor(DocumentsWriterPerThread docWriter, DocFieldConsumer consumer) {
+    this.docState = docWriter.docState;
     this.consumer = consumer;
     fieldsWriter = new StoredFieldsWriter(docWriter);
   }
 
   @Override
-  public void flush(Collection<DocConsumerPerThread> threads, SegmentWriteState state) throws IOException {
+  public void flush(SegmentWriteState state) throws IOException {
 
-    Map<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>> childThreadsAndFields = new HashMap<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>>();
-    for ( DocConsumerPerThread thread : threads) {
-      DocFieldProcessorPerThread perThread = (DocFieldProcessorPerThread) thread;
-      childThreadsAndFields.put(perThread.consumer, perThread.fields());
+    Map<FieldInfo, DocFieldConsumerPerField> childFields = new HashMap<FieldInfo, DocFieldConsumerPerField>();
+    Collection<DocFieldConsumerPerField> fields = fields();
+    for (DocFieldConsumerPerField f : fields) {
+      childFields.put(f.getFieldInfo(), f);
     }
+
     fieldsWriter.flush(state);
-    consumer.flush(childThreadsAndFields, state);
+    consumer.flush(childFields, state);
 
     // Important to save after asking consumer to flush so
     // consumer can alter the FieldInfo* if necessary.  EG,
@@ -64,8 +84,20 @@ final class DocFieldProcessor extends Do
 
   @Override
   public void abort() {
-    fieldsWriter.abort();
-    consumer.abort();
+    for(int i=0;i<fieldHash.length;i++) {
+      DocFieldProcessorPerField field = fieldHash[i];
+      while(field != null) {
+        final DocFieldProcessorPerField next = field.next;
+        field.abort();
+        field = next;
+      }
+    }
+
+    try {
+      fieldsWriter.abort();
+    } finally {
+      consumer.abort();
+    }
   }
 
   @Override
@@ -73,8 +105,160 @@ final class DocFieldProcessor extends Do
     return consumer.freeRAM();
   }
 
+  public Collection<DocFieldConsumerPerField> fields() {
+    Collection<DocFieldConsumerPerField> fields = new HashSet<DocFieldConsumerPerField>();
+    for(int i=0;i<fieldHash.length;i++) {
+      DocFieldProcessorPerField field = fieldHash[i];
+      while(field != null) {
+        fields.add(field.consumer);
+        field = field.next;
+      }
+    }
+    assert fields.size() == totalFieldCount;
+    return fields;
+  }
+
+  /** In flush we reset the fieldHash to not maintain per-field state
+   *  across segments */
   @Override
-  public DocConsumerPerThread addThread(DocumentsWriterThreadState threadState) throws IOException {
-    return new DocFieldProcessorPerThread(threadState, this);
+  void doAfterFlush() {
+    fieldHash = new DocFieldProcessorPerField[2];
+    hashMask = 1;
+    totalFieldCount = 0;
+  }
+
+  private void rehash() {
+    final int newHashSize = (fieldHash.length*2);
+    assert newHashSize > fieldHash.length;
+
+    final DocFieldProcessorPerField newHashArray[] = new DocFieldProcessorPerField[newHashSize];
+
+    // Rehash
+    int newHashMask = newHashSize-1;
+    for(int j=0;j<fieldHash.length;j++) {
+      DocFieldProcessorPerField fp0 = fieldHash[j];
+      while(fp0 != null) {
+        final int hashPos2 = fp0.fieldInfo.name.hashCode() & newHashMask;
+        DocFieldProcessorPerField nextFP0 = fp0.next;
+        fp0.next = newHashArray[hashPos2];
+        newHashArray[hashPos2] = fp0;
+        fp0 = nextFP0;
+      }
+    }
+
+    fieldHash = newHashArray;
+    hashMask = newHashMask;
   }
+
+  @Override
+  public void processDocument(FieldInfos fieldInfos) throws IOException {
+
+    consumer.startDocument();
+    fieldsWriter.startDocument();
+
+    final Document doc = docState.doc;
+
+    fieldCount = 0;
+
+    final int thisFieldGen = fieldGen++;
+
+    final List<Fieldable> docFields = doc.getFields();
+    final int numDocFields = docFields.size();
+
+    // Absorb any new fields first seen in this document.
+    // Also absorb any changes to fields we had already
+    // seen before (eg suddenly turning on norms or
+    // vectors, etc.):
+
+    for(int i=0;i<numDocFields;i++) {
+      Fieldable field = docFields.get(i);
+      final String fieldName = field.name();
+
+      // Make sure we have a PerField allocated
+      final int hashPos = fieldName.hashCode() & hashMask;
+      DocFieldProcessorPerField fp = fieldHash[hashPos];
+      while(fp != null && !fp.fieldInfo.name.equals(fieldName)) {
+        fp = fp.next;
+      }
+
+      if (fp == null) {
+
+        // TODO FI: we need to genericize the "flags" that a
+        // field holds, and how these flags are merged; it
+        // needs to be more "pluggable" such that if I want
+        // to have a new "thing" my Fields can do, I can
+        // easily add it
+        FieldInfo fi = fieldInfos.addOrUpdate(fieldName, field.isIndexed(), field.isTermVectorStored(),
+                                      field.isStorePositionWithTermVector(), field.isStoreOffsetWithTermVector(),
+                                      field.getOmitNorms(), false, field.getOmitTermFreqAndPositions());
+
+        fp = new DocFieldProcessorPerField(this, fi);
+        fp.next = fieldHash[hashPos];
+        fieldHash[hashPos] = fp;
+        totalFieldCount++;
+
+        if (totalFieldCount >= fieldHash.length/2)
+          rehash();
+      } else {
+        fieldInfos.addOrUpdate(fp.fieldInfo.name, field.isIndexed(), field.isTermVectorStored(),
+                            field.isStorePositionWithTermVector(), field.isStoreOffsetWithTermVector(),
+                            field.getOmitNorms(), false, field.getOmitTermFreqAndPositions());
+      }
+
+      if (thisFieldGen != fp.lastGen) {
+
+        // First time we're seeing this field for this doc
+        fp.fieldCount = 0;
+
+        if (fieldCount == fields.length) {
+          final int newSize = fields.length*2;
+          DocFieldProcessorPerField newArray[] = new DocFieldProcessorPerField[newSize];
+          System.arraycopy(fields, 0, newArray, 0, fieldCount);
+          fields = newArray;
+        }
+
+        fields[fieldCount++] = fp;
+        fp.lastGen = thisFieldGen;
+      }
+
+      fp.addField(field);
+
+      if (field.isStored()) {
+        fieldsWriter.addField(field, fp.fieldInfo);
+      }
+    }
+
+    // If we are writing vectors then we must visit
+    // fields in sorted order so they are written in
+    // sorted order.  TODO: we actually only need to
+    // sort the subset of fields that have vectors
+    // enabled; we could save [small amount of] CPU
+    // here.
+    ArrayUtil.quickSort(fields, 0, fieldCount, fieldsComp);
+    for(int i=0;i<fieldCount;i++) {
+      final DocFieldProcessorPerField perField = fields[i];
+      perField.consumer.processFields(perField.fields, perField.fieldCount);
+    }
+
+    if (docState.maxTermPrefix != null && docState.infoStream != null) {
+      docState.infoStream.println("WARNING: document contains at least one immense term (whose UTF8 encoding is longer than the max length " + DocumentsWriterPerThread.MAX_TERM_LENGTH_UTF8 + "), all of which were skipped.  Please correct the analyzer to not produce such terms.  The prefix of the first immense term is: '" + docState.maxTermPrefix + "...'");
+      docState.maxTermPrefix = null;
+    }
+  }
+
+  private static final Comparator<DocFieldProcessorPerField> fieldsComp = new Comparator<DocFieldProcessorPerField>() {
+    public int compare(DocFieldProcessorPerField o1, DocFieldProcessorPerField o2) {
+      return o1.fieldInfo.name.compareTo(o2.fieldInfo.name);
+    }
+  };
+  
+  @Override
+  void finishDocument() throws IOException {
+    try {
+      fieldsWriter.finishDocument();
+    } finally {
+      consumer.finishDocument();
+    }
+  }
+
 }
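
For reference, the new per-field lookup in DocFieldProcessor above is a small
chained hash keyed on the field name: the table size stays a power of two, the
hash is masked rather than modded, and the table is rehashed once it is half
full. A minimal standalone sketch of that pattern (the PerFieldHash and
PerField names here are hypothetical, not part of this commit):

// Sketch of a name-keyed chained hash with power-of-two sizing and rehash at 50% load.
final class PerFieldHash {
  static final class PerField {
    final String name;
    PerField next;                          // collision chain
    PerField(String name) { this.name = name; }
  }

  private PerField[] hash = new PerField[2];  // always a power of two
  private int mask = 1;
  private int count;

  PerField getOrAdd(String name) {
    final int pos = name.hashCode() & mask;
    PerField fp = hash[pos];
    while (fp != null && !fp.name.equals(name)) {
      fp = fp.next;
    }
    if (fp == null) {
      fp = new PerField(name);
      fp.next = hash[pos];                  // insert at head of the chain
      hash[pos] = fp;
      if (++count >= hash.length / 2) {     // keep the load factor at or below 0.5
        rehash();
      }
    }
    return fp;
  }

  private void rehash() {
    final PerField[] newHash = new PerField[hash.length * 2];
    final int newMask = newHash.length - 1;
    for (PerField fp : hash) {
      while (fp != null) {                  // re-link every entry into the larger table
        final PerField next = fp.next;
        final int pos = fp.name.hashCode() & newMask;
        fp.next = newHash[pos];
        newHash[pos] = fp;
        fp = next;
      }
    }
    hash = newHash;
    mask = newMask;
  }
}

The commit additionally stamps each per-field entry with a per-document
generation (lastGen) instead of clearing per-document state, so fields already
seen in the current document are detected without maintaining an extra set.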

Modified: lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java?rev=1103112&r1=1103111&r2=1103112&view=diff
==============================================================================
--- lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java (original)
+++ lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java Sat May 14 13:51:35 2011
@@ -18,6 +18,8 @@ package org.apache.lucene.index;
  */
 
 import org.apache.lucene.document.Fieldable;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.RamUsageEstimator;
 
 /**
  * Holds all per thread, per field state.
@@ -34,11 +36,22 @@ final class DocFieldProcessorPerField {
   int fieldCount;
   Fieldable[] fields = new Fieldable[1];
 
-  public DocFieldProcessorPerField(final DocFieldProcessorPerThread perThread, final FieldInfo fieldInfo) {
-    this.consumer = perThread.consumer.addField(fieldInfo);
+  public DocFieldProcessorPerField(final DocFieldProcessor docFieldProcessor, final FieldInfo fieldInfo) {
+    this.consumer = docFieldProcessor.consumer.addField(fieldInfo);
     this.fieldInfo = fieldInfo;
   }
 
+  public void addField(Fieldable field) {
+    if (fieldCount == fields.length) {
+      int newSize = ArrayUtil.oversize(fieldCount + 1, RamUsageEstimator.NUM_BYTES_OBJECT_REF);
+      Fieldable[] newArray = new Fieldable[newSize];
+      System.arraycopy(fields, 0, newArray, 0, fieldCount);
+      fields = newArray;
+    }
+
+    fields[fieldCount++] = field;
+  }
+
   public void abort() {
     consumer.abort();
   }
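
The addField change above grows the per-field Fieldable array on demand via
ArrayUtil.oversize instead of a fixed doubling. A self-contained sketch of the
same append-with-growth pattern, using a plain over-allocation policy in place
of the Lucene utility (FieldBuffer is a hypothetical name):

// Sketch: amortized O(1) append by over-allocating on growth.
final class FieldBuffer {
  private Object[] fields = new Object[1];
  private int fieldCount;

  void add(Object field) {
    if (fieldCount == fields.length) {
      // Grow by roughly 50% so repeated adds do not copy on every call.
      final int newSize = fields.length + (fields.length >> 1) + 1;
      final Object[] newArray = new Object[newSize];
      System.arraycopy(fields, 0, newArray, 0, fieldCount);
      fields = newArray;
    }
    fields[fieldCount++] = field;
  }
}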

Modified: lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocInverter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocInverter.java?rev=1103112&r1=1103111&r2=1103112&view=diff
==============================================================================
--- lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocInverter.java (original)
+++ lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocInverter.java Sat May 14 13:51:35 2011
@@ -18,12 +18,13 @@ package org.apache.lucene.index;
  */
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
-
 import java.util.Map;
 
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.apache.lucene.analysis.tokenattributes.OffsetAttribute;
+import org.apache.lucene.util.AttributeSource;
+
 
 /** This is a DocFieldConsumer that inverts each field,
  *  separately, from a Document, and accepts a
@@ -34,42 +35,72 @@ final class DocInverter extends DocField
   final InvertedDocConsumer consumer;
   final InvertedDocEndConsumer endConsumer;
 
-  public DocInverter(InvertedDocConsumer consumer, InvertedDocEndConsumer endConsumer) {
+  final DocumentsWriterPerThread.DocState docState;
+
+  final FieldInvertState fieldState = new FieldInvertState();
+
+  final SingleTokenAttributeSource singleToken = new SingleTokenAttributeSource();
+
+  static class SingleTokenAttributeSource extends AttributeSource {
+    final CharTermAttribute termAttribute;
+    final OffsetAttribute offsetAttribute;
+
+    private SingleTokenAttributeSource() {
+      termAttribute = addAttribute(CharTermAttribute.class);
+      offsetAttribute = addAttribute(OffsetAttribute.class);
+    }
+
+    public void reinit(String stringValue, int startOffset,  int endOffset) {
+      termAttribute.setEmpty().append(stringValue);
+      offsetAttribute.setOffset(startOffset, endOffset);
+    }
+  }
+
+  // Used to read a string value for a field
+  final ReusableStringReader stringReader = new ReusableStringReader();
+
+  public DocInverter(DocumentsWriterPerThread.DocState docState, InvertedDocConsumer consumer, InvertedDocEndConsumer endConsumer) {
+    this.docState = docState;
     this.consumer = consumer;
     this.endConsumer = endConsumer;
   }
 
   @Override
-  void flush(Map<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException {
-
-    Map<InvertedDocConsumerPerThread,Collection<InvertedDocConsumerPerField>> childThreadsAndFields = new HashMap<InvertedDocConsumerPerThread,Collection<InvertedDocConsumerPerField>>();
-    Map<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>> endChildThreadsAndFields = new HashMap<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>>();
+  void flush(Map<FieldInfo, DocFieldConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException {
 
-    for (Map.Entry<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> entry : threadsAndFields.entrySet() ) {
+    Map<FieldInfo, InvertedDocConsumerPerField> childFieldsToFlush = new HashMap<FieldInfo, InvertedDocConsumerPerField>();
+    Map<FieldInfo, InvertedDocEndConsumerPerField> endChildFieldsToFlush = new HashMap<FieldInfo, InvertedDocEndConsumerPerField>();
 
+    for (Map.Entry<FieldInfo, DocFieldConsumerPerField> fieldToFlush : fieldsToFlush.entrySet()) {
+      DocInverterPerField perField = (DocInverterPerField) fieldToFlush.getValue();
+      childFieldsToFlush.put(fieldToFlush.getKey(), perField.consumer);
+      endChildFieldsToFlush.put(fieldToFlush.getKey(), perField.endConsumer);
+    }
 
-      DocInverterPerThread perThread = (DocInverterPerThread) entry.getKey();
+    consumer.flush(childFieldsToFlush, state);
+    endConsumer.flush(endChildFieldsToFlush, state);
+  }
 
-      Collection<InvertedDocConsumerPerField> childFields = new HashSet<InvertedDocConsumerPerField>();
-      Collection<InvertedDocEndConsumerPerField> endChildFields = new HashSet<InvertedDocEndConsumerPerField>();
-      for (final DocFieldConsumerPerField field: entry.getValue() ) {  
-        DocInverterPerField perField = (DocInverterPerField) field;
-        childFields.add(perField.consumer);
-        endChildFields.add(perField.endConsumer);
-      }
+  @Override
+  public void startDocument() throws IOException {
+    consumer.startDocument();
+    endConsumer.startDocument();
+  }
 
-      childThreadsAndFields.put(perThread.consumer, childFields);
-      endChildThreadsAndFields.put(perThread.endConsumer, endChildFields);
-    }
-    
-    consumer.flush(childThreadsAndFields, state);
-    endConsumer.flush(endChildThreadsAndFields, state);
+  public void finishDocument() throws IOException {
+    // TODO: allow endConsumer.finishDocument to also return
+    // a DocWriter
+    endConsumer.finishDocument();
+    consumer.finishDocument();
   }
 
   @Override
   void abort() {
-    consumer.abort();
-    endConsumer.abort();
+    try {
+      consumer.abort();
+    } finally {
+      endConsumer.abort();
+    }
   }
 
   @Override
@@ -78,7 +109,8 @@ final class DocInverter extends DocField
   }
 
   @Override
-  public DocFieldConsumerPerThread addThread(DocFieldProcessorPerThread docFieldProcessorPerThread) {
-    return new DocInverterPerThread(docFieldProcessorPerThread, this);
+  public DocFieldConsumerPerField addField(FieldInfo fi) {
+    return new DocInverterPerField(this, fi);
   }
+
 }
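
The SingleTokenAttributeSource added above lets un-tokenized field values skip
the analyzer entirely: one reusable AttributeSource is overwritten per value
instead of building a TokenStream. A standalone sketch of the same idea, using
the Lucene attribute classes imported by this commit (the ReusableSingleToken
name is hypothetical):

import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.lucene.analysis.tokenattributes.OffsetAttribute;
import org.apache.lucene.util.AttributeSource;

// Sketch: a single reusable attribute source for un-tokenized (single token) values.
final class ReusableSingleToken extends AttributeSource {
  private final CharTermAttribute term = addAttribute(CharTermAttribute.class);
  private final OffsetAttribute offset = addAttribute(OffsetAttribute.class);

  // Overwrite the attributes in place instead of allocating per value.
  void set(String value, int startOffset, int endOffset) {
    term.setEmpty().append(value);
    offset.setOffset(startOffset, endOffset);
  }
}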

Modified: lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java?rev=1103112&r1=1103111&r2=1103112&view=diff
==============================================================================
--- lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java (original)
+++ lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java Sat May 14 13:51:35 2011
@@ -35,20 +35,20 @@ import org.apache.lucene.analysis.tokena
 
 final class DocInverterPerField extends DocFieldConsumerPerField {
 
-  final private DocInverterPerThread perThread;
-  final private FieldInfo fieldInfo;
+  final private DocInverter parent;
+  final FieldInfo fieldInfo;
   final InvertedDocConsumerPerField consumer;
   final InvertedDocEndConsumerPerField endConsumer;
-  final DocumentsWriter.DocState docState;
+  final DocumentsWriterPerThread.DocState docState;
   final FieldInvertState fieldState;
 
-  public DocInverterPerField(DocInverterPerThread perThread, FieldInfo fieldInfo) {
-    this.perThread = perThread;
+  public DocInverterPerField(DocInverter parent, FieldInfo fieldInfo) {
+    this.parent = parent;
     this.fieldInfo = fieldInfo;
-    docState = perThread.docState;
-    fieldState = perThread.fieldState;
-    this.consumer = perThread.consumer.addField(this, fieldInfo);
-    this.endConsumer = perThread.endConsumer.addField(this, fieldInfo);
+    docState = parent.docState;
+    fieldState = parent.fieldState;
+    this.consumer = parent.consumer.addField(this, fieldInfo);
+    this.endConsumer = parent.endConsumer.addField(this, fieldInfo);
   }
 
   @Override
@@ -80,8 +80,8 @@ final class DocInverterPerField extends 
         if (!field.isTokenized()) {		  // un-tokenized field
           String stringValue = field.stringValue();
           final int valueLength = stringValue.length();
-          perThread.singleToken.reinit(stringValue, 0, valueLength);
-          fieldState.attributeSource = perThread.singleToken;
+          parent.singleToken.reinit(stringValue, 0, valueLength);
+          fieldState.attributeSource = parent.singleToken;
           consumer.start(field);
 
           boolean success = false;
@@ -89,8 +89,9 @@ final class DocInverterPerField extends 
             consumer.add();
             success = true;
           } finally {
-            if (!success)
+            if (!success) {
               docState.docWriter.setAborting();
+            }
           }
           fieldState.offset += valueLength;
           fieldState.length++;
@@ -114,8 +115,8 @@ final class DocInverterPerField extends 
               if (stringValue == null) {
                 throw new IllegalArgumentException("field must have either TokenStream, String or Reader value");
               }
-              perThread.stringReader.init(stringValue);
-              reader = perThread.stringReader;
+              parent.stringReader.init(stringValue);
+              reader = parent.stringReader;
             }
           
             // Tokenize field and add to postingTable
@@ -166,8 +167,9 @@ final class DocInverterPerField extends 
                 consumer.add();
                 success = true;
               } finally {
-                if (!success)
+                if (!success) {
                   docState.docWriter.setAborting();
+                }
               }
               fieldState.length++;
               fieldState.position++;
@@ -195,4 +197,9 @@ final class DocInverterPerField extends 
     consumer.finish();
     endConsumer.finish();
   }
+
+  @Override
+  FieldInfo getFieldInfo() {
+    return fieldInfo;
+  }
 }
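
DocInverterPerField above wraps each consumer.add() call in try/finally and
flags the writer as aborting when the call fails part-way, so a partially
inverted document is discarded rather than left half-indexed. A minimal
self-contained sketch of that pattern (hypothetical names; the code in this
commit calls docState.docWriter.setAborting()):

// Sketch: mark the writer as aborting only when the downstream call fails,
// while still letting the original exception propagate.
final class AbortOnFailure {
  static void addOrAbort(Runnable consumerAdd, Runnable markAborting) {
    boolean success = false;
    try {
      consumerAdd.run();
      success = true;
    } finally {
      if (!success) {
        markAborting.run();   // only reached when consumerAdd threw
      }
    }
  }
}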

Modified: lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1103112&r1=1103111&r2=1103112&view=diff
==============================================================================
--- lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Sat May 14 13:51:35 2011
@@ -19,36 +19,27 @@ package org.apache.lucene.index;
 
 import java.io.IOException;
 import java.io.PrintStream;
-import java.text.NumberFormat;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
+import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain;
+import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
+import org.apache.lucene.index.FieldInfos.FieldNumberBiMap;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.SimilarityProvider;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.RAMFile;
-import org.apache.lucene.util.ArrayUtil;
-import org.apache.lucene.util.BitVector;
-import org.apache.lucene.util.RamUsageEstimator;
-import org.apache.lucene.util.RecyclingByteBlockAllocator;
-import org.apache.lucene.util.ThreadInterruptedException;
-
-import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_MASK;
-import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE;
 
 /**
  * This class accepts multiple added documents and directly
- * writes a single segment file.  It does this more
- * efficiently than creating a single segment per document
- * (with DocumentWriter) and doing standard merges on those
- * segments.
+ * writes segment files.
  *
  * Each added document is passed to the {@link DocConsumer},
  * which in turn processes the document and interacts with
@@ -111,266 +102,117 @@ import static org.apache.lucene.util.Byt
  */
 
 final class DocumentsWriter {
-  final AtomicLong bytesUsed = new AtomicLong(0);
-  IndexWriter writer;
   Directory directory;
 
-  String segment;                         // Current segment we are working on
-
-  private int nextDocID;                  // Next docID to be added
-  private int numDocs;                    // # of docs added, but not yet flushed
-
-  // Max # ThreadState instances; if there are more threads
-  // than this they share ThreadStates
-  private DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];
-  private final HashMap<Thread,DocumentsWriterThreadState> threadBindings = new HashMap<Thread,DocumentsWriterThreadState>();
-
-  boolean bufferIsFull;                   // True when it's time to write segment
-  private boolean aborting;               // True if an abort is pending
+  private volatile boolean closed;
 
   PrintStream infoStream;
   SimilarityProvider similarityProvider;
 
-  // max # simultaneous threads; if there are more than
-  // this, they wait for others to finish first
-  private final int maxThreadStates;
-
-  // TODO: cutover to BytesRefHash
-  // Deletes for our still-in-RAM (to be flushed next) segment
-  private BufferedDeletes pendingDeletes = new BufferedDeletes(false);
-  
-  static class DocState {
-    DocumentsWriter docWriter;
-    Analyzer analyzer;
-    PrintStream infoStream;
-    SimilarityProvider similarityProvider;
-    int docID;
-    Document doc;
-    String maxTermPrefix;
-
-    // Only called by asserts
-    public boolean testPoint(String name) {
-      return docWriter.writer.testPoint(name);
-    }
-
-    public void clear() {
-      // don't hold onto doc nor analyzer, in case it is
-      // largish:
-      doc = null;
-      analyzer = null;
-    }
-  }
-
-  /** Consumer returns this on each doc.  This holds any
-   *  state that must be flushed synchronized "in docID
-   *  order".  We gather these and flush them in order. */
-  abstract static class DocWriter {
-    DocWriter next;
-    int docID;
-    abstract void finish() throws IOException;
-    abstract void abort();
-    abstract long sizeInBytes();
+  List<String> newFiles;
 
-    void setNext(DocWriter next) {
-      this.next = next;
-    }
-  }
+  final IndexWriter indexWriter;
 
-  /**
-   * Create and return a new DocWriterBuffer.
-   */
-  PerDocBuffer newPerDocBuffer() {
-    return new PerDocBuffer();
-  }
-
-  /**
-   * RAMFile buffer for DocWriters.
-   */
-  class PerDocBuffer extends RAMFile {
-    
-    /**
-     * Allocate bytes used from shared pool.
-     */
-    @Override
-    protected byte[] newBuffer(int size) {
-      assert size == PER_DOC_BLOCK_SIZE;
-      return perDocAllocator.getByteBlock();
-    }
-    
-    /**
-     * Recycle the bytes used.
-     */
-    synchronized void recycle() {
-      if (buffers.size() > 0) {
-        setLength(0);
-        
-        // Recycle the blocks
-        perDocAllocator.recycleByteBlocks(buffers);
-        buffers.clear();
-        sizeInBytes = 0;
-        
-        assert numBuffers() == 0;
-      }
-    }
-  }
-  
-  /**
-   * The IndexingChain must define the {@link #getChain(DocumentsWriter)} method
-   * which returns the DocConsumer that the DocumentsWriter calls to process the
-   * documents. 
-   */
-  abstract static class IndexingChain {
-    abstract DocConsumer getChain(DocumentsWriter documentsWriter);
-  }
-  
-  static final IndexingChain defaultIndexingChain = new IndexingChain() {
+  private AtomicInteger numDocsInRAM = new AtomicInteger(0);
 
-    @Override
-    DocConsumer getChain(DocumentsWriter documentsWriter) {
-      /*
-      This is the current indexing chain:
-
-      DocConsumer / DocConsumerPerThread
-        --> code: DocFieldProcessor / DocFieldProcessorPerThread
-          --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField
-            --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField
-              --> code: DocInverter / DocInverterPerThread / DocInverterPerField
-                --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
-                  --> code: TermsHash / TermsHashPerThread / TermsHashPerField
-                    --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField
-                      --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField
-                      --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField
-                --> InvertedDocEndConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
-                  --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField
-              --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField
-    */
-
-    // Build up indexing chain:
-
-      final TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(documentsWriter);
-      final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();
-      /*
-       * nesting TermsHash instances here to allow the secondary (TermVectors) share the interned postings
-       * via a shared ByteBlockPool. See TermsHashPerField for details. 
-       */
-      final TermsHash termVectorsTermHash = new TermsHash(documentsWriter, false, termVectorsWriter, null);
-      final InvertedDocConsumer  termsHash = new TermsHash(documentsWriter, true, freqProxWriter, termVectorsTermHash);
-      final NormsWriter normsWriter = new NormsWriter();
-      final DocInverter docInverter = new DocInverter(termsHash, normsWriter);
-      return new DocFieldProcessor(documentsWriter, docInverter);
-    }
-  };
-
-  final DocConsumer consumer;
-
-  // How much RAM we can use before flushing.  This is 0 if
-  // we are flushing by doc count instead.
-
-  private final IndexWriterConfig config;
+  // TODO: cut over to BytesRefHash in BufferedDeletes
+  volatile DocumentsWriterDeleteQueue deleteQueue = new DocumentsWriterDeleteQueue();
+  private final Queue<FlushTicket> ticketQueue = new LinkedList<DocumentsWriter.FlushTicket>();
 
-  private boolean closed;
-  private FieldInfos fieldInfos;
+  private Collection<String> abortedFiles;               // List of files that were written before last abort()
 
-  private final BufferedDeletesStream bufferedDeletesStream;
-  private final IndexWriter.FlushControl flushControl;
+  final IndexingChain chain;
 
-  DocumentsWriter(IndexWriterConfig config, Directory directory, IndexWriter writer, IndexingChain indexingChain, FieldInfos fieldInfos,
+  final DocumentsWriterPerThreadPool perThreadPool;
+  final FlushPolicy flushPolicy;
+  final DocumentsWriterFlushControl flushControl;
+  final Healthiness healthiness;
+  DocumentsWriter(IndexWriterConfig config, Directory directory, IndexWriter writer, FieldNumberBiMap globalFieldNumbers,
       BufferedDeletesStream bufferedDeletesStream) throws IOException {
     this.directory = directory;
-    this.writer = writer;
+    this.indexWriter = writer;
     this.similarityProvider = config.getSimilarityProvider();
-    this.maxThreadStates = config.getMaxThreadStates();
-    this.fieldInfos = fieldInfos;
-    this.bufferedDeletesStream = bufferedDeletesStream;
-    flushControl = writer.flushControl;
-    consumer = config.getIndexingChain().getChain(this);
-    this.config = config;
-  }
-
-  // Buffer a specific docID for deletion.  Currently only
-  // used when we hit a exception when adding a document
-  synchronized void deleteDocID(int docIDUpto) {
-    pendingDeletes.addDocID(docIDUpto);
-    // NOTE: we do not trigger flush here.  This is
-    // potentially a RAM leak, if you have an app that tries
-    // to add docs but every single doc always hits a
-    // non-aborting exception.  Allowing a flush here gets
-    // very messy because we are only invoked when handling
-    // exceptions so to do this properly, while handling an
-    // exception we'd have to go off and flush new deletes
-    // which is risky (likely would hit some other
-    // confounding exception).
-  }
-  
-  boolean deleteQueries(Query... queries) {
-    final boolean doFlush = flushControl.waitUpdate(0, queries.length);
-    synchronized(this) {
-      for (Query query : queries) {
-        pendingDeletes.addQuery(query, numDocs);
-      }
-    }
-    return doFlush;
-  }
-  
-  boolean deleteQuery(Query query) { 
-    final boolean doFlush = flushControl.waitUpdate(0, 1);
-    synchronized(this) {
-      pendingDeletes.addQuery(query, numDocs);
+    this.perThreadPool = config.getIndexerThreadPool();
+    this.chain = config.getIndexingChain();
+    this.perThreadPool.initialize(this, globalFieldNumbers, config);
+    final FlushPolicy configuredPolicy = config.getFlushPolicy();
+    if (configuredPolicy == null) {
+      flushPolicy = new FlushByRamOrCountsPolicy();
+    } else {
+      flushPolicy = configuredPolicy;
     }
-    return doFlush;
+    flushPolicy.init(this);
+    
+    healthiness = new Healthiness();
+    final long maxRamPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;
+    flushControl = new DocumentsWriterFlushControl(this, healthiness, maxRamPerDWPT);
   }
-  
-  boolean deleteTerms(Term... terms) {
-    final boolean doFlush = flushControl.waitUpdate(0, terms.length);
-    synchronized(this) {
-      for (Term term : terms) {
-        pendingDeletes.addTerm(term, numDocs);
-      }
+
+  synchronized void deleteQueries(final Query... queries) throws IOException {
+    deleteQueue.addDelete(queries);
+    flushControl.doOnDelete();
+    if (flushControl.doApplyAllDeletes()) {
+      applyAllDeletes(deleteQueue);
     }
-    return doFlush;
   }
 
   // TODO: we could check w/ FreqProxTermsWriter: if the
   // term doesn't exist, don't bother buffering into the
   // per-DWPT map (but still must go into the global map)
-  boolean deleteTerm(Term term, boolean skipWait) {
-    final boolean doFlush = flushControl.waitUpdate(0, 1, skipWait);
-    synchronized(this) {
-      pendingDeletes.addTerm(term, numDocs);
+  synchronized void deleteTerms(final Term... terms) throws IOException {
+    final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
+    deleteQueue.addDelete(terms);
+    flushControl.doOnDelete();
+    if (flushControl.doApplyAllDeletes()) {
+      applyAllDeletes(deleteQueue);
     }
-    return doFlush;
   }
 
-  /** If non-null, various details of indexing are printed
-   *  here. */
+  DocumentsWriterDeleteQueue currentDeleteSession() {
+    return deleteQueue;
+  }
+  
+  private void applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
+    if (deleteQueue != null) {
+      synchronized (ticketQueue) {
+        // Freeze and insert the delete flush ticket in the queue
+        ticketQueue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false));
+        applyFlushTickets();
+      }
+    }
+    indexWriter.applyAllDeletes();
+    indexWriter.flushCount.incrementAndGet();
+  }
+
   synchronized void setInfoStream(PrintStream infoStream) {
     this.infoStream = infoStream;
-    for(int i=0;i<threadStates.length;i++) {
-      threadStates[i].docState.infoStream = infoStream;
+    final Iterator<ThreadState> it = perThreadPool.getAllPerThreadsIterator();
+    while (it.hasNext()) {
+      it.next().perThread.setInfoStream(infoStream);
     }
   }
 
-  /** Get current segment name we are writing. */
-  synchronized String getSegment() {
-    return segment;
+  /** Returns how many docs are currently buffered in RAM. */
+  int getNumDocs() {
+    return numDocsInRAM.get();
   }
 
-  /** Returns how many docs are currently buffered in RAM. */
-  synchronized int getNumDocs() {
-    return numDocs;
+  Collection<String> abortedFiles() {
+    return abortedFiles;
   }
 
-  void message(String message) {
+  // returns boolean for asserts
+  boolean message(String message) {
     if (infoStream != null) {
-      writer.message("DW: " + message);
+      indexWriter.message("DW: " + message);
     }
+    return true;
   }
 
-  synchronized void setAborting() {
-    if (infoStream != null) {
-      message("setAborting");
+  private void ensureOpen() throws AlreadyClosedException {
+    if (closed) {
+      throw new AlreadyClosedException("this IndexWriter is closed");
     }
-    aborting = true;
   }
 
   /** Called if we hit an exception at a bad time (when
@@ -378,816 +220,335 @@ final class DocumentsWriter {
    *  currently buffered docs.  This resets our state,
    *  discarding any docs added since last flush. */
   synchronized void abort() throws IOException {
-    if (infoStream != null) {
-      message("docWriter: abort");
-    }
-
     boolean success = false;
 
-    try {
-
-      // Forcefully remove waiting ThreadStates from line
-      waitQueue.abort();
-
-      // Wait for all other threads to finish with
-      // DocumentsWriter:
-      waitIdle();
+    synchronized (this) {
+      deleteQueue.clear();
+    }
 
+    try {
       if (infoStream != null) {
-        message("docWriter: abort waitIdle done");
+        message("docWriter: abort");
       }
 
-      assert 0 == waitQueue.numWaiting: "waitQueue.numWaiting=" + waitQueue.numWaiting;
-
-      waitQueue.waitingBytes = 0;
-
-      pendingDeletes.clear();
+      final Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
 
-      for (DocumentsWriterThreadState threadState : threadStates)
+      while (threadsIterator.hasNext()) {
+        ThreadState perThread = threadsIterator.next();
+        perThread.lock();
         try {
-          threadState.consumer.abort();
-        } catch (Throwable t) {
+          if (perThread.isActive()) { // we might be closed
+            perThread.perThread.abort();
+            perThread.perThread.checkAndResetHasAborted();
+          } else {
+            assert closed;
+          }
+        } finally {
+          perThread.unlock();
         }
-
-      try {
-        consumer.abort();
-      } catch (Throwable t) {
       }
 
-      // Reset all postings data
-      doAfterFlush();
       success = true;
     } finally {
-      aborting = false;
-      notifyAll();
       if (infoStream != null) {
-        message("docWriter: done abort; success=" + success);
+        message("docWriter: done abort; abortedFiles=" + abortedFiles + " success=" + success);
       }
     }
   }
 
-  /** Reset after a flush */
-  private void doAfterFlush() throws IOException {
-    // All ThreadStates should be idle when we are called
-    assert allThreadsIdle();
-    for (DocumentsWriterThreadState threadState : threadStates) {
-      threadState.consumer.doAfterFlush();
-    }
-
-    threadBindings.clear();
-    waitQueue.reset();
-    segment = null;
-    fieldInfos = new FieldInfos(fieldInfos);
-    numDocs = 0;
-    nextDocID = 0;
-    bufferIsFull = false;
-    for(int i=0;i<threadStates.length;i++) {
-      threadStates[i].doAfterFlush();
-    }
+  boolean anyChanges() {
+    return numDocsInRAM.get() != 0 || anyDeletions();
   }
 
-  private synchronized boolean allThreadsIdle() {
-    for(int i=0;i<threadStates.length;i++) {
-      if (!threadStates[i].isIdle) {
-        return false;
-      }
-    }
-    return true;
+  public int getBufferedDeleteTermsSize() {
+    return deleteQueue.getBufferedDeleteTermsSize();
   }
 
-  synchronized boolean anyChanges() {
-    return numDocs != 0 || pendingDeletes.any();
-  }
-
-  // for testing
-  public BufferedDeletes getPendingDeletes() {
-    return pendingDeletes;
-  }
-
-  private void pushDeletes(SegmentInfo newSegment, SegmentInfos segmentInfos) {
-    // Lock order: DW -> BD
-    final long delGen = bufferedDeletesStream.getNextGen();
-    if (pendingDeletes.any()) {
-      if (segmentInfos.size() > 0 || newSegment != null) {
-        final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(pendingDeletes, delGen);
-        if (infoStream != null) {
-          message("flush: push buffered deletes startSize=" + pendingDeletes.bytesUsed.get() + " frozenSize=" + packet.bytesUsed);
-        }
-        bufferedDeletesStream.push(packet);
-        if (infoStream != null) {
-          message("flush: delGen=" + packet.gen);
-        }
-        if (newSegment != null) {
-          newSegment.setBufferedDeletesGen(packet.gen);
-        }
-      } else {
-        if (infoStream != null) {
-          message("flush: drop buffered deletes: no segments");
-        }
-        // We can safely discard these deletes: since
-        // there are no segments, the deletions cannot
-        // affect anything.
-      }
-      pendingDeletes.clear();
-    } else if (newSegment != null) {
-      newSegment.setBufferedDeletesGen(delGen);
-    }
+  //for testing
+  public int getNumBufferedDeleteTerms() {
+    return deleteQueue.numGlobalTermDeletes();
   }
 
   public boolean anyDeletions() {
-    return pendingDeletes.any();
+    return deleteQueue.anyChanges();
   }
 
-  /** Flush all pending docs to a new segment */
-  // Lock order: IW -> DW
-  synchronized SegmentInfo flush(IndexWriter writer, IndexFileDeleter deleter, MergePolicy mergePolicy, SegmentInfos segmentInfos) throws IOException {
-
-    final long startTime = System.currentTimeMillis();
-
-    // We change writer's segmentInfos:
-    assert Thread.holdsLock(writer);
-
-    waitIdle();
+  void close() {
+    closed = true;
+    flushControl.setClosed();
+  }
 
-    if (numDocs == 0) {
-      // nothing to do!
-      if (infoStream != null) {
-        message("flush: no docs; skipping");
-      }
-      // Lock order: IW -> DW -> BD
-      pushDeletes(null, segmentInfos);
-      return null;
-    }
+  boolean updateDocument(final Document doc, final Analyzer analyzer,
+      final Term delTerm) throws CorruptIndexException, IOException {
+    ensureOpen();
+    boolean maybeMerge = false;
+    final boolean isUpdate = delTerm != null;
+    if (healthiness.anyStalledThreads()) {
 
-    if (aborting) {
+      // Help out flushing any pending DWPTs so we can un-stall:
       if (infoStream != null) {
-        message("flush: skip because aborting is set");
+        message("WARNING DocumentsWriter has stalled threads; will hijack this thread to flush pending segment(s)");
       }
-      return null;
-    }
-
-    boolean success = false;
-
-    SegmentInfo newSegment;
-
-    try {
-      assert nextDocID == numDocs;
-      assert waitQueue.numWaiting == 0;
-      assert waitQueue.waitingBytes == 0;
 
-      if (infoStream != null) {
-        message("flush postings as segment " + segment + " numDocs=" + numDocs);
-      }
-      
-      final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
-                                                                 numDocs, writer.getConfig().getTermIndexInterval(),
-                                                                 fieldInfos.buildSegmentCodecs(true),
-                                                                 pendingDeletes);
-      // Apply delete-by-docID now (delete-byDocID only
-      // happens when an exception is hit processing that
-      // doc, eg if analyzer has some problem w/ the text):
-      if (pendingDeletes.docIDs.size() > 0) {
-        flushState.deletedDocs = new BitVector(numDocs);
-        for(int delDocID : pendingDeletes.docIDs) {
-          flushState.deletedDocs.set(delDocID);
+      // Try pick up pending threads here if possible
+      DocumentsWriterPerThread flushingDWPT;
+      while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
+        // Don't push the delete here since the update could fail!
+        maybeMerge = doFlush(flushingDWPT);
+        if (!healthiness.anyStalledThreads()) {
+          break;
         }
-        pendingDeletes.bytesUsed.addAndGet(-pendingDeletes.docIDs.size() * BufferedDeletes.BYTES_PER_DEL_DOCID);
-        pendingDeletes.docIDs.clear();
       }
 
-      newSegment = new SegmentInfo(segment, numDocs, directory, false, fieldInfos.hasProx(), flushState.segmentCodecs, false, fieldInfos);
-
-      Collection<DocConsumerPerThread> threads = new HashSet<DocConsumerPerThread>();
-      for (DocumentsWriterThreadState threadState : threadStates) {
-        threads.add(threadState.consumer);
+      if (infoStream != null && healthiness.anyStalledThreads()) {
+        message("WARNING DocumentsWriter still has stalled threads; waiting");
       }
 
-      double startMBUsed = bytesUsed()/1024./1024.;
+      healthiness.waitIfStalled(); // block if stalled
 
-      consumer.flush(threads, flushState);
-
-      newSegment.setHasVectors(flushState.hasVectors);
-
-      if (infoStream != null) {
-        message("new segment has " + (flushState.hasVectors ? "vectors" : "no vectors"));
-        if (flushState.deletedDocs != null) {
-          message("new segment has " + flushState.deletedDocs.count() + " deleted docs");
-        }
-        message("flushedFiles=" + newSegment.files());
-        message("flushed codecs=" + newSegment.getSegmentCodecs());
+      if (infoStream != null && healthiness.anyStalledThreads()) {
+        message("WARNING DocumentsWriter done waiting");
       }
+    }
 
-      if (mergePolicy.useCompoundFile(segmentInfos, newSegment)) {
-        final String cfsFileName = IndexFileNames.segmentFileName(segment, "", IndexFileNames.COMPOUND_FILE_EXTENSION);
-
-        if (infoStream != null) {
-          message("flush: create compound file \"" + cfsFileName + "\"");
-        }
+    final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(),
+        this, doc);
+    final DocumentsWriterPerThread flushingDWPT;
+    
+    try {
 
-        CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, cfsFileName);
-        for(String fileName : newSegment.files()) {
-          cfsWriter.addFile(fileName);
-        }
-        cfsWriter.close();
-        deleter.deleteNewFiles(newSegment.files());
-        newSegment.setUseCompoundFile(true);
+      if (!perThread.isActive()) {
+        ensureOpen();
+        assert false: "perThread is not active but we are still open";
       }
-
-      // Must write deleted docs after the CFS so we don't
-      // slurp the del file into CFS:
-      if (flushState.deletedDocs != null) {
-        final int delCount = flushState.deletedDocs.count();
-        assert delCount > 0;
-        newSegment.setDelCount(delCount);
-        newSegment.advanceDelGen();
-        final String delFileName = newSegment.getDelFileName();
-        if (infoStream != null) {
-          message("flush: write " + delCount + " deletes to " + delFileName);
-        }
-        boolean success2 = false;
-        try {
-          // TODO: in the NRT case it'd be better to hand
-          // this del vector over to the
-          // shortly-to-be-opened SegmentReader and let it
-          // carry the changes; there's no reason to use
-          // filesystem as intermediary here.
-          flushState.deletedDocs.write(directory, delFileName);
-          success2 = true;
-        } finally {
-          if (!success2) {
-            try {
-              directory.deleteFile(delFileName);
-            } catch (Throwable t) {
-              // suppress this so we keep throwing the
-              // original exception
-            }
-          }
+       
+      final DocumentsWriterPerThread dwpt = perThread.perThread;
+      try {
+        dwpt.updateDocument(doc, analyzer, delTerm); 
+        numDocsInRAM.incrementAndGet();
+      } finally {
+        if (dwpt.checkAndResetHasAborted()) {
+          flushControl.doOnAbort(perThread);
         }
       }
-
-      if (infoStream != null) {
-        message("flush: segment=" + newSegment);
-        final double newSegmentSizeNoStore = newSegment.sizeInBytes(false)/1024./1024.;
-        final double newSegmentSize = newSegment.sizeInBytes(true)/1024./1024.;
-        message("  ramUsed=" + nf.format(startMBUsed) + " MB" +
-                " newFlushedSize=" + nf.format(newSegmentSize) + " MB" +
-                " (" + nf.format(newSegmentSizeNoStore) + " MB w/o doc stores)" +
-                " docs/MB=" + nf.format(numDocs / newSegmentSize) +
-                " new/old=" + nf.format(100.0 * newSegmentSizeNoStore / startMBUsed) + "%");
-      }
-
-      success = true;
+      flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
     } finally {
-      notifyAll();
-      if (!success) {
-        if (segment != null) {
-          deleter.refresh(segment);
-        }
-        abort();
-      }
+      perThread.unlock();
     }
-
-    doAfterFlush();
-
-    // Lock order: IW -> DW -> BD
-    pushDeletes(newSegment, segmentInfos);
-    if (infoStream != null) {
-      message("flush time " + (System.currentTimeMillis()-startTime) + " msec");
-    }
-
-    return newSegment;
-  }
-
-  synchronized void close() {
-    closed = true;
-    notifyAll();
-  }
-
-  /** Returns a free (idle) ThreadState that may be used for
-   * indexing this one document.  This call also pauses if a
-   * flush is pending.  If delTerm is non-null then we
-   * buffer this deleted term after the thread state has
-   * been acquired. */
-  synchronized DocumentsWriterThreadState getThreadState(Document doc, Term delTerm) throws IOException {
-
-    final Thread currentThread = Thread.currentThread();
-    assert !Thread.holdsLock(writer);
-
-    // First, find a thread state.  If this thread already
-    // has affinity to a specific ThreadState, use that one
-    // again.
-    DocumentsWriterThreadState state = threadBindings.get(currentThread);
-    if (state == null) {
-
-      // First time this thread has called us since last
-      // flush.  Find the least loaded thread state:
-      DocumentsWriterThreadState minThreadState = null;
-      for(int i=0;i<threadStates.length;i++) {
-        DocumentsWriterThreadState ts = threadStates[i];
-        if (minThreadState == null || ts.numThreads < minThreadState.numThreads) {
-          minThreadState = ts;
-        }
-      }
-      if (minThreadState != null && (minThreadState.numThreads == 0 || threadStates.length >= maxThreadStates)) {
-        state = minThreadState;
-        state.numThreads++;
-      } else {
-        // Just create a new "private" thread state
-        DocumentsWriterThreadState[] newArray = new DocumentsWriterThreadState[1+threadStates.length];
-        if (threadStates.length > 0) {
-          System.arraycopy(threadStates, 0, newArray, 0, threadStates.length);
-        }
-        state = newArray[threadStates.length] = new DocumentsWriterThreadState(this);
-        threadStates = newArray;
+    
+    if (flushingDWPT != null) {
+      maybeMerge |= doFlush(flushingDWPT);
+    } else {
+      final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush();
+      if (nextPendingFlush != null) {
+        maybeMerge |= doFlush(nextPendingFlush);
       }
-      threadBindings.put(currentThread, state);
     }
-
-    // Next, wait until my thread state is idle (in case
-    // it's shared with other threads), and no flush/abort
-    // pending 
-    waitReady(state);
-
-    // Allocate segment name if this is the first doc since
-    // last flush:
-    if (segment == null) {
-      segment = writer.newSegmentName();
-      assert numDocs == 0;
-    }
-
-    state.docState.docID = nextDocID++;
-
-    if (delTerm != null) {
-      pendingDeletes.addTerm(delTerm, state.docState.docID);
-    }
-
-    numDocs++;
-    state.isIdle = false;
-    return state;
-  }
-  
-  boolean addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException {
-    return updateDocument(doc, analyzer, null);
+    return maybeMerge;
   }
-  
-  boolean updateDocument(Document doc, Analyzer analyzer, Term delTerm)
-    throws CorruptIndexException, IOException {
 
-    // Possibly trigger a flush, or wait until any running flush completes:
-    boolean doFlush = flushControl.waitUpdate(1, delTerm != null ? 1 : 0);
-
-    // This call is synchronized but fast
-    final DocumentsWriterThreadState state = getThreadState(doc, delTerm);
-
-    final DocState docState = state.docState;
-    docState.doc = doc;
-    docState.analyzer = analyzer;
-
-    boolean success = false;
-    try {
-      // This call is not synchronized and does all the
-      // work
-      final DocWriter perDoc;
+  private  boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
+    boolean maybeMerge = false;
+    while (flushingDWPT != null) {
+      maybeMerge = true;
+      boolean success = false;
+      FlushTicket ticket = null;
+      
       try {
-        perDoc = state.consumer.processDocument(fieldInfos);
-      } finally {
-        docState.clear();
-      }
-
-      // This call is synchronized but fast
-      finishDocument(state, perDoc);
-
-      success = true;
-    } finally {
-      if (!success) {
-
-        // If this thread state had decided to flush, we
-        // must clear it so another thread can flush
-        if (doFlush) {
-          flushControl.clearFlushPending();
-        }
-
-        if (infoStream != null) {
-          message("exception in updateDocument aborting=" + aborting);
-        }
-
-        synchronized(this) {
-
-          state.isIdle = true;
-          notifyAll();
-            
-          if (aborting) {
-            abort();
-          } else {
-            skipDocWriter.docID = docState.docID;
-            boolean success2 = false;
-            try {
-              waitQueue.add(skipDocWriter);
-              success2 = true;
-            } finally {
-              if (!success2) {
-                abort();
-                return false;
-              }
+        assert currentFullFlushDelQueue == null
+            || flushingDWPT.deleteQueue == currentFullFlushDelQueue : "expected: "
+            + currentFullFlushDelQueue + " but was: " + flushingDWPT.deleteQueue
+            + " " + flushControl.isFullFlush();
+        /*
+         * Since with DWPT the flush process is concurrent and several DWPTs
+         * could flush at the same time, we must maintain the order of the
+         * flushes before we can apply the flushed segment and the frozen
+         * global deletes it is buffering. The reason for this is that the
+         * global deletes mark a certain point in time where we took a DWPT
+         * out of rotation and froze the global deletes.
+         * 
+         * Example: flush 'A' starts and freezes the global deletes, then
+         * flush 'B' starts and freezes all deletes that have occurred since
+         * 'A' started. If 'B' finishes before 'A' we need to wait until 'A'
+         * is done, otherwise the deletes frozen by 'B' are not applied to
+         * 'A' and we might fail to delete documents in 'A'.
+         */
+        try {
+          synchronized (ticketQueue) {
+            // Each flush is assigned a ticket in the order they acquire the ticketQueue lock
+            ticket =  new FlushTicket(flushingDWPT.prepareFlush(), true);
+            ticketQueue.add(ticket);
+          }
+  
+          // flush concurrently without locking
+          final FlushedSegment newSegment = flushingDWPT.flush();
+          synchronized (ticketQueue) {
+            ticket.segment = newSegment;
+          }
+          // flush was successful once we reached this point - new seg. has been assigned to the ticket!
+          success = true;
+        } finally {
+          if (!success && ticket != null) {
+            synchronized (ticketQueue) {
+              // In the case of a failure make sure we are making progress and
+              // apply all the deletes, since the segment flush failed and the
+              // flush ticket could hold global deletes; see FlushTicket#canPublish()
+              ticket.isSegmentFlush = false;
             }
-
-            // Immediately mark this document as deleted
-            // since likely it was partially added.  This
-            // keeps indexing as "all or none" (atomic) when
-            // adding a document:
-            deleteDocID(state.docState.docID);
           }
         }
+        /*
+         * Now we are done and try to flush the ticket queue if the head of the
+         * queue has already finished the flush.
+         */
+        applyFlushTickets();
+      } finally {
+        flushControl.doAfterFlush(flushingDWPT);
+        flushingDWPT.checkAndResetHasAborted();
+        indexWriter.flushCount.incrementAndGet();
       }
+     
+      flushingDWPT = flushControl.nextPendingFlush();
     }
-
-    doFlush |= flushControl.flushByRAMUsage("new document");
-
-    return doFlush;
-  }
-
-  public synchronized void waitIdle() {
-    while (!allThreadsIdle()) {
-      try {
-        wait();
-      } catch (InterruptedException ie) {
-        throw new ThreadInterruptedException(ie);
-      }
-    }
+    return maybeMerge;
   }
 
-  synchronized void waitReady(DocumentsWriterThreadState state) {
-    while (!closed && (!state.isIdle || aborting)) {
-      try {
-        wait();
-      } catch (InterruptedException ie) {
-        throw new ThreadInterruptedException(ie);
+  private void applyFlushTickets() throws IOException {
+    synchronized (ticketQueue) {
+      while (true) {
+        // Keep publishing eligible flushed segments:
+        final FlushTicket head = ticketQueue.peek();
+        if (head != null && head.canPublish()) {
+          ticketQueue.poll();
+          finishFlush(head.segment, head.frozenDeletes);
+        } else {
+          break;
+        }
       }
     }
-
-    if (closed) {
-      throw new AlreadyClosedException("this IndexWriter is closed");
-    }
   }
 
-  /** Does the synchronized work to finish/flush the
-   *  inverted document. */
-  private void finishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter) throws IOException {
-
-    // Must call this w/o holding synchronized(this) else
-    // we'll hit deadlock:
-    balanceRAM();
-
-    synchronized(this) {
-
-      assert docWriter == null || docWriter.docID == perThread.docState.docID;
-
-      if (aborting) {
-
-        // We are currently aborting, and another thread is
-        // waiting for me to become idle.  We just forcefully
-        // idle this threadState; it will be fully reset by
-        // abort()
-        if (docWriter != null) {
-          try {
-            docWriter.abort();
-          } catch (Throwable t) {
-          }
+  private void finishFlush(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
+      throws IOException {
+    // Finish the flushed segment and publish it to IndexWriter
+    if (newSegment == null) {
+      assert bufferedDeletes != null;
+      if (bufferedDeletes != null && bufferedDeletes.any()) {
+        indexWriter.bufferedDeletesStream.push(bufferedDeletes);
+        if (infoStream != null) {
+          message("flush: push buffered deletes: " + bufferedDeletes);
         }
-
-        perThread.isIdle = true;
-
-        // wakes up any threads waiting on the wait queue
-        notifyAll();
-
-        return;
       }
-
-      final boolean doPause;
-
-      if (docWriter != null) {
-        doPause = waitQueue.add(docWriter);
-      } else {
-        skipDocWriter.docID = perThread.docState.docID;
-        doPause = waitQueue.add(skipDocWriter);
-      }
-
-      if (doPause) {
-        waitForWaitQueue();
-      }
-
-      perThread.isIdle = true;
-
-      // wakes up any threads waiting on the wait queue
-      notifyAll();
+    } else {
+      publishFlushedSegment(newSegment, bufferedDeletes);  
     }
   }
 
-  synchronized void waitForWaitQueue() {
-    do {
-      try {
-        wait();
-      } catch (InterruptedException ie) {
-        throw new ThreadInterruptedException(ie);
-      }
-    } while (!waitQueue.doResume());
-  }
-
-  private static class SkipDocWriter extends DocWriter {
-    @Override
-    void finish() {
-    }
-    @Override
-    void abort() {
-    }
-    @Override
-    long sizeInBytes() {
-      return 0;
+  final void subtractFlushedNumDocs(int numFlushed) {
+    int oldValue = numDocsInRAM.get();
+    while (!numDocsInRAM.compareAndSet(oldValue, oldValue - numFlushed)) {
+      oldValue = numDocsInRAM.get();
     }
   }
-  final SkipDocWriter skipDocWriter = new SkipDocWriter();
-
-  NumberFormat nf = NumberFormat.getInstance();
-
-  /* Initial chunks size of the shared byte[] blocks used to
-     store postings data */
-  final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
-
-  /* if you increase this, you must fix field cache impl for
-   * getTerms/getTermsIndex requires <= 32768. */
-  final static int MAX_TERM_LENGTH_UTF8 = BYTE_BLOCK_SIZE-2;
-
-  /* Initial chunks size of the shared int[] blocks used to
-     store postings data */
-  final static int INT_BLOCK_SHIFT = 13;
-  final static int INT_BLOCK_SIZE = 1 << INT_BLOCK_SHIFT;
-  final static int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;
-
-  private List<int[]> freeIntBlocks = new ArrayList<int[]>();
-
-  /* Allocate another int[] from the shared pool */
-  synchronized int[] getIntBlock() {
-    final int size = freeIntBlocks.size();
-    final int[] b;
-    if (0 == size) {
-      b = new int[INT_BLOCK_SIZE];
-      bytesUsed.addAndGet(INT_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_INT);
-    } else {
-      b = freeIntBlocks.remove(size-1);
+  
+  /**
+   * Publishes the flushed segment, its segment-private deletes (if any) and the
+   * associated global delete packet (if present) to IndexWriter.  The actual
+   * publishing operation is synced on IW -> BDS so that the {@link SegmentInfo}'s
+   * delete generation is always GlobalPacket_deleteGeneration + 1.
+   */
+  private void publishFlushedSegment(FlushedSegment newSegment, FrozenBufferedDeletes globalPacket)
+      throws IOException {
+    assert newSegment != null;
+    final SegmentInfo segInfo = indexWriter.prepareFlushedSegment(newSegment);
+    final BufferedDeletes deletes = newSegment.segmentDeletes;
+    FrozenBufferedDeletes packet = null;
+    if (deletes != null && deletes.any()) {
+      // Segment private delete
+      packet = new FrozenBufferedDeletes(deletes, true);
+      if (infoStream != null) {
+        message("flush: push buffered seg private deletes: " + packet);
+      }
     }
-    return b;
-  }
-
-  long bytesUsed() {
-    return bytesUsed.get() + pendingDeletes.bytesUsed.get();
-  }
 
-  /* Return int[]s to the pool */
-  synchronized void recycleIntBlocks(int[][] blocks, int start, int end) {
-    for(int i=start;i<end;i++) {
-      freeIntBlocks.add(blocks[i]);
-      blocks[i] = null;
-    }
+    // now publish!
+    indexWriter.publishFlushedSegment(segInfo, packet, globalPacket);
   }
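
As a toy illustration of the generation ordering the javadoc above describes (none of the names below are Lucene code; Packet, nextGen and DeleteGenerationSketch are invented for the sketch): because the global packet and the segment-private packet are published under a single lock, no other publish can interleave, so the segment's delete generation always ends up exactly one past the global packet's.

// Hedged sketch of "segment delete generation == global packet generation + 1".
final class DeleteGenerationSketch {
  static final class Packet { long gen; }

  private long nextGen = 1;

  synchronized void publish(Packet globalPacket, Packet segmentPrivatePacket) {
    if (globalPacket != null) {
      globalPacket.gen = nextGen++;
    }
    // Both assignments happen under the same lock, so if a global packet was
    // pushed, the segment-private packet receives the very next generation.
    segmentPrivatePacket.gen = nextGen++;
  }
}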
-
-  final RecyclingByteBlockAllocator byteBlockAllocator = new RecyclingByteBlockAllocator(BYTE_BLOCK_SIZE, Integer.MAX_VALUE, bytesUsed);
-
-  final static int PER_DOC_BLOCK_SIZE = 1024;
-
-  final RecyclingByteBlockAllocator perDocAllocator = new RecyclingByteBlockAllocator(PER_DOC_BLOCK_SIZE, Integer.MAX_VALUE, bytesUsed);
-
-  String toMB(long v) {
-    return nf.format(v/1024./1024.);
+  
+  // for asserts
+  private volatile DocumentsWriterDeleteQueue currentFullFlushDelQueue = null;
+  // for asserts
+  private synchronized boolean setFlushingDeleteQueue(DocumentsWriterDeleteQueue session) {
+    currentFullFlushDelQueue = session;
+    return true;
   }
-
-  /* We have three pools of RAM: Postings, byte blocks
-   * (holds freq/prox posting data) and per-doc buffers
-   * (stored fields/term vectors).  Different docs require
-   * varying amount of storage from these classes.  For
-   * example, docs with many unique single-occurrence short
-   * terms will use up the Postings RAM and hardly any of
-   * the other two.  Whereas docs with very large terms will
-   * use alot of byte blocks RAM.  This method just frees
-   * allocations from the pools once we are over-budget,
-   * which balances the pools to match the current docs. */
-  void balanceRAM() {
-
-    final boolean doBalance;
-    final long deletesRAMUsed;
-
-    deletesRAMUsed = bufferedDeletesStream.bytesUsed();
-
-    final long ramBufferSize;
-    final double mb = config.getRAMBufferSizeMB();
-    if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
-      ramBufferSize = IndexWriterConfig.DISABLE_AUTO_FLUSH;
-    } else {
-      ramBufferSize = (long) (mb*1024*1024);
+  
+  /*
+   * flushAllThreads is synced by IW's fullFlushLock. Flushing all threads is a
+   * two-stage operation; the caller must ensure (in try/finally) that finishFullFlush
+   * is called after this method, to release the flush lock in DWFlushControl.
+   */
+  final boolean flushAllThreads()
+    throws IOException {
+    final DocumentsWriterDeleteQueue flushingDeleteQueue;
+
+    synchronized (this) {
+      flushingDeleteQueue = deleteQueue;
+      /* Cut over to a new delete queue.  This must be synced on the flush control;
+       * otherwise a new DWPT could sneak into the loop with an already flushing
+       * delete queue */
+      flushControl.markForFullFlush(); // swaps the delQueue synced on FlushControl
+      assert setFlushingDeleteQueue(flushingDeleteQueue);
     }
-
-    synchronized(this) {
-      if (ramBufferSize == IndexWriterConfig.DISABLE_AUTO_FLUSH || bufferIsFull) {
-        return;
-      }
+    assert currentFullFlushDelQueue != null;
+    assert currentFullFlushDelQueue != deleteQueue;
     
-      doBalance = bytesUsed() + deletesRAMUsed >= ramBufferSize;
-    }
-
-    if (doBalance) {
-
-      if (infoStream != null) {
-        message("  RAM: balance allocations: usedMB=" + toMB(bytesUsed()) +
-                " vs trigger=" + toMB(ramBufferSize) +
-                " deletesMB=" + toMB(deletesRAMUsed) +
-                " byteBlockFree=" + toMB(byteBlockAllocator.bytesUsed()) +
-                " perDocFree=" + toMB(perDocAllocator.bytesUsed()));
-      }
-
-      final long startBytesUsed = bytesUsed() + deletesRAMUsed;
-
-      int iter = 0;
-
-      // We free equally from each pool in 32 KB
-      // chunks until we are below our threshold
-      // (freeLevel)
-
-      boolean any = true;
-
-      final long freeLevel = (long) (0.95 * ramBufferSize);
-
-      while(bytesUsed()+deletesRAMUsed > freeLevel) {
-      
-        synchronized(this) {
-          if (0 == perDocAllocator.numBufferedBlocks() &&
-              0 == byteBlockAllocator.numBufferedBlocks() &&
-              0 == freeIntBlocks.size() && !any) {
-            // Nothing else to free -- must flush now.
-            bufferIsFull = bytesUsed()+deletesRAMUsed > ramBufferSize;
-            if (infoStream != null) {
-              if (bytesUsed()+deletesRAMUsed > ramBufferSize) {
-                message("    nothing to free; set bufferIsFull");
-              } else {
-                message("    nothing to free");
-              }
-            }
-            break;
-          }
-
-          if ((0 == iter % 4) && byteBlockAllocator.numBufferedBlocks() > 0) {
-            byteBlockAllocator.freeBlocks(1);
-          }
-          if ((1 == iter % 4) && freeIntBlocks.size() > 0) {
-            freeIntBlocks.remove(freeIntBlocks.size()-1);
-            bytesUsed.addAndGet(-INT_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_INT);
-          }
-          if ((2 == iter % 4) && perDocAllocator.numBufferedBlocks() > 0) {
-            perDocAllocator.freeBlocks(32); // Remove upwards of 32 blocks (each block is 1K)
-          }
-        }
-
-        if ((3 == iter % 4) && any) {
-          // Ask consumer to free any recycled state
-          any = consumer.freeRAM();
+    boolean anythingFlushed = false;
+    try {
+      DocumentsWriterPerThread flushingDWPT;
+      // Help out with flushing:
+      while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
+        anythingFlushed |= doFlush(flushingDWPT);
+      }
+      // If a concurrent flush is still in flight, wait for it
+      while (flushControl.anyFlushing()) {
+        flushControl.waitForFlush();  
+      }
+      if (!anythingFlushed) { // apply deletes if we did not flush any document
+        synchronized (ticketQueue) {
+          ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false));
         }
-
-        iter++;
-      }
-
-      if (infoStream != null) {
-        message("    after free: freedMB=" + nf.format((startBytesUsed-bytesUsed()-deletesRAMUsed)/1024./1024.) + " usedMB=" + nf.format((bytesUsed()+deletesRAMUsed)/1024./1024.));
+        applyFlushTickets();
       }
+    } finally {
+      assert flushingDeleteQueue == currentFullFlushDelQueue;
     }
+    return anythingFlushed;
   }
-
-  final WaitQueue waitQueue = new WaitQueue();
-
-  private class WaitQueue {
-    DocWriter[] waiting;
-    int nextWriteDocID;
-    int nextWriteLoc;
-    int numWaiting;
-    long waitingBytes;
-
-    public WaitQueue() {
-      waiting = new DocWriter[10];
-    }
-
-    synchronized void reset() {
-      // NOTE: nextWriteLoc doesn't need to be reset
-      assert numWaiting == 0;
-      assert waitingBytes == 0;
-      nextWriteDocID = 0;
-    }
-
-    synchronized boolean doResume() {
-      final double mb = config.getRAMBufferSizeMB();
-      final long waitQueueResumeBytes;
-      if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
-        waitQueueResumeBytes = 2*1024*1024;
-      } else {
-        waitQueueResumeBytes = (long) (mb*1024*1024*0.05);
-      }
-      return waitingBytes <= waitQueueResumeBytes;
-    }
-
-    synchronized boolean doPause() {
-      final double mb = config.getRAMBufferSizeMB();
-      final long waitQueuePauseBytes;
-      if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
-        waitQueuePauseBytes = 4*1024*1024;
-      } else {
-        waitQueuePauseBytes = (long) (mb*1024*1024*0.1);
-      }
-      return waitingBytes > waitQueuePauseBytes;
-    }
-
-    synchronized void abort() {
-      int count = 0;
-      for(int i=0;i<waiting.length;i++) {
-        final DocWriter doc = waiting[i];
-        if (doc != null) {
-          doc.abort();
-          waiting[i] = null;
-          count++;
-        }
-      }
-      waitingBytes = 0;
-      assert count == numWaiting;
-      numWaiting = 0;
+  
+  final void finishFullFlush(boolean success) {
+    assert setFlushingDeleteQueue(null);
+    if (success) {
+      // Release the flush lock
+      flushControl.finishFullFlush();
+    } else {
+      flushControl.abortFullFlushes();
     }
+  }
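
The calling pattern that the flushAllThreads comment above requires can be sketched as follows; FullFlushable and FullFlushCaller are hypothetical stand-ins (the real caller is IndexWriter, whose code is not shown here), but the shape of the try/finally is the point: finishFullFlush must run even if flushAllThreads throws, so the flush lock held by the flush control is always released.

import java.io.IOException;

// Hypothetical interface standing in for the two-stage flush API above.
interface FullFlushable {
  boolean flushAllThreads() throws IOException;
  void finishFullFlush(boolean success);
}

final class FullFlushCaller {
  private final Object fullFlushLock = new Object();

  boolean fullFlush(FullFlushable dw) throws IOException {
    synchronized (fullFlushLock) {           // the two stages are guarded by one lock
      boolean success = false;
      try {
        final boolean anythingFlushed = dw.flushAllThreads();
        success = true;
        return anythingFlushed;
      } finally {
        // Always release the flush lock in the flush control, even on failure.
        dw.finishFullFlush(success);
      }
    }
  }
}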
 
-    private void writeDocument(DocWriter doc) throws IOException {
-      assert doc == skipDocWriter || nextWriteDocID == doc.docID;
-      boolean success = false;
-      try {
-        doc.finish();
-        nextWriteDocID++;
-        nextWriteLoc++;
-        assert nextWriteLoc <= waiting.length;
-        if (nextWriteLoc == waiting.length) {
-          nextWriteLoc = 0;
-        }
-        success = true;
-      } finally {
-        if (!success) {
-          setAborting();
-        }
-      }
+  static final class FlushTicket {
+    final FrozenBufferedDeletes frozenDeletes;
+    /* access to non-final members must be synchronized on DW#ticketQueue */
+    FlushedSegment segment;
+    boolean isSegmentFlush;
+    
+    FlushTicket(FrozenBufferedDeletes frozenDeletes, boolean isSegmentFlush) {
+      this.frozenDeletes = frozenDeletes;
+      this.isSegmentFlush = isSegmentFlush;
     }
-
-    synchronized public boolean add(DocWriter doc) throws IOException {
-
-      assert doc.docID >= nextWriteDocID;
-
-      if (doc.docID == nextWriteDocID) {
-        writeDocument(doc);
-        while(true) {
-          doc = waiting[nextWriteLoc];
-          if (doc != null) {
-            numWaiting--;
-            waiting[nextWriteLoc] = null;
-            waitingBytes -= doc.sizeInBytes();
-            writeDocument(doc);
-          } else {
-            break;
-          }
-        }
-      } else {
-
-        // I finished before documents that were added
-        // before me.  This can easily happen when I am a
-        // small doc and the docs before me were large, or,
-        // just due to luck in the thread scheduling.  Just
-        // add myself to the queue and when that large doc
-        // finishes, it will flush me:
-        int gap = doc.docID - nextWriteDocID;
-        if (gap >= waiting.length) {
-          // Grow queue
-          DocWriter[] newArray = new DocWriter[ArrayUtil.oversize(gap, RamUsageEstimator.NUM_BYTES_OBJECT_REF)];
-          assert nextWriteLoc >= 0;
-          System.arraycopy(waiting, nextWriteLoc, newArray, 0, waiting.length-nextWriteLoc);
-          System.arraycopy(waiting, 0, newArray, waiting.length-nextWriteLoc, nextWriteLoc);
-          nextWriteLoc = 0;
-          waiting = newArray;
-          gap = doc.docID - nextWriteDocID;
-        }
-
-        int loc = nextWriteLoc + gap;
-        if (loc >= waiting.length) {
-          loc -= waiting.length;
-        }
-
-        // We should only wrap one time
-        assert loc < waiting.length;
-
-        // Nobody should be in my spot!
-        assert waiting[loc] == null;
-        waiting[loc] = doc;
-        numWaiting++;
-        waitingBytes += doc.sizeInBytes();
-      }
-      
-      return doPause();
+    
+    boolean canPublish() {
+      return (!isSegmentFlush || segment != null);  
     }
   }
 }

Modified: lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/FieldInfo.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/FieldInfo.java?rev=1103112&r1=1103111&r2=1103112&view=diff
==============================================================================
--- lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/FieldInfo.java (original)
+++ lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/FieldInfo.java Sat May 14 13:51:35 2011
@@ -20,9 +20,10 @@ package org.apache.lucene.index;
 /** @lucene.experimental */
 public final class FieldInfo {
   public static final int UNASSIGNED_CODEC_ID = -1;
-  public String name;
+  public final String name;
+  public final int number;
+
   public boolean isIndexed;
-  public int number;
 
   // true if term vector for this field should be stored
   boolean storeTermVector;
@@ -56,6 +57,7 @@ public final class FieldInfo {
       this.omitNorms = false;
       this.omitTermFreqAndPositions = false;
     }
+    assert !omitTermFreqAndPositions || !storePayloads;
   }
 
   void setCodecId(int codecId) {
@@ -78,6 +80,7 @@ public final class FieldInfo {
   // should only be called by FieldInfos#addOrUpdate
   void update(boolean isIndexed, boolean storeTermVector, boolean storePositionWithTermVector, 
               boolean storeOffsetWithTermVector, boolean omitNorms, boolean storePayloads, boolean omitTermFreqAndPositions) {
+
     if (this.isIndexed != isIndexed) {
       this.isIndexed = true;                      // once indexed, always index
     }
@@ -99,7 +102,33 @@ public final class FieldInfo {
       }
       if (this.omitTermFreqAndPositions != omitTermFreqAndPositions) {
         this.omitTermFreqAndPositions = true;                // if one require omitTermFreqAndPositions at least once, it remains off for life
+        this.storePayloads = false;
       }
     }
+    assert !this.omitTermFreqAndPositions || !this.storePayloads;
+  }
+  private boolean vectorsCommitted;
+
+  /**
+   * Reverts all uncommitted changes on this {@link FieldInfo}
+   * @see #commitVectors()
+   */
+  void revertUncommitted() {
+    if (storeTermVector && !vectorsCommitted) {
+      storeOffsetWithTermVector = false;
+      storePositionWithTermVector = false;
+      storeTermVector = false;  
+    }
+  }
+
+  /**
+   * Commits term vector modifications. Changes to term-vectors must be
+   * explicitly committed once the necessary files are created. If those changes
+   * are not committed, a subsequent {@link #revertUncommitted()} will reset
+   * all term-vector flags before the next document.
+   */
+  void commitVectors() {
+    assert storeTermVector;
+    vectorsCommitted = true;
   }
 }
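
The per-document commit/revert cycle that revertUncommitted() and commitVectors() implement can be illustrated with a small stand-alone sketch (VectorFlagsSketch and its method names are invented for illustration): a term-vector flag set while processing a document only survives if the corresponding files were actually created before the document failed.

// Sketch: tentative flags are discarded unless they were backed by written files.
final class VectorFlagsSketch {
  boolean storeTermVector;
  private boolean vectorsCommitted;

  void onDocumentWithVectors() {
    storeTermVector = true;        // tentatively enabled for the current document
  }

  void onVectorFilesWritten() {
    vectorsCommitted = true;       // the flag is now backed by real files
  }

  // Called when a document is aborted before any vector files exist.
  void revertUncommitted() {
    if (storeTermVector && !vectorsCommitted) {
      storeTermVector = false;
    }
  }
}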

Modified: lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/FieldInfos.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/FieldInfos.java?rev=1103112&r1=1103111&r2=1103112&view=diff
==============================================================================
--- lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/FieldInfos.java (original)
+++ lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/FieldInfos.java Sat May 14 13:51:35 2011
@@ -216,6 +216,10 @@ public final class FieldInfos implements
   static final byte OMIT_TERM_FREQ_AND_POSITIONS = 0x40;
 
   private int format;
+  private boolean hasProx; // only set if readonly
+  private boolean hasVectors; // only set if readonly
+  private long version; // internal use to track changes
+  
 
   /**
    * Creates a new {@link FieldInfos} instance with a private
@@ -263,7 +267,7 @@ public final class FieldInfos implements
    */
   public FieldInfos(Directory d, String name) throws IOException {
     this((FieldNumberBiMap)null, null); // use null here to make this FIs Read-Only
-    IndexInput input = d.openInput(name);
+    final IndexInput input = d.openInput(name);
     try {
       read(input, name);
     } finally {
@@ -299,6 +303,9 @@ public final class FieldInfos implements
   @Override
   synchronized public Object clone() {
     FieldInfos fis = new FieldInfos(globalFieldNumbers, segmentCodecsBuilder);
+    fis.format = format;
+    fis.hasProx = hasProx;
+    fis.hasVectors = hasVectors;
     for (FieldInfo fi : this) {
       FieldInfo clone = (FieldInfo) (fi).clone();
       fis.putInternal(clone);
@@ -308,6 +315,10 @@ public final class FieldInfos implements
 
   /** Returns true if any fields do not omitTermFreqAndPositions */
   public boolean hasProx() {
+    if (isReadOnly()) {
+      return hasProx;
+    }
+    // mutable FIs must check!
     for (FieldInfo fi : this) {
       if (fi.isIndexed && !fi.omitTermFreqAndPositions) {
         return true;
@@ -424,8 +435,8 @@ public final class FieldInfos implements
   }
 
   synchronized private FieldInfo addOrUpdateInternal(String name, int preferredFieldNumber, boolean isIndexed,
-      boolean storeTermVector, boolean storePositionWithTermVector, boolean storeOffsetWithTermVector,
-      boolean omitNorms, boolean storePayloads, boolean omitTermFreqAndPositions) {
+                                                     boolean storeTermVector, boolean storePositionWithTermVector, boolean storeOffsetWithTermVector,
+                                                     boolean omitNorms, boolean storePayloads, boolean omitTermFreqAndPositions) {
     if (globalFieldNumbers == null) {
       throw new IllegalStateException("FieldInfos are read-only, create a new instance with a global field map to make modifications to FieldInfos");
     }
@@ -440,6 +451,7 @@ public final class FieldInfos implements
     if (fi.isIndexed && fi.getCodecId() == FieldInfo.UNASSIGNED_CODEC_ID) {
       segmentCodecsBuilder.tryAddAndSet(fi);
     }
+    version++;
     return fi;
   }
 
@@ -510,6 +522,10 @@ public final class FieldInfos implements
   }
 
   public boolean hasVectors() {
+    if (isReadOnly()) {
+      return hasVectors;
+    }
+    // mutable FIs must check
     for (FieldInfo fi : this) {
       if (fi.storeTermVector) {
         return true;
@@ -562,11 +578,16 @@ public final class FieldInfos implements
   public final boolean isReadOnly() {
     return globalFieldNumbers == null;
   }
+  
+  synchronized final long getVersion() {
+    return version;
+  }
 
   public void write(IndexOutput output) throws IOException {
     output.writeVInt(FORMAT_CURRENT);
     output.writeVInt(size());
     for (FieldInfo fi : this) {
+      assert !fi.omitTermFreqAndPositions || !fi.storePayloads;
       byte bits = 0x0;
       if (fi.isIndexed) bits |= IS_INDEXED;
       if (fi.storeTermVector) bits |= STORE_TERMVECTOR;
@@ -607,6 +628,15 @@ public final class FieldInfos implements
       boolean omitNorms = (bits & OMIT_NORMS) != 0;
       boolean storePayloads = (bits & STORE_PAYLOADS) != 0;
       boolean omitTermFreqAndPositions = (bits & OMIT_TERM_FREQ_AND_POSITIONS) != 0;
+
+      // LUCENE-3027: past indices were able to write
+      // storePayloads=true when omitTFAP is also true,
+      // which is invalid.  We correct that here:
+      if (omitTermFreqAndPositions) {
+        storePayloads = false;
+      }
+      hasVectors |= storeTermVector;
+      hasProx |= isIndexed && !omitTermFreqAndPositions;
       final FieldInfo addInternal = addInternal(name, fieldNumber, isIndexed, storeTermVector, storePositionsWithTermVector, storeOffsetWithTermVector, omitNorms, storePayloads, omitTermFreqAndPositions);
       addInternal.setCodecId(codecId);
     }
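
The LUCENE-3027 correction above is a one-line clamp during decoding of the per-field flag byte; a stand-alone sketch is shown below. Only OMIT_TERM_FREQ_AND_POSITIONS (0x40) is taken from this diff; the STORE_PAYLOADS value (0x20) is assumed for illustration, and FlagDecodeSketch is not Lucene code.

// Sketch: payloads cannot survive once term freqs/positions are omitted.
final class FlagDecodeSketch {
  static final byte STORE_PAYLOADS = 0x20;                 // assumed value
  static final byte OMIT_TERM_FREQ_AND_POSITIONS = 0x40;   // value from the diff

  static boolean[] decode(byte bits) {
    boolean storePayloads = (bits & STORE_PAYLOADS) != 0;
    final boolean omitTermFreqAndPositions = (bits & OMIT_TERM_FREQ_AND_POSITIONS) != 0;
    if (omitTermFreqAndPositions) {
      storePayloads = false;       // older indices could claim both; correct it here
    }
    return new boolean[] { storePayloads, omitTermFreqAndPositions };
  }
}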
@@ -615,5 +645,29 @@ public final class FieldInfos implements
       throw new CorruptIndexException("did not read all bytes from file \"" + fileName + "\": read " + input.getFilePointer() + " vs size " + input.length());
     }    
   }
+  
+  /**
+   * Reverts all uncommitted changes.
+   * @see FieldInfo#revertUncommitted()
+   */
+  void revertUncommitted() {
+    for (FieldInfo fieldInfo : this) {
+      fieldInfo.revertUncommitted();
+    }
+  }
+  
+  final FieldInfos asReadOnly() {
+    if (isReadOnly()) {
+      return this;
+    }
+    final FieldInfos roFis = new FieldInfos((FieldNumberBiMap)null, null);
+    for (FieldInfo fieldInfo : this) {
+      FieldInfo clone = (FieldInfo) (fieldInfo).clone();
+      roFis.putInternal(clone);
+      roFis.hasVectors |= clone.storeTermVector;
+      roFis.hasProx |= clone.isIndexed && !clone.omitTermFreqAndPositions;
+    }
+    return roFis;
+  }
 
 }