Posted to commits@lucene.apache.org by rm...@apache.org on 2011/05/09 17:24:23 UTC

svn commit: r1101062 [6/21] - in /lucene/dev/branches/bulkpostings: ./ dev-tools/ dev-tools/eclipse/ dev-tools/idea/.idea/ dev-tools/idea/lucene/contrib/ant/ dev-tools/idea/lucene/contrib/db/bdb-je/ dev-tools/idea/lucene/contrib/db/bdb/ dev-tools/idea/...

Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java?rev=1101062&r1=1101061&r2=1101062&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java Mon May  9 15:24:04 2011
@@ -18,9 +18,17 @@ package org.apache.lucene.index;
  */
 
 import java.io.IOException;
+import java.util.Comparator;
+import java.util.Map;
 
 import org.apache.lucene.analysis.tokenattributes.PayloadAttribute;
 import org.apache.lucene.document.Fieldable;
+import org.apache.lucene.index.codecs.FieldsConsumer;
+import org.apache.lucene.index.codecs.PostingsConsumer;
+import org.apache.lucene.index.codecs.TermStats;
+import org.apache.lucene.index.codecs.TermsConsumer;
+import org.apache.lucene.util.BitVector;
+import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.RamUsageEstimator;
 
 // TODO: break into separate freq and prox writers as
@@ -28,17 +36,17 @@ import org.apache.lucene.util.RamUsageEs
 // be configured as any number of files 1..N
 final class FreqProxTermsWriterPerField extends TermsHashConsumerPerField implements Comparable<FreqProxTermsWriterPerField> {
 
-  final FreqProxTermsWriterPerThread perThread;
+  final FreqProxTermsWriter parent;
   final TermsHashPerField termsHashPerField;
   final FieldInfo fieldInfo;
-  final DocumentsWriter.DocState docState;
+  final DocumentsWriterPerThread.DocState docState;
   final FieldInvertState fieldState;
   boolean omitTermFreqAndPositions;
   PayloadAttribute payloadAttribute;
 
-  public FreqProxTermsWriterPerField(TermsHashPerField termsHashPerField, FreqProxTermsWriterPerThread perThread, FieldInfo fieldInfo) {
+  public FreqProxTermsWriterPerField(TermsHashPerField termsHashPerField, FreqProxTermsWriter parent, FieldInfo fieldInfo) {
     this.termsHashPerField = termsHashPerField;
-    this.perThread = perThread;
+    this.parent = parent;
     this.fieldInfo = fieldInfo;
     docState = termsHashPerField.docState;
     fieldState = termsHashPerField.fieldState;
@@ -78,8 +86,8 @@ final class FreqProxTermsWriterPerField 
       if (fields[i].isIndexed())
         return true;
     return false;
-  }     
-  
+  }
+
   @Override
   void start(Fieldable f) {
     if (fieldState.attributeSource.hasAttribute(PayloadAttribute.class)) {
@@ -96,18 +104,18 @@ final class FreqProxTermsWriterPerField 
     } else {
       payload = payloadAttribute.getPayload();
     }
-    
+
     if (payload != null && payload.length > 0) {
       termsHashPerField.writeVInt(1, (proxCode<<1)|1);
       termsHashPerField.writeVInt(1, payload.length);
       termsHashPerField.writeBytes(1, payload.data, payload.offset, payload.length);
-      hasPayloads = true;      
+      hasPayloads = true;
     } else
       termsHashPerField.writeVInt(1, proxCode<<1);
-    
+
     FreqProxPostingsArray postings = (FreqProxPostingsArray) termsHashPerField.postingsArray;
     postings.lastPositions[termID] = fieldState.position;
-    
+
   }
 
   @Override
@@ -115,7 +123,7 @@ final class FreqProxTermsWriterPerField 
     // First time we're seeing this term since the last
     // flush
     assert docState.testPoint("FreqProxTermsWriterPerField.newTerm start");
-    
+
     FreqProxPostingsArray postings = (FreqProxPostingsArray) termsHashPerField.postingsArray;
     postings.lastDocIDs[termID] = docState.docID;
     if (omitTermFreqAndPositions) {
@@ -132,9 +140,9 @@ final class FreqProxTermsWriterPerField 
   void addTerm(final int termID) {
 
     assert docState.testPoint("FreqProxTermsWriterPerField.addTerm start");
-    
+
     FreqProxPostingsArray postings = (FreqProxPostingsArray) termsHashPerField.postingsArray;
-    
+
     assert omitTermFreqAndPositions || postings.docFreqs[termID] > 0;
 
     if (omitTermFreqAndPositions) {
@@ -169,7 +177,7 @@ final class FreqProxTermsWriterPerField 
       }
     }
   }
-  
+
   @Override
   ParallelPostingsArray createPostingsArray(int size) {
     return new FreqProxPostingsArray(size);
@@ -212,7 +220,180 @@ final class FreqProxTermsWriterPerField 
       return ParallelPostingsArray.BYTES_PER_POSTING + 4 * RamUsageEstimator.NUM_BYTES_INT;
     }
   }
-  
+
   public void abort() {}
+
+  BytesRef payload;
+
+  /* Walk through all unique text tokens (Posting
+   * instances) found in this field and serialize them
+   * into a single RAM segment. */
+  void flush(String fieldName, FieldsConsumer consumer,  final SegmentWriteState state)
+    throws CorruptIndexException, IOException {
+
+    final TermsConsumer termsConsumer = consumer.addField(fieldInfo);
+    final Comparator<BytesRef> termComp = termsConsumer.getComparator();
+
+    final Term protoTerm = new Term(fieldName);
+
+    final boolean currentFieldOmitTermFreqAndPositions = fieldInfo.omitTermFreqAndPositions;
+
+    final Map<Term,Integer> segDeletes;
+    if (state.segDeletes != null && state.segDeletes.terms.size() > 0) {
+      segDeletes = state.segDeletes.terms;
+    } else {
+      segDeletes = null;
+    }
+
+    final int[] termIDs = termsHashPerField.sortPostings(termComp);
+    final int numTerms = termsHashPerField.bytesHash.size();
+    final BytesRef text = new BytesRef();
+    final FreqProxPostingsArray postings = (FreqProxPostingsArray) termsHashPerField.postingsArray;
+    final ByteSliceReader freq = new ByteSliceReader();
+    final ByteSliceReader prox = new ByteSliceReader();
+
+    long sumTotalTermFreq = 0;
+    for (int i = 0; i < numTerms; i++) {
+      final int termID = termIDs[i];
+      // Get BytesRef
+      final int textStart = postings.textStarts[termID];
+      termsHashPerField.bytePool.setBytesRef(text, textStart);
+
+      termsHashPerField.initReader(freq, termID, 0);
+      if (!fieldInfo.omitTermFreqAndPositions) {
+        termsHashPerField.initReader(prox, termID, 1);
+      }
+
+      // TODO: really TermsHashPerField should take over most
+      // of this loop, including merge sort of terms from
+      // multiple threads and interacting with the
+      // TermsConsumer, only calling out to us (passing us the
+      // DocsConsumer) to handle delivery of docs/positions
+
+      final PostingsConsumer postingsConsumer = termsConsumer.startTerm(text);
+
+      final int delDocLimit;
+      if (segDeletes != null) {
+        final Integer docIDUpto = segDeletes.get(protoTerm.createTerm(text));
+        if (docIDUpto != null) {
+          delDocLimit = docIDUpto;
+        } else {
+          delDocLimit = 0;
+        }
+      } else {
+        delDocLimit = 0;
+      }
+
+      // Now termStates has numToMerge FieldMergeStates
+      // which all share the same term.  Now we must
+      // interleave the docID streams.
+      int numDocs = 0;
+      long totTF = 0;
+      int docID = 0;
+      int termFreq = 0;
+
+      while(true) {
+        if (freq.eof()) {
+          if (postings.lastDocCodes[termID] != -1) {
+            // Return last doc
+            docID = postings.lastDocIDs[termID];
+            if (!omitTermFreqAndPositions) {
+              termFreq = postings.docFreqs[termID];
+            }
+            postings.lastDocCodes[termID] = -1;
+          } else {
+            // EOF
+            break;
+          }
+        } else {
+          final int code = freq.readVInt();
+          if (omitTermFreqAndPositions) {
+            docID += code;
+          } else {
+            docID += code >>> 1;
+            if ((code & 1) != 0) {
+              termFreq = 1;
+            } else {
+              termFreq = freq.readVInt();
+            }
+          }
+
+          assert docID != postings.lastDocIDs[termID];
+        }
+
+        numDocs++;
+        assert docID < state.numDocs: "doc=" + docID + " maxDoc=" + state.numDocs;
+        final int termDocFreq = termFreq;
+
+        // NOTE: we could check here if the docID was
+        // deleted, and skip it.  However, this is somewhat
+        // dangerous because it can yield non-deterministic
+        // behavior since we may see the docID before we see
+        // the term that caused it to be deleted.  This
+        // would mean some (but not all) of its postings may
+        // make it into the index, which'd alter the docFreq
+        // for those terms.  We could fix this by doing two
+        // passes, ie first sweep marks all del docs, and
+        // 2nd sweep does the real flush, but I suspect
+        // that'd add too much time to flush.
+        postingsConsumer.startDoc(docID, termDocFreq);
+        if (docID < delDocLimit) {
+          // Mark it deleted.  TODO: we could also skip
+          // writing its postings; this would be
+          // deterministic (just for this Term's docs).
+          if (state.deletedDocs == null) {
+            state.deletedDocs = new BitVector(state.numDocs);
+          }
+          state.deletedDocs.set(docID);
+        }
+
+        // Carefully copy over the prox + payload info,
+        // changing the format to match Lucene's segment
+        // format.
+        if (!currentFieldOmitTermFreqAndPositions) {
+          // omitTermFreqAndPositions == false so we do write positions &
+          // payload
+          int position = 0;
+          totTF += termDocFreq;
+          for(int j=0;j<termDocFreq;j++) {
+            final int code = prox.readVInt();
+            position += code >> 1;
+
+            final int payloadLength;
+            final BytesRef thisPayload;
+
+            if ((code & 1) != 0) {
+              // This position has a payload
+              payloadLength = prox.readVInt();
+
+              if (payload == null) {
+                payload = new BytesRef();
+                payload.bytes = new byte[payloadLength];
+              } else if (payload.bytes.length < payloadLength) {
+                payload.grow(payloadLength);
+              }
+
+              prox.readBytes(payload.bytes, 0, payloadLength);
+              payload.length = payloadLength;
+              thisPayload = payload;
+
+            } else {
+              payloadLength = 0;
+              thisPayload = null;
+            }
+
+            postingsConsumer.addPosition(position, thisPayload);
+          }
+
+          postingsConsumer.finishDoc();
+        }
+      }
+      termsConsumer.finishTerm(text, new TermStats(numDocs, totTF));
+      sumTotalTermFreq += totTF;
+    }
+
+    termsConsumer.finish(sumTotalTermFreq);
+  }
+
 }
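
The new flush() above walks each term's byte slices and decodes the doc/freq stream written during indexing: every document is recorded as a VInt docDelta<<1, with the low bit set when the term frequency is 1, so the common freq==1 case needs no second VInt (positions use the same trick for payloads, proxCode<<1|1). Below is a minimal stand-alone sketch of that encoding; it is not Lucene code and the class/method names are placeholders.

    import java.util.ArrayList;
    import java.util.List;

    public class DocFreqCodeSketch {
      // Encode (docID, termFreq) pairs into the VInt codes flush() reads back via
      // ByteSliceReader: docID += code >>> 1; freq = (code & 1) != 0 ? 1 : next VInt.
      static List<Integer> encode(int[][] postings) {
        List<Integer> codes = new ArrayList<Integer>();
        int lastDocID = 0;
        for (int[] p : postings) {
          int delta = p[0] - lastDocID;
          if (p[1] == 1) {
            codes.add((delta << 1) | 1);   // low bit set: freq is implicitly 1
          } else {
            codes.add(delta << 1);         // low bit clear: freq follows as its own VInt
            codes.add(p[1]);
          }
          lastDocID = p[0];
        }
        return codes;
      }

      public static void main(String[] args) {
        // docIDs 3, 7, 12 with freqs 1, 4, 1 -> prints [7, 8, 4, 11]
        System.out.println(encode(new int[][] {{3, 1}, {7, 4}, {12, 1}}));
      }
    }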
 

Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java?rev=1101062&r1=1101061&r2=1101062&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java Mon May  9 15:24:04 2011
@@ -21,7 +21,13 @@ import java.io.FileNotFoundException;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.PrintStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.lucene.index.codecs.CodecProvider;
 import org.apache.lucene.store.Directory;
@@ -49,12 +55,12 @@ import org.apache.lucene.util.Collection
  * (IndexDeletionPolicy) is consulted on creation (onInit)
  * and once per commit (onCommit), to decide when a commit
  * should be removed.
- * 
+ *
  * It is the business of the IndexDeletionPolicy to choose
  * when to delete commit points.  The actual mechanics of
  * file deletion, retrying, etc, derived from the deletion
  * of commit points is the business of the IndexFileDeleter.
- * 
+ *
  * The current default deletion policy is {@link
  * KeepOnlyLastCommitDeletionPolicy}, which removes all
  * prior commits when a new commit has completed.  This
@@ -72,7 +78,7 @@ final class IndexFileDeleter {
    * so we will retry them again later: */
   private List<String> deletable;
 
-  /* Reference count for all files in the index.  
+  /* Reference count for all files in the index.
    * Counts how many existing commits reference a file.
    **/
   private Map<String, RefCount> refCounts = new HashMap<String, RefCount>();
@@ -88,7 +94,7 @@ final class IndexFileDeleter {
    * non-commit checkpoint: */
   private List<Collection<String>> lastFiles = new ArrayList<Collection<String>>();
 
-  /* Commits that the IndexDeletionPolicy have decided to delete: */ 
+  /* Commits that the IndexDeletionPolicy have decided to delete: */
   private List<CommitPoint> commitsToDelete = new ArrayList<CommitPoint>();
 
   private PrintStream infoStream;
@@ -108,7 +114,7 @@ final class IndexFileDeleter {
       message("setInfoStream deletionPolicy=" + policy);
     }
   }
-  
+
   private void message(String message) {
     infoStream.println("IFD [" + new Date() + "; " + Thread.currentThread().getName() + "]: " + message);
   }
@@ -139,12 +145,12 @@ final class IndexFileDeleter {
     // counts:
     long currentGen = segmentInfos.getGeneration();
     indexFilenameFilter = new IndexFileNameFilter(codecs);
-    
+
     CommitPoint currentCommitPoint = null;
     String[] files = null;
     try {
       files = directory.listAll();
-    } catch (NoSuchDirectoryException e) {  
+    } catch (NoSuchDirectoryException e) {
       // it means the directory is empty, so ignore it.
       files = new String[0];
     }
@@ -152,7 +158,7 @@ final class IndexFileDeleter {
     for (String fileName : files) {
 
       if ((indexFilenameFilter.accept(null, fileName)) && !fileName.endsWith("write.lock") && !fileName.equals(IndexFileNames.SEGMENTS_GEN)) {
-        
+
         // Add this file to refCounts with initial count 0:
         getRefCount(fileName);
 
@@ -233,7 +239,7 @@ final class IndexFileDeleter {
     // Now delete anything with ref count at 0.  These are
     // presumably abandoned files eg due to crash of
     // IndexWriter.
-    for(Map.Entry<String, RefCount> entry : refCounts.entrySet() ) {  
+    for(Map.Entry<String, RefCount> entry : refCounts.entrySet() ) {
       RefCount rc = entry.getValue();
       final String fileName = entry.getKey();
       if (0 == rc.count) {
@@ -253,7 +259,7 @@ final class IndexFileDeleter {
     // Always protect the incoming segmentInfos since
     // sometime it may not be the most recent commit
     checkpoint(segmentInfos, false);
-    
+
     startingCommitDeleted = currentCommitPoint == null ? false : currentCommitPoint.isDeleted();
 
     deleteCommits();
@@ -327,7 +333,7 @@ final class IndexFileDeleter {
       segmentPrefix1 = null;
       segmentPrefix2 = null;
     }
-    
+
     for(int i=0;i<files.length;i++) {
       String fileName = files[i];
       if ((segmentName == null || fileName.startsWith(segmentPrefix1) || fileName.startsWith(segmentPrefix2)) &&
@@ -379,7 +385,7 @@ final class IndexFileDeleter {
       deleteCommits();
     }
   }
-  
+
   public void deletePendingFiles() throws IOException {
     if (deletable != null) {
       List<String> oldDeletable = deletable;
@@ -397,7 +403,7 @@ final class IndexFileDeleter {
   /**
    * For definition of "check point" see IndexWriter comments:
    * "Clarification: Check Points (and commits)".
-   * 
+   *
    * Writer calls this when it has made a "consistent
    * change" to the index, meaning new files are written to
    * the index and the in-memory SegmentInfos have been
@@ -417,7 +423,7 @@ final class IndexFileDeleter {
   public void checkpoint(SegmentInfos segmentInfos, boolean isCommit) throws IOException {
 
     if (infoStream != null) {
-      message("now checkpoint \"" + segmentInfos.getCurrentSegmentFileName() + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
+      message("now checkpoint \"" + segmentInfos + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
     }
 
     // Try again now to delete any previously un-deletable

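The class comment above leaves the decision of which commits to remove to the IndexDeletionPolicy, with KeepOnlyLastCommitDeletionPolicy as the default; IndexFileDeleter only handles the mechanics of reference counting and file deletion. A hedged configuration sketch (the analyzer, the Version constant, and the SnapshotDeletionPolicy alternative are placeholders, not part of this commit):

    // Install the default policy explicitly, or wrap it so commits can be
    // snapshotted for backups without IndexFileDeleter dropping their files.
    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_40, analyzer);
    conf.setIndexDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
    // conf.setIndexDeletionPolicy(new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()));
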
Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/IndexReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/IndexReader.java?rev=1101062&r1=1101061&r2=1101062&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/IndexReader.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/IndexReader.java Mon May  9 15:24:04 2011
@@ -919,6 +919,22 @@ public abstract class IndexReader implem
     }
   }
 
+  /**
+   * Returns <code>true</code> if an index exists at the specified directory.
+   * @param  directory the directory to check for an index
+   * @param  codecProvider provides a CodecProvider in case the index uses non-core codecs
+   * @return <code>true</code> if an index exists; <code>false</code> otherwise
+   * @throws IOException if there is a problem with accessing the index
+   */
+  public static boolean indexExists(Directory directory, CodecProvider codecProvider) throws IOException {
+    try {
+      new SegmentInfos().read(directory, codecProvider);
+      return true;
+    } catch (IOException ioe) {
+      return false;
+    }
+  }
+
   /** Returns the number of documents in this index. */
   public abstract int numDocs();
 
@@ -1436,7 +1452,7 @@ public abstract class IndexReader implem
       cfr = new CompoundFileReader(dir, filename);
 
       String [] files = cfr.listAll();
-      ArrayUtil.quickSort(files);   // sort the array of filename so that the output is more readable
+      ArrayUtil.mergeSort(files);   // sort the array of filename so that the output is more readable
 
       for (int i = 0; i < files.length; ++i) {
         long len = cfr.fileLength(files[i]);

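The new IndexReader.indexExists(Directory, CodecProvider) overload added above simply tries to read a segments_N file and reports failure as "no index". A hedged usage sketch, assuming CodecProvider.getDefault() is the right provider for the index being probed:

    import java.io.File;
    import org.apache.lucene.index.IndexReader;
    import org.apache.lucene.index.codecs.CodecProvider;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;

    public class IndexExistsCheck {
      public static void main(String[] args) throws Exception {
        Directory dir = FSDirectory.open(new File(args[0]));
        System.out.println("index exists: "
            + IndexReader.indexExists(dir, CodecProvider.getDefault()));
        dir.close();
      }
    }
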
Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/IndexWriter.java?rev=1101062&r1=1101061&r2=1101062&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/IndexWriter.java Mon May  9 15:24:04 2011
@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHa
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
 import org.apache.lucene.index.FieldInfos.FieldNumberBiMap;
 import org.apache.lucene.index.IndexWriterConfig.OpenMode;
 import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
@@ -46,6 +47,7 @@ import org.apache.lucene.store.BufferedI
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.Lock;
 import org.apache.lucene.store.LockObtainFailedException;
+import org.apache.lucene.util.BitVector;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.Constants;
 import org.apache.lucene.util.ThreadInterruptedException;
@@ -54,17 +56,16 @@ import org.apache.lucene.util.MapBackedS
 /**
   An <code>IndexWriter</code> creates and maintains an index.
 
-  <p>The <code>create</code> argument to the {@link
-  #IndexWriter(Directory, IndexWriterConfig) constructor} determines 
+  <p>The {@link OpenMode} option on 
+  {@link IndexWriterConfig#setOpenMode(OpenMode)} determines 
   whether a new index is created, or whether an existing index is
-  opened.  Note that you can open an index with <code>create=true</code>
-  even while readers are using the index.  The old readers will 
+  opened. Note that you can open an index with {@link OpenMode#CREATE}
+  even while readers are using the index. The old readers will 
   continue to search the "point in time" snapshot they had opened, 
-  and won't see the newly created index until they re-open.  There are
-  also {@link #IndexWriter(Directory, IndexWriterConfig) constructors}
-  with no <code>create</code> argument which will create a new index
-  if there is not already an index at the provided path and otherwise 
-  open the existing index.</p>
+  and won't see the newly created index until they re-open. If 
+  {@link OpenMode#CREATE_OR_APPEND} is used IndexWriter will create a 
+  new index if there is not already an index at the provided path
+  and otherwise open the existing index.</p>
 
   <p>In either case, documents are added with {@link #addDocument(Document)
   addDocument} and removed with {@link #deleteDocuments(Term)} or {@link
@@ -76,15 +77,19 @@ import org.apache.lucene.util.MapBackedS
   <a name="flush"></a>
   <p>These changes are buffered in memory and periodically
   flushed to the {@link Directory} (during the above method
-  calls).  A flush is triggered when there are enough
-  buffered deletes (see {@link IndexWriterConfig#setMaxBufferedDeleteTerms})
-  or enough added documents since the last flush, whichever
-  is sooner.  For the added documents, flushing is triggered
-  either by RAM usage of the documents (see {@link
-  IndexWriterConfig#setRAMBufferSizeMB}) or the number of added documents.
-  The default is to flush when RAM usage hits 16 MB.  For
+  calls). A flush is triggered when there are enough added documents
+  since the last flush. Flushing is triggered either by RAM usage of the
+  documents (see {@link IndexWriterConfig#setRAMBufferSizeMB}) or the
+  number of added documents (see {@link IndexWriterConfig#setMaxBufferedDocs(int)}).
+  The default is to flush when RAM usage hits
+  {@link IndexWriterConfig#DEFAULT_RAM_BUFFER_SIZE_MB} MB. For
   best indexing speed you should flush by RAM usage with a
-  large RAM buffer.  Note that flushing just moves the
+  large RAM buffer. Additionally, if IndexWriter reaches the configured number of
+  buffered deletes (see {@link IndexWriterConfig#setMaxBufferedDeleteTerms})
+  the deleted terms and queries are flushed and applied to existing segments.
+  In contrast to the other flush options {@link IndexWriterConfig#setRAMBufferSizeMB} and 
+  {@link IndexWriterConfig#setMaxBufferedDocs(int)}, deleted terms
+  won't trigger a segment flush. Note that flushing just moves the
   internal buffered state in IndexWriter into the index, but
   these changes are not visible to IndexReader until either
   {@link #commit()} or {@link #close} is called.  A flush may
@@ -165,21 +170,21 @@ import org.apache.lucene.util.MapBackedS
 /*
  * Clarification: Check Points (and commits)
  * IndexWriter writes new index files to the directory without writing a new segments_N
- * file which references these new files. It also means that the state of 
+ * file which references these new files. It also means that the state of
  * the in memory SegmentInfos object is different than the most recent
  * segments_N file written to the directory.
- * 
- * Each time the SegmentInfos is changed, and matches the (possibly 
- * modified) directory files, we have a new "check point". 
- * If the modified/new SegmentInfos is written to disk - as a new 
- * (generation of) segments_N file - this check point is also an 
+ *
+ * Each time the SegmentInfos is changed, and matches the (possibly
+ * modified) directory files, we have a new "check point".
+ * If the modified/new SegmentInfos is written to disk - as a new
+ * (generation of) segments_N file - this check point is also an
  * IndexCommit.
- * 
- * A new checkpoint always replaces the previous checkpoint and 
- * becomes the new "front" of the index. This allows the IndexFileDeleter 
+ *
+ * A new checkpoint always replaces the previous checkpoint and
+ * becomes the new "front" of the index. This allows the IndexFileDeleter
  * to delete files that are referenced only by stale checkpoints.
  * (files that were created since the last commit, but are no longer
- * referenced by the "front" of the index). For this, IndexFileDeleter 
+ * referenced by the "front" of the index). For this, IndexFileDeleter
  * keeps track of the last non commit checkpoint.
  */
 public class IndexWriter implements Closeable {
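
The rewritten class javadoc above describes the two segment-flush triggers (RAM usage and buffered document count) and notes that buffered delete terms no longer force a segment flush. A hedged configuration sketch of those options; the Version constant, analyzer, and index path are placeholders, not part of this commit:

    import java.io.File;
    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.IndexWriterConfig.OpenMode;
    import org.apache.lucene.store.FSDirectory;
    import org.apache.lucene.util.Version;

    public class FlushConfigSketch {
      public static void main(String[] args) throws Exception {
        IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_40,
            new StandardAnalyzer(Version.LUCENE_40))
            .setOpenMode(OpenMode.CREATE_OR_APPEND)                   // create a new index or append
            .setRAMBufferSizeMB(64.0)                                 // flush segments by RAM usage
            .setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH)
            .setMaxBufferedDeleteTerms(1000);                         // apply deletes without forcing a segment flush
        IndexWriter writer = new IndexWriter(FSDirectory.open(new File(args[0])), conf);
        writer.close();
      }
    }
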
@@ -195,7 +200,7 @@ public class IndexWriter implements Clos
    * printed to infoStream, if set (see {@link
    * #setInfoStream}).
    */
-  public final static int MAX_TERM_LENGTH = DocumentsWriter.MAX_TERM_LENGTH_UTF8;
+  public final static int MAX_TERM_LENGTH = DocumentsWriterPerThread.MAX_TERM_LENGTH_UTF8;
 
   // The normal read buffer size defaults to 1024, but
   // increasing this during merging seems to yield
@@ -225,7 +230,7 @@ public class IndexWriter implements Clos
   final FieldNumberBiMap globalFieldNumberMap;
 
   private DocumentsWriter docWriter;
-  private IndexFileDeleter deleter;
+  final IndexFileDeleter deleter;
 
   private Set<SegmentInfo> segmentsToOptimize = new HashSet<SegmentInfo>();           // used by optimize to note those needing optimization
   private int optimizeMaxNumSegments;
@@ -247,12 +252,12 @@ public class IndexWriter implements Clos
   private long mergeGen;
   private boolean stopMerges;
 
-  private final AtomicInteger flushCount = new AtomicInteger();
-  private final AtomicInteger flushDeletesCount = new AtomicInteger();
+  final AtomicInteger flushCount = new AtomicInteger();
+  final AtomicInteger flushDeletesCount = new AtomicInteger();
 
   final ReaderPool readerPool = new ReaderPool();
   final BufferedDeletesStream bufferedDeletesStream;
-  
+
   // This is a "write once" variable (like the organic dye
   // on a DVD-R that may or may not be heated by a laser and
   // then cooled to permanently record the event): it's
@@ -339,31 +344,58 @@ public class IndexWriter implements Clos
    */
   IndexReader getReader(boolean applyAllDeletes) throws IOException {
     ensureOpen();
-    
+
     final long tStart = System.currentTimeMillis();
 
     if (infoStream != null) {
       message("flush at getReader");
     }
-
     // Do this up front before flushing so that the readers
     // obtained during this flush are pooled, the first time
     // this method is called:
     poolReaders = true;
-
-    // Prevent segmentInfos from changing while opening the
-    // reader; in theory we could do similar retry logic,
-    // just like we do when loading segments_N
-    IndexReader r;
-    synchronized(this) {
-      flush(false, applyAllDeletes);
-      r = new DirectoryReader(this, segmentInfos, config.getReaderTermsIndexDivisor(), codecs, applyAllDeletes);
-      if (infoStream != null) {
-        message("return reader version=" + r.getVersion() + " reader=" + r);
+    final IndexReader r;
+    doBeforeFlush();
+    final boolean anySegmentFlushed;
+    /*
+     * for releasing a NRT reader we must ensure that 
+     * DW doesn't add any segments or deletes until we are
+     * done with creating the NRT DirectoryReader. 
+     * We release the two stage full flush after we are done opening the
+     * directory reader!
+     */
+    synchronized (fullFlushLock) {
+      boolean success = false;
+      try {
+        anySegmentFlushed = docWriter.flushAllThreads();
+        if (!anySegmentFlushed) {
+          // prevent double increment since docWriter#doFlush increments the flushcount
+          // if we flushed anything.
+          flushCount.incrementAndGet();
+        }
+        success = true;
+        // Prevent segmentInfos from changing while opening the
+        // reader; in theory we could do similar retry logic,
+        // just like we do when loading segments_N
+        synchronized(this) {
+          maybeApplyDeletes(applyAllDeletes);
+          r = new DirectoryReader(this, segmentInfos, config.getReaderTermsIndexDivisor(), codecs, applyAllDeletes);
+          if (infoStream != null) {
+            message("return reader version=" + r.getVersion() + " reader=" + r);
+          }
+        }
+      } finally {
+        if (!success && infoStream != null) {
+          message("hit exception during while NRT reader");
+        }
+        // Done: finish the full flush!
+        docWriter.finishFullFlush(success);
+        doAfterFlush();
       }
     }
-    maybeMerge();
-
+    if (anySegmentFlushed) {
+      maybeMerge();
+    }
     if (infoStream != null) {
       message("getReader took " + (System.currentTimeMillis() - tStart) + " msec");
     }
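
The reworked getReader path above takes fullFlushLock so DocumentsWriter cannot publish new segments or deletes while the NRT DirectoryReader is being opened, and only releases the two-stage full flush once the reader is open. From the application side the usual entry point is IndexReader.open(IndexWriter, boolean); a hedged sketch, assuming writer is an already-open IndexWriter:

    IndexReader reader = IndexReader.open(writer, true);   // true = apply all buffered deletes
    try {
      IndexSearcher searcher = new IndexSearcher(reader);
      // ... run queries against the near-real-time snapshot ...
      searcher.close();
    } finally {
      reader.close();   // the writer stays open and can keep indexing
    }
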
@@ -400,10 +432,10 @@ public class IndexWriter implements Clos
           if (r != null) {
             r.hasChanges = false;
           }
-        }     
+        }
       }
     }
-    
+
     // used only by asserts
     public synchronized boolean infoIsLive(SegmentInfo info) {
       int idx = segmentInfos.indexOf(info);
@@ -419,7 +451,7 @@ public class IndexWriter implements Clos
       }
       return info;
     }
-    
+
     /**
      * Release the segment reader (i.e. decRef it and close if there
      * are no more references.
@@ -432,7 +464,7 @@ public class IndexWriter implements Clos
     public synchronized boolean release(SegmentReader sr) throws IOException {
       return release(sr, false);
     }
-    
+
     /**
      * Release the segment reader (i.e. decRef it and close if there
      * are no more references.
@@ -493,7 +525,7 @@ public class IndexWriter implements Clos
         sr.close();
       }
     }
-    
+
     /** Remove all our references to readers, and commits
      *  any pending changes. */
     synchronized void close() throws IOException {
@@ -503,7 +535,7 @@ public class IndexWriter implements Clos
 
       Iterator<Map.Entry<SegmentInfo,SegmentReader>> iter = readerMap.entrySet().iterator();
       while (iter.hasNext()) {
-        
+
         Map.Entry<SegmentInfo,SegmentReader> ent = iter.next();
 
         SegmentReader sr = ent.getValue();
@@ -526,7 +558,7 @@ public class IndexWriter implements Clos
         sr.decRef();
       }
     }
-    
+
     /**
      * Commit all segment reader in the pool.
      * @throws IOException
@@ -550,7 +582,7 @@ public class IndexWriter implements Clos
         }
       }
     }
-    
+
     /**
      * Returns a ref to a clone.  NOTE: this clone is not
      * enrolled in the pool, so you should simply close()
@@ -564,7 +596,7 @@ public class IndexWriter implements Clos
         sr.decRef();
       }
     }
-   
+
     /**
      * Obtain a SegmentReader from the readerPool.  The reader
      * must be returned by calling {@link #release(SegmentReader)}
@@ -580,7 +612,7 @@ public class IndexWriter implements Clos
     /**
      * Obtain a SegmentReader from the readerPool.  The reader
      * must be returned by calling {@link #release(SegmentReader)}
-     * 
+     *
      * @see #release(SegmentReader)
      * @param info
      * @param doOpenStores
@@ -638,7 +670,7 @@ public class IndexWriter implements Clos
       return sr;
     }
   }
-  
+
   /**
    * Obtain the number of deleted docs for a pooled reader.
    * If the reader isn't being pooled, the segmentInfo's 
@@ -658,7 +690,7 @@ public class IndexWriter implements Clos
       }
     }
   }
-  
+
   /**
    * Used internally to throw an {@link
    * AlreadyClosedException} if this IndexWriter has been
@@ -721,7 +753,7 @@ public class IndexWriter implements Clos
     mergePolicy.setIndexWriter(this);
     mergeScheduler = conf.getMergeScheduler();
     codecs = conf.getCodecProvider();
-    
+
     bufferedDeletesStream = new BufferedDeletesStream(messageID);
     bufferedDeletesStream.setInfoStream(infoStream);
     poolReaders = conf.getReaderPooling();
@@ -790,8 +822,7 @@ public class IndexWriter implements Clos
 
       // start with previous field numbers, but new FieldInfos
       globalFieldNumberMap = segmentInfos.getOrLoadGlobalFieldNumberMap(directory);
-      docWriter = new DocumentsWriter(config, directory, this, conf.getIndexingChain(),
-          globalFieldNumberMap.newFieldInfos(SegmentCodecsBuilder.create(codecs)), bufferedDeletesStream);
+      docWriter = new DocumentsWriter(config, directory, this, globalFieldNumberMap, bufferedDeletesStream);
       docWriter.setInfoStream(infoStream);
 
       // Default deleter (for backwards compatibility) is
@@ -849,7 +880,7 @@ public class IndexWriter implements Clos
   public IndexWriterConfig getConfig() {
     return config;
   }
-  
+
   /** If non-null, this will be the default infoStream used
    * by a newly instantiated IndexWriter.
    * @see #setInfoStream
@@ -871,7 +902,7 @@ public class IndexWriter implements Clos
    * message when maxFieldLength is reached will be printed
    * to this.
    */
-  public void setInfoStream(PrintStream infoStream) {
+  public void setInfoStream(PrintStream infoStream) throws IOException {
     ensureOpen();
     this.infoStream = infoStream;
     docWriter.setInfoStream(infoStream);
@@ -881,7 +912,7 @@ public class IndexWriter implements Clos
       messageState();
   }
 
-  private void messageState() {
+  private void messageState() throws IOException {
     message("\ndir=" + directory + "\n" +
             "index=" + segString() + "\n" +
             "version=" + Constants.LUCENE_VERSION + "\n" +
@@ -901,7 +932,7 @@ public class IndexWriter implements Clos
   public boolean verbose() {
     return infoStream != null;
   }
-  
+
   /**
    * Commits all changes to an index and closes all
    * associated files.  Note that this may be a costly
@@ -916,7 +947,7 @@ public class IndexWriter implements Clos
    * even though part of it (flushing buffered documents)
    * may have succeeded, so the write lock will still be
    * held.</p>
-   * 
+   *
    * <p> If you can correct the underlying cause (eg free up
    * some disk space) then you can call close() again.
    * Failing that, if you want to force the write lock to be
@@ -1036,7 +1067,7 @@ public class IndexWriter implements Clos
 
       if (infoStream != null)
         message("now call final commit()");
-      
+
       if (!hitOOM) {
         commitInternal(null);
       }
@@ -1049,7 +1080,7 @@ public class IndexWriter implements Clos
         docWriter = null;
         deleter.close();
       }
-      
+
       if (writeLock != null) {
         writeLock.release();                          // release write lock
         writeLock = null;
@@ -1072,7 +1103,7 @@ public class IndexWriter implements Clos
   }
 
   /** Returns the Directory used by this index. */
-  public Directory getDirectory() {     
+  public Directory getDirectory() {
     // Pass false because the flush during closing calls getDirectory
     ensureOpen(false);
     return directory;
@@ -1196,22 +1227,7 @@ public class IndexWriter implements Clos
    * @throws IOException if there is a low-level IO error
    */
   public void addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException {
-    ensureOpen();
-    boolean doFlush = false;
-    boolean success = false;
-    try {
-      try {
-        doFlush = docWriter.updateDocument(doc, analyzer, null);
-        success = true;
-      } finally {
-        if (!success && infoStream != null)
-          message("hit exception adding document");
-      }
-      if (doFlush)
-        flush(true, false);
-    } catch (OutOfMemoryError oom) {
-      handleOOM(oom, "addDocument");
-    }
+    updateDocument(null, doc, analyzer);
   }
 
   /**
@@ -1228,9 +1244,7 @@ public class IndexWriter implements Clos
   public void deleteDocuments(Term term) throws CorruptIndexException, IOException {
     ensureOpen();
     try {
-      if (docWriter.deleteTerm(term, false)) {
-        flush(true, false);
-      }
+      docWriter.deleteTerms(term);
     } catch (OutOfMemoryError oom) {
       handleOOM(oom, "deleteDocuments(Term)");
     }
@@ -1238,7 +1252,8 @@ public class IndexWriter implements Clos
 
   /**
    * Deletes the document(s) containing any of the
-   * terms. All deletes are flushed at the same time.
+   * terms. All given deletes are applied and flushed atomically
+   * at the same time.
    *
    * <p><b>NOTE</b>: if this method hits an OutOfMemoryError
    * you should immediately close the writer.  See <a
@@ -1252,9 +1267,7 @@ public class IndexWriter implements Clos
   public void deleteDocuments(Term... terms) throws CorruptIndexException, IOException {
     ensureOpen();
     try {
-      if (docWriter.deleteTerms(terms)) {
-        flush(true, false);
-      }
+      docWriter.deleteTerms(terms);
     } catch (OutOfMemoryError oom) {
       handleOOM(oom, "deleteDocuments(Term..)");
     }
@@ -1274,9 +1287,7 @@ public class IndexWriter implements Clos
   public void deleteDocuments(Query query) throws CorruptIndexException, IOException {
     ensureOpen();
     try {
-      if (docWriter.deleteQuery(query)) {
-        flush(true, false);
-      }
+      docWriter.deleteQueries(query);
     } catch (OutOfMemoryError oom) {
       handleOOM(oom, "deleteDocuments(Query)");
     }
@@ -1284,7 +1295,7 @@ public class IndexWriter implements Clos
 
   /**
    * Deletes the document(s) matching any of the provided queries.
-   * All deletes are flushed at the same time.
+   * All given deletes are applied and flushed atomically at the same time.
    *
    * <p><b>NOTE</b>: if this method hits an OutOfMemoryError
    * you should immediately close the writer.  See <a
@@ -1298,9 +1309,7 @@ public class IndexWriter implements Clos
   public void deleteDocuments(Query... queries) throws CorruptIndexException, IOException {
     ensureOpen();
     try {
-      if (docWriter.deleteQueries(queries)) {
-        flush(true, false);
-      }
+      docWriter.deleteQueries(queries);
     } catch (OutOfMemoryError oom) {
       handleOOM(oom, "deleteDocuments(Query..)");
     }
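
With the changes above, deleteDocuments no longer decides whether to flush; the terms and queries are only buffered in DocumentsWriter and applied later (at flush/commit time, or once the buffered-delete-terms limit is reached). A hedged usage sketch; field names and values are placeholders:

    writer.deleteDocuments(new Term("id", "42"), new Term("id", "43"));    // buffered together
    writer.deleteDocuments(new TermQuery(new Term("category", "stale")));  // delete by query
    writer.commit();   // buffered deletes are applied and become visible to new readers
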
@@ -1350,17 +1359,18 @@ public class IndexWriter implements Clos
       throws CorruptIndexException, IOException {
     ensureOpen();
     try {
-      boolean doFlush = false;
       boolean success = false;
+      boolean anySegmentFlushed = false;
       try {
-        doFlush = docWriter.updateDocument(doc, analyzer, term);
+        anySegmentFlushed = docWriter.updateDocument(doc, analyzer, term);
         success = true;
       } finally {
         if (!success && infoStream != null)
           message("hit exception updating document");
       }
-      if (doFlush) {
-        flush(true, false);
+
+      if (anySegmentFlushed) {
+        maybeMerge();
       }
     } catch (OutOfMemoryError oom) {
       handleOOM(oom, "updateDocument");
@@ -1546,7 +1556,7 @@ public class IndexWriter implements Clos
       resetMergeExceptions();
       segmentsToOptimize = new HashSet<SegmentInfo>(segmentInfos);
       optimizeMaxNumSegments = maxNumSegments;
-      
+
       // Now mark all pending & running merges as optimize
       // merge:
       for(final MergePolicy.OneMerge merge  : pendingMerges) {
@@ -1612,12 +1622,12 @@ public class IndexWriter implements Clos
       if (merge.optimize)
         return true;
     }
-    
+
     for (final MergePolicy.OneMerge merge : runningMerges) {
       if (merge.optimize)
         return true;
     }
-    
+
     return false;
   }
 
@@ -1640,6 +1650,8 @@ public class IndexWriter implements Clos
     throws CorruptIndexException, IOException {
     ensureOpen();
 
+    flush(true, true);
+
     if (infoStream != null)
       message("expungeDeletes: index now " + segString());
 
@@ -1712,6 +1724,10 @@ public class IndexWriter implements Clos
    *  documents, so you must do so yourself if necessary.
    *  See also {@link #expungeDeletes(boolean)}
    *
+   *  <p><b>NOTE</b>: this method first flushes a new
+   *  segment (if there are indexed documents), and applies
+   *  all buffered deletes.
+   *
    *  <p><b>NOTE</b>: if this method hits an OutOfMemoryError
    *  you should immediately close the writer.  See <a
    *  href="#OOME">above</a> for details.</p>
@@ -1797,10 +1813,13 @@ public class IndexWriter implements Clos
     return mergingSegments;
   }
 
-  /** Expert: the {@link MergeScheduler} calls this method
-   *  to retrieve the next merge requested by the
-   *  MergePolicy */
-  synchronized MergePolicy.OneMerge getNextMerge() {
+  /**
+   * Expert: the {@link MergeScheduler} calls this method to retrieve the next
+   * merge requested by the MergePolicy
+   * 
+   * @lucene.experimental
+   */
+  public synchronized MergePolicy.OneMerge getNextMerge() {
     if (pendingMerges.size() == 0)
       return null;
     else {
@@ -1908,7 +1927,7 @@ public class IndexWriter implements Clos
   /**
    * Delete all documents in the index.
    *
-   * <p>This method will drop all buffered documents and will 
+   * <p>This method will drop all buffered documents and will
    *    remove all segments from the index. This change will not be
    *    visible until a {@link #commit()} has been called. This method
    *    can be rolled back using {@link #rollback()}.</p>
@@ -1938,7 +1957,7 @@ public class IndexWriter implements Clos
       deleter.refresh();
 
       // Don't bother saving any changes in our segmentInfos
-      readerPool.clear(null);      
+      readerPool.clear(null);
 
       // Mark that the index has changed
       ++changeCount;
@@ -1965,7 +1984,7 @@ public class IndexWriter implements Clos
         mergeFinish(merge);
       }
       pendingMerges.clear();
-      
+
       for (final MergePolicy.OneMerge merge : runningMerges) {
         if (infoStream != null)
           message("now abort running merge " + merge.segString(directory));
@@ -1992,7 +2011,7 @@ public class IndexWriter implements Clos
         message("all running merges have aborted");
 
     } else {
-      // waitForMerges() will ensure any running addIndexes finishes.  
+      // waitForMerges() will ensure any running addIndexes finishes.
       // It's fine if a new one attempts to start because from our
       // caller above the call will see that we are in the
       // process of closing, and will throw an
@@ -2004,7 +2023,7 @@ public class IndexWriter implements Clos
   /**
    * Wait for any currently outstanding merges to finish.
    *
-   * <p>It is guaranteed that any merges started prior to calling this method 
+   * <p>It is guaranteed that any merges started prior to calling this method
    *    will have completed once this method completes.</p>
    */
   public synchronized void waitForMerges() {
@@ -2034,6 +2053,125 @@ public class IndexWriter implements Clos
     deleter.checkpoint(segmentInfos, false);
   }
 
+  /**
+   * Prepares the {@link SegmentInfo} for the new flushed segment and persists
+   * the deleted documents {@link BitVector}. Use
+   * {@link #publishFlushedSegment(SegmentInfo, FrozenBufferedDeletes)} to
+   * publish the returned {@link SegmentInfo} together with its segment private
+   * delete packet.
+   * 
+   * @see #publishFlushedSegment(SegmentInfo, FrozenBufferedDeletes)
+   */
+  SegmentInfo prepareFlushedSegment(FlushedSegment flushedSegment) throws IOException {
+    assert flushedSegment != null;
+
+    SegmentInfo newSegment = flushedSegment.segmentInfo;
+
+    setDiagnostics(newSegment, "flush");
+
+    boolean success = false;
+    try {
+      if (useCompoundFile(newSegment)) {
+        String compoundFileName = IndexFileNames.segmentFileName(newSegment.name, "", IndexFileNames.COMPOUND_FILE_EXTENSION);
+        message("creating compound file " + compoundFileName);
+        // Now build compound file
+        CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, compoundFileName);
+        for(String fileName : newSegment.files()) {
+          cfsWriter.addFile(fileName);
+        }
+
+        // Perform the merge
+        cfsWriter.close();
+        synchronized(this) {
+          deleter.deleteNewFiles(newSegment.files());
+        }
+
+        newSegment.setUseCompoundFile(true);
+      }
+
+      // Must write deleted docs after the CFS so we don't
+      // slurp the del file into CFS:
+      if (flushedSegment.deletedDocuments != null) {
+        final int delCount = flushedSegment.deletedDocuments.count();
+        assert delCount > 0;
+        newSegment.setDelCount(delCount);
+        newSegment.advanceDelGen();
+        final String delFileName = newSegment.getDelFileName();
+        if (infoStream != null) {
+          message("flush: write " + delCount + " deletes to " + delFileName);
+        }
+        boolean success2 = false;
+        try {
+          // TODO: in the NRT case it'd be better to hand
+          // this del vector over to the
+          // shortly-to-be-opened SegmentReader and let it
+          // carry the changes; there's no reason to use
+          // filesystem as intermediary here.
+          flushedSegment.deletedDocuments.write(directory, delFileName);
+          success2 = true;
+        } finally {
+          if (!success2) {
+            try {
+              directory.deleteFile(delFileName);
+            } catch (Throwable t) {
+              // suppress this so we keep throwing the
+              // original exception
+            }
+          }
+        }
+      }
+
+      success = true;
+    } finally {
+      if (!success) {
+        if (infoStream != null) {
+          message("hit exception " +
+              "reating compound file for newly flushed segment " + newSegment.name);
+        }
+
+        synchronized(this) {
+          deleter.refresh(newSegment.name);
+        }
+      }
+    }
+    return newSegment;
+  }
+  
+  /**
+   * Atomically adds the segment private delete packet and publishes the flushed
+   * segments SegmentInfo to the index writer. NOTE: use
+   * {@link #prepareFlushedSegment(FlushedSegment)} to obtain the
+   * {@link SegmentInfo} for the flushed segment.
+   * 
+   * @see #prepareFlushedSegment(FlushedSegment)
+   */
+  synchronized void publishFlushedSegment(SegmentInfo newSegment,
+      FrozenBufferedDeletes packet, FrozenBufferedDeletes globalPacket) throws IOException {
+    // Lock order IW -> BDS
+    synchronized (bufferedDeletesStream) {
+      if (globalPacket != null && globalPacket.any()) {
+        bufferedDeletesStream.push(globalPacket);
+      } 
+      // Publishing the segment must be synched on IW -> BDS to make sure
+      // that no merge prunes away the seg. private delete packet
+      final long nextGen;
+      if (packet != null && packet.any()) {
+        nextGen = bufferedDeletesStream.push(packet);
+      } else {
+        // Since we don't have a delete packet to apply we can get a new
+        // generation right away
+        nextGen = bufferedDeletesStream.getNextGen();
+      }
+      newSegment.setBufferedDeletesGen(nextGen);
+      segmentInfos.add(newSegment);
+      checkpoint();
+    }
+  }
+
+  synchronized boolean useCompoundFile(SegmentInfo segmentInfo) throws IOException {
+    return mergePolicy.useCompoundFile(segmentInfos, segmentInfo);
+  }
+
   private synchronized void resetMergeExceptions() {
     mergeExceptions = new ArrayList<MergePolicy.OneMerge>();
     mergeGen++;
@@ -2082,11 +2220,11 @@ public class IndexWriter implements Clos
    * <p>
    * <b>NOTE:</b> this method only copies the segments of the incoming indexes
    * and does not merge them. Therefore deleted documents are not removed and
-   * the new segments are not merged with the existing ones. Also, the segments 
-   * are copied as-is, meaning they are not converted to CFS if they aren't, 
-   * and vice-versa. If you wish to do that, you can call {@link #maybeMerge} 
+   * the new segments are not merged with the existing ones. Also, the segments
+   * are copied as-is, meaning they are not converted to CFS if they aren't,
+   * and vice-versa. If you wish to do that, you can call {@link #maybeMerge}
    * or {@link #optimize} afterwards.
-   * 
+   *
    * <p>This requires this index not be among those to be added.
    *
    * <p>
@@ -2123,7 +2261,7 @@ public class IndexWriter implements Clos
           docCount += info.docCount;
           String newSegName = newSegmentName();
           String dsName = info.getDocStoreSegment();
-          
+
           if (infoStream != null) {
             message("addIndexes: process segment origName=" + info.name + " newName=" + newSegName + " dsName=" + dsName + " info=" + info);
           }
@@ -2170,7 +2308,7 @@ public class IndexWriter implements Clos
 
           infos.add(info);
         }
-      }      
+      }
 
       synchronized (this) {
         ensureOpen();
@@ -2219,11 +2357,12 @@ public class IndexWriter implements Clos
       SegmentMerger merger = new SegmentMerger(directory, config.getTermIndexInterval(),
                                                mergedName, null, codecs, payloadProcessorProvider,
                                                globalFieldNumberMap.newFieldInfos(SegmentCodecsBuilder.create(codecs)));
-      
+
       for (IndexReader reader : readers)      // add new indexes
         merger.add(reader);
-      
+
       int docCount = merger.merge();                // merge 'em
+
       final FieldInfos fieldInfos = merger.fieldInfos();
       SegmentInfo info = new SegmentInfo(mergedName, docCount, directory,
                                          false, fieldInfos.hasProx(), merger.getSegmentCodecs(),
@@ -2235,11 +2374,11 @@ public class IndexWriter implements Clos
       synchronized(this) { // Guard segmentInfos
         useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, info);
       }
-      
+
       // Now create the compound file if needed
       if (useCompoundFile) {
         merger.createCompoundFile(mergedName + ".cfs", info);
-        
+
         // delete new non cfs files directly: they were never
         // registered with IFD
         deleter.deleteNewFiles(info.files());
@@ -2291,7 +2430,7 @@ public class IndexWriter implements Clos
    *  #commit()} to finish the commit, or {@link
    *  #rollback()} to revert the commit and undo all changes
    *  done since the writer was opened.</p>
-   * 
+   *
    *  You can also just call {@link #commit(Map)} directly
    *  without prepareCommit first in which case that method
    *  will internally call prepareCommit.
@@ -2435,6 +2574,10 @@ public class IndexWriter implements Clos
     }
   }
 
+  // Ensures only one flush() is actually flushing segments
+  // at a time:
+  private final Object fullFlushLock = new Object();
+
   /**
    * Flush all in-memory buffered updates (adds and deletes)
    * to the Directory.
@@ -2458,116 +2601,104 @@ public class IndexWriter implements Clos
     }
   }
 
-  // TODO: this method should not have to be entirely
-  // synchronized, ie, merges should be allowed to commit
-  // even while a flush is happening
-  private synchronized boolean doFlush(boolean applyAllDeletes) throws CorruptIndexException, IOException {
-
+  private boolean doFlush(boolean applyAllDeletes) throws CorruptIndexException, IOException {
     if (hitOOM) {
       throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot flush");
     }
 
     doBeforeFlush();
-
     assert testPoint("startDoFlush");
-
-    // We may be flushing because it was triggered by doc
-    // count, del count, ram usage (in which case flush
-    // pending is already set), or we may be flushing
-    // due to external event eg getReader or commit is
-    // called (in which case we now set it, and this will
-    // pause all threads):
-    flushControl.setFlushPendingNoWait("explicit flush");
-
     boolean success = false;
-
     try {
 
       if (infoStream != null) {
         message("  start flush: applyAllDeletes=" + applyAllDeletes);
         message("  index before flush " + segString());
       }
-    
-      final SegmentInfo newSegment = docWriter.flush(this, deleter, mergePolicy, segmentInfos);
-      if (newSegment != null) {
-        setDiagnostics(newSegment, "flush");
-        segmentInfos.add(newSegment);
-        checkpoint();
-      }
-
-      if (!applyAllDeletes) {
-        // If deletes alone are consuming > 1/2 our RAM
-        // buffer, force them all to apply now. This is to
-        // prevent too-frequent flushing of a long tail of
-        // tiny segments:
-        if (flushControl.getFlushDeletes() ||
-            (config.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
-             bufferedDeletesStream.bytesUsed() > (1024*1024*config.getRAMBufferSizeMB()/2))) {
-          applyAllDeletes = true;
-          if (infoStream != null) {
-            message("force apply deletes bytesUsed=" + bufferedDeletesStream.bytesUsed() + " vs ramBuffer=" + (1024*1024*config.getRAMBufferSizeMB()));
-          }
+      final boolean anySegmentFlushed;
+      
+      synchronized (fullFlushLock) {
+        try {
+          anySegmentFlushed = docWriter.flushAllThreads();
+          success = true;
+        } finally {
+          docWriter.finishFullFlush(success);
         }
       }
-
-      if (applyAllDeletes) {
-        if (infoStream != null) {
-          message("apply all deletes during flush");
-        }
-        flushDeletesCount.incrementAndGet();
-        final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream.applyDeletes(readerPool, segmentInfos);
-        if (result.anyDeletes) {
-          checkpoint();
-        }
-        if (!keepFullyDeletedSegments && result.allDeleted != null) {
-          if (infoStream != null) {
-            message("drop 100% deleted segments: " + result.allDeleted);
-          }
-          for(SegmentInfo info : result.allDeleted) {
-            // If a merge has already registered for this
-            // segment, we leave it in the readerPool; the
-            // merge will skip merging it and will then drop
-            // it once it's done:
-            if (!mergingSegments.contains(info)) {
-              segmentInfos.remove(info);
-              if (readerPool != null) {
-                readerPool.drop(info);
-              }
-            }
-          }
-          checkpoint();
+      success = false;
+      synchronized(this) {
+        maybeApplyDeletes(applyAllDeletes);
+        doAfterFlush();
+        if (!anySegmentFlushed) {
+          // flushCount is incremented in flushAllThreads
+          flushCount.incrementAndGet();
         }
-        bufferedDeletesStream.prune(segmentInfos);
-        assert !bufferedDeletesStream.any();
-        flushControl.clearDeletes();
-      } else if (infoStream != null) {
-        message("don't apply deletes now delTermCount=" + bufferedDeletesStream.numTerms() + " bytesUsed=" + bufferedDeletesStream.bytesUsed());
+        success = true;
+        return anySegmentFlushed;
       }
-
-      doAfterFlush();
-      flushCount.incrementAndGet();
-
-      success = true;
-
-      return newSegment != null;
-
     } catch (OutOfMemoryError oom) {
       handleOOM(oom, "doFlush");
       // never hit
       return false;
     } finally {
-      flushControl.clearFlushPending();
       if (!success && infoStream != null)
         message("hit exception during flush");
     }
   }
+  
+  final synchronized void maybeApplyDeletes(boolean applyAllDeletes) throws IOException {
+    if (applyAllDeletes) {
+      if (infoStream != null) {
+        message("apply all deletes during flush");
+      }
+      applyAllDeletes();
+    } else if (infoStream != null) {
+      message("don't apply deletes now delTermCount=" + bufferedDeletesStream.numTerms() + " bytesUsed=" + bufferedDeletesStream.bytesUsed());
+    }
+
+  }
+  
+  final synchronized void applyAllDeletes() throws IOException {
+    flushDeletesCount.incrementAndGet();
+    final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream
+      .applyDeletes(readerPool, segmentInfos);
+    if (result.anyDeletes) {
+      checkpoint();
+    }
+    if (!keepFullyDeletedSegments && result.allDeleted != null) {
+      if (infoStream != null) {
+        message("drop 100% deleted segments: " + result.allDeleted);
+      }
+      for (SegmentInfo info : result.allDeleted) {
+        // If a merge has already registered for this
+        // segment, we leave it in the readerPool; the
+        // merge will skip merging it and will then drop
+        // it once it's done:
+        if (!mergingSegments.contains(info)) {
+          segmentInfos.remove(info);
+          if (readerPool != null) {
+            readerPool.drop(info);
+          }
+        }
+      }
+      checkpoint();
+    }
+    bufferedDeletesStream.prune(segmentInfos);
+  }
 
   /** Expert:  Return the total size of all index files currently cached in memory.
    * Useful for RAM-based size management.
    */
   public final long ramSizeInBytes() {
     ensureOpen();
-    return docWriter.bytesUsed() + bufferedDeletesStream.bytesUsed();
+    return docWriter.flushControl.netBytes() + bufferedDeletesStream.bytesUsed();
+  }
+  
+  // for testing only
+  DocumentsWriter getDocsWriter() {
+    boolean test = false;
+    assert test = true;
+    return test ? docWriter : null;
   }
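
The test-only accessor above relies on the standard trick for detecting whether assertions are enabled: the assignment inside the assert statement is evaluated only when the JVM runs with -ea, so the DocumentsWriter is handed out in assertion-enabled (test) runs and null otherwise. A tiny standalone illustration of the idiom:

    public class AssertionsEnabledSketch {
      // The assignment inside "assert" runs only under -ea, so this returns
      // true exactly when assertions are enabled.
      static boolean assertionsEnabled() {
        boolean enabled = false;
        assert enabled = true;
        return enabled;
      }

      public static void main(String[] args) {
        System.out.println("assertions enabled: " + assertionsEnabled());
      }
    }
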
 
   /** Expert:  Return the number of documents currently
@@ -2577,7 +2708,7 @@ public class IndexWriter implements Clos
     return docWriter.getNumDocs();
   }
 
-  private void ensureValidMerge(MergePolicy.OneMerge merge) {
+  private void ensureValidMerge(MergePolicy.OneMerge merge) throws IOException {
     for(SegmentInfo info : merge.segments) {
       if (segmentInfos.indexOf(info) == -1) {
         throw new MergePolicy.MergeException("MergePolicy selected a segment (" + info.name + ") that is not in the current index " + segString(), directory);
@@ -2703,7 +2834,7 @@ public class IndexWriter implements Clos
     }
 
     commitMergedDeletes(merge, mergedReader);
-      
+
     // If the doc store we are using has been closed and
     // is now in compound format (but wasn't when we
     // started), then we will switch to the compound
@@ -2717,7 +2848,7 @@ public class IndexWriter implements Clos
       message("merged segment " + merge.info + " is 100% deleted" +  (keepFullyDeletedSegments ? "" : "; skipping insert"));
     }
 
-    final Set mergedAway = new HashSet<SegmentInfo>(merge.segments);
+    final Set<SegmentInfo> mergedAway = new HashSet<SegmentInfo>(merge.segments);
     int segIdx = 0;
     int newSegIdx = 0;
     boolean inserted = false;
@@ -2764,15 +2895,15 @@ public class IndexWriter implements Clos
     // them so that they don't bother writing them to
     // disk, updating SegmentInfo, etc.:
     readerPool.clear(merge.segments);
-    
+
     if (merge.optimize) {
       // cascade the optimize:
       segmentsToOptimize.add(merge.info);
     }
-    
+
     return true;
   }
-  
+
   final private void handleMergeException(Throwable t, MergePolicy.OneMerge merge) throws IOException {
 
     if (infoStream != null) {
@@ -2808,9 +2939,10 @@ public class IndexWriter implements Clos
   /**
    * Merges the indicated segments, replacing them in the stack with a
    * single segment.
+   * 
+   * @lucene.experimental
    */
-
-  final void merge(MergePolicy.OneMerge merge)
+  public final void merge(MergePolicy.OneMerge merge)
     throws CorruptIndexException, IOException {
 
     boolean success = false;
@@ -2861,14 +2993,14 @@ public class IndexWriter implements Clos
   /** Hook that's called when the specified merge is complete. */
   void mergeSuccess(MergePolicy.OneMerge merge) {
   }
-  
+
   /** Checks whether this merge involves any segments
    *  already participating in a merge.  If not, this merge
    *  is "registered", meaning we record that its segments
    *  are now participating in a merge, and true is
    *  returned.  Else (the merge conflicts) false is
    *  returned. */
-  final synchronized boolean registerMerge(MergePolicy.OneMerge merge) throws MergePolicy.MergeAbortedException {
+  final synchronized boolean registerMerge(MergePolicy.OneMerge merge) throws MergePolicy.MergeAbortedException, IOException {
 
     if (merge.registerDone)
       return true;
@@ -2878,10 +3010,8 @@ public class IndexWriter implements Clos
       throw new MergePolicy.MergeAbortedException("merge is aborted: " + merge.segString(directory));
     }
 
-    final int count = merge.segments.size();
     boolean isExternal = false;
-    for(int i=0;i<count;i++) {
-      final SegmentInfo info = merge.segments.info(i);
+    for(SegmentInfo info : merge.segments) {
       if (mergingSegments.contains(info)) {
         return false;
       }
@@ -2911,12 +3041,15 @@ public class IndexWriter implements Clos
     // is running (while synchronized) to avoid race
     // condition where two conflicting merges from different
     // threads start
-    for(int i=0;i<count;i++) {
-      mergingSegments.add(merge.segments.info(i));
+    message("registerMerge merging=" + mergingSegments);
+    for(SegmentInfo info : merge.segments) {
+      message("registerMerge info=" + info);
+      mergingSegments.add(info);
     }
 
     // Merge is now registered
     merge.registerDone = true;
+
     return true;
   }
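
registerMerge's contract is that a merge is accepted only if none of its segments already participates in another merge; on success the segments are recorded so that later merges selecting them will be refused. A stripped-down sketch of that bookkeeping — Seg and the class name are placeholders, and the real method additionally handles aborts, external directories and merge.info:

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Sketch only: Seg stands in for SegmentInfo.
    final class MergeRegistrySketch<Seg> {
      private final Set<Seg> merging = new HashSet<Seg>();

      // Refuse the merge if any of its segments already participates in one.
      synchronized boolean register(List<Seg> segments) {
        for (Seg s : segments) {
          if (merging.contains(s)) {
            return false;
          }
        }
        merging.addAll(segments);  // no other merge may now select these segments
        return true;
      }

      synchronized void finish(List<Seg> segments) {
        merging.removeAll(segments);
      }
    }
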
 
@@ -2991,7 +3124,6 @@ public class IndexWriter implements Clos
 
     // Lock order: IW -> BD
     bufferedDeletesStream.prune(segmentInfos);
-
     Map<String,String> details = new HashMap<String,String>();
     details.put("optimize", Boolean.toString(merge.optimize));
     details.put("mergeFactor", Integer.toString(merge.segments.size()));
@@ -3001,6 +3133,20 @@ public class IndexWriter implements Clos
       message("merge seg=" + merge.info.name);
     }
 
+    assert merge.estimatedMergeBytes == 0;
+    for(SegmentInfo info : merge.segments) {
+      if (info.docCount > 0) {
+        final int delCount = numDeletedDocs(info);
+        assert delCount <= info.docCount;
+        final double delRatio = ((double) delCount)/info.docCount;
+        merge.estimatedMergeBytes += info.sizeInBytes(true) * (1.0 - delRatio);
+      }
+    }
+
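
The estimate above scales each segment's size by its fraction of live documents, so heavily deleted segments count for less. A toy calculation with made-up numbers:

    public class MergeEstimateSketch {
      public static void main(String[] args) {
        long sizeInBytes = 100L * 1024 * 1024;            // a 100 MB segment
        int docCount = 1000, delCount = 250;              // 25% of its docs are deleted
        double delRatio = ((double) delCount) / docCount;
        long estimated = (long) (sizeInBytes * (1.0 - delRatio));
        System.out.println(estimated);                    // 78643200 bytes, roughly 75 MB
      }
    }
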
+    // TODO: I think this should no longer be needed (we
+    // now build CFS before adding segment to the infos);
+    // however, on removing it, tests fail for some reason!
+
     // Also enroll the merged segment into mergingSegments;
     // this prevents it from getting selected for a merge
     // after our merge is done but while we are building the
@@ -3008,11 +3154,11 @@ public class IndexWriter implements Clos
     mergingSegments.add(merge.info);
   }
 
-  private void setDiagnostics(SegmentInfo info, String source) {
+  static void setDiagnostics(SegmentInfo info, String source) {
     setDiagnostics(info, source, null);
   }
 
-  private void setDiagnostics(SegmentInfo info, String source, Map<String,String> details) {
+  private static void setDiagnostics(SegmentInfo info, String source, Map<String,String> details) {
     Map<String,String> diagnostics = new HashMap<String,String>();
     diagnostics.put("source", source);
     diagnostics.put("lucene.version", Constants.LUCENE_VERSION);
@@ -3030,7 +3176,7 @@ public class IndexWriter implements Clos
   /** Does the finishing work for a merge, which is fast but holds
    *  the synchronized lock on the IndexWriter instance. */
   final synchronized void mergeFinish(MergePolicy.OneMerge merge) throws IOException {
-    
+
     // Optimize, addIndexes or finishMerges may be waiting
     // on merges to finish.
     notifyAll();
@@ -3039,10 +3185,11 @@ public class IndexWriter implements Clos
     // exception inside mergeInit
     if (merge.registerDone) {
       final SegmentInfos sourceSegments = merge.segments;
-      final int end = sourceSegments.size();
-      for(int i=0;i<end;i++) {
-        mergingSegments.remove(sourceSegments.info(i));
+      for(SegmentInfo info : sourceSegments) {
+        mergingSegments.remove(info);
       }
+      // TODO: if we remove the add in _mergeInit, we should
+      // also remove this:
       mergingSegments.remove(merge.info);
       merge.registerDone = false;
     }
@@ -3101,11 +3248,11 @@ public class IndexWriter implements Clos
    *  instance */
   private int mergeMiddle(MergePolicy.OneMerge merge)
     throws CorruptIndexException, IOException {
-    
+
     merge.checkAborted(directory);
 
     final String mergedName = merge.info.name;
-    
+
     int mergedDocCount = 0;
 
     SegmentInfos sourceSegments = merge.segments;
@@ -3170,7 +3317,7 @@ public class IndexWriter implements Clos
         message("merge store matchedCount=" + merger.getMatchedSubReaderCount() + " vs " + merge.readers.size());
       }
       anyNonBulkMerges |= merger.getAnyNonBulkMerges();
-      
+
       assert mergedDocCount == totDocCount: "mergedDocCount=" + mergedDocCount + " vs " + totDocCount;
 
       // Very important to do this before opening the reader
@@ -3239,8 +3386,11 @@ public class IndexWriter implements Clos
         merge.info.setUseCompoundFile(true);
       }
 
-      final IndexReaderWarmer mergedSegmentWarmer = config.getMergedSegmentWarmer();
+      if (infoStream != null) {
+        message(String.format("merged segment size=%.3f MB vs estimate=%.3f MB", merge.info.sizeInBytes(true)/1024./1024., merge.estimatedMergeBytes/1024/1024.));
+      }
 
+      final IndexReaderWarmer mergedSegmentWarmer = config.getMergedSegmentWarmer();
       final int termsIndexDivisor;
       final boolean loadDocStores;
 
@@ -3301,12 +3451,12 @@ public class IndexWriter implements Clos
 
   // For test purposes.
   final int getBufferedDeleteTermsSize() {
-    return docWriter.getPendingDeletes().terms.size();
+    return docWriter.getBufferedDeleteTermsSize();
   }
 
   // For test purposes.
   final int getNumBufferedDeleteTerms() {
-    return docWriter.getPendingDeletes().numTermDeletes.get();
+    return docWriter.getNumBufferedDeleteTerms();
   }
 
   // utility routines for tests
@@ -3314,21 +3464,41 @@ public class IndexWriter implements Clos
     return segmentInfos.size() > 0 ? segmentInfos.info(segmentInfos.size()-1) : null;
   }
 
-  public synchronized String segString() {
+  /** @lucene.internal */
+  public synchronized String segString() throws IOException {
     return segString(segmentInfos);
   }
 
-  private synchronized String segString(SegmentInfos infos) {
+  /** @lucene.internal */
+  public synchronized String segString(SegmentInfos infos) throws IOException {
     StringBuilder buffer = new StringBuilder();
     final int count = infos.size();
     for(int i = 0; i < count; i++) {
       if (i > 0) {
         buffer.append(' ');
       }
-      final SegmentInfo info = infos.info(i);
-      buffer.append(info.toString(directory, 0));
-      if (info.dir != directory)
-        buffer.append("**");
+      buffer.append(segString(infos.info(i)));
+    }
+
+    return buffer.toString();
+  }
+
+  public synchronized String segString(SegmentInfo info) throws IOException {
+    StringBuilder buffer = new StringBuilder();
+    SegmentReader reader = readerPool.getIfExists(info);
+    try {
+      if (reader != null) {
+        buffer.append(reader.toString());
+      } else {
+        buffer.append(info.toString(directory, 0));
+        if (info.dir != directory) {
+          buffer.append("**");
+        }
+      }
+    } finally {
+      if (reader != null) {
+        readerPool.release(reader);
+      }
     }
     return buffer.toString();
   }
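
The per-segment segString above prefers the pooled reader's toString when one exists and always releases it in a finally block. The same get-if-exists / release pairing, reduced to a neutral sketch with hypothetical Pool/Pooled types:

    import java.io.IOException;

    // Sketch only: Pool/Pooled are hypothetical stand-ins for readerPool/SegmentReader.
    final class PooledDescribeSketch {
      interface Pooled { String describe(); }

      interface Pool {
        Pooled getIfExists(String key) throws IOException;  // null if nothing pooled
        void release(Pooled p) throws IOException;
      }

      static String describe(Pool pool, String key, String fallback) throws IOException {
        Pooled p = pool.getIfExists(key);
        try {
          return p != null ? p.describe() : fallback;
        } finally {
          if (p != null) {
            pool.release(p);  // release even if describe() throws
          }
        }
      }
    }
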
@@ -3401,17 +3571,17 @@ public class IndexWriter implements Clos
 
         assert lastCommitChangeCount <= changeCount;
         myChangeCount = changeCount;
-        
+
         if (changeCount == lastCommitChangeCount) {
           if (infoStream != null)
             message("  skip startCommit(): no changes pending");
           return;
         }
-        
+
         // First, we clone & incref the segmentInfos we intend
         // to sync, then, without locking, we sync() all files
         // referenced by toSync, in the background.
-        
+
         if (infoStream != null)
           message("startCommit index=" + segString(segmentInfos) + " changeCount=" + changeCount);
 
@@ -3419,10 +3589,10 @@ public class IndexWriter implements Clos
         toSync = (SegmentInfos) segmentInfos.clone();
 
         assert filesExist(toSync);
-        
+
         if (commitUserData != null)
           toSync.setUserData(commitUserData);
-        
+
         // This protects the segmentInfos we are now going
         // to commit.  This is important in case, eg, while
         // we are trying to sync all referenced files, a
@@ -3554,7 +3724,7 @@ public class IndexWriter implements Clos
 
   /** Expert: remove any index files that are no longer
    *  used.
-   * 
+   *
    *  <p> IndexWriter normally deletes unused files itself,
    *  during indexing.  However, on Windows, which disallows
    *  deletion of open files, if there is a reader open on
@@ -3603,7 +3773,7 @@ public class IndexWriter implements Clos
   public void setPayloadProcessorProvider(PayloadProcessorProvider pcp) {
     payloadProcessorProvider = pcp;
   }
-  
+
   /**
    * Returns the {@link PayloadProcessorProvider} that is used during segment
    * merges to process payloads.
@@ -3611,124 +3781,4 @@ public class IndexWriter implements Clos
   public PayloadProcessorProvider getPayloadProcessorProvider() {
     return payloadProcessorProvider;
   }
-
-  // decides when flushes happen
-  final class FlushControl {
-
-    private boolean flushPending;
-    private boolean flushDeletes;
-    private int delCount;
-    private int docCount;
-    private boolean flushing;
-
-    private synchronized boolean setFlushPending(String reason, boolean doWait) {
-      if (flushPending || flushing) {
-        if (doWait) {
-          while(flushPending || flushing) {
-            try {
-              wait();
-            } catch (InterruptedException ie) {
-              throw new ThreadInterruptedException(ie);
-            }
-          }
-        }
-        return false;
-      } else {
-        if (infoStream != null) {
-          message("now trigger flush reason=" + reason);
-        }
-        flushPending = true;
-        return flushPending;
-      }
-    }
-
-    public synchronized void setFlushPendingNoWait(String reason) {
-      setFlushPending(reason, false);
-    }
-
-    public synchronized boolean getFlushPending() {
-      return flushPending;
-    }
-
-    public synchronized boolean getFlushDeletes() {
-      return flushDeletes;
-    }
-
-    public synchronized void clearFlushPending() {
-      if (infoStream != null) {
-        message("clearFlushPending");
-      }
-      flushPending = false;
-      flushDeletes = false;
-      docCount = 0;
-      notifyAll();
-    }
-
-    public synchronized void clearDeletes() {
-      delCount = 0;
-    }
-
-    public synchronized boolean waitUpdate(int docInc, int delInc) {
-      return waitUpdate(docInc, delInc, false);
-    }
-
-    public synchronized boolean waitUpdate(int docInc, int delInc, boolean skipWait) {
-      while(flushPending) {
-        try {
-          wait();
-        } catch (InterruptedException ie) {
-          throw new ThreadInterruptedException(ie);
-        }
-      }
-
-      // skipWait is only used when a thread is BOTH adding
-      // a doc and buffering a del term, and, the adding of
-      // the doc already triggered a flush
-      if (skipWait) {
-        docCount += docInc;
-        delCount += delInc;
-        return false;
-      }
-
-      final int maxBufferedDocs = config.getMaxBufferedDocs();
-      if (maxBufferedDocs != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
-          (docCount+docInc) >= maxBufferedDocs) {
-        return setFlushPending("maxBufferedDocs", true);
-      }
-      docCount += docInc;
-
-      final int maxBufferedDeleteTerms = config.getMaxBufferedDeleteTerms();
-      if (maxBufferedDeleteTerms != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
-          (delCount+delInc) >= maxBufferedDeleteTerms) {
-        flushDeletes = true;
-        return setFlushPending("maxBufferedDeleteTerms", true);
-      }
-      delCount += delInc;
-
-      return flushByRAMUsage("add delete/doc");
-    }
-
-    public synchronized boolean flushByRAMUsage(String reason) {
-      final double ramBufferSizeMB = config.getRAMBufferSizeMB();
-      if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH) {
-        final long limit = (long) (ramBufferSizeMB*1024*1024);
-        long used = bufferedDeletesStream.bytesUsed() + docWriter.bytesUsed();
-        if (used >= limit) {
-          
-          // DocumentsWriter may be able to free up some
-          // RAM:
-          // Lock order: FC -> DW
-          docWriter.balanceRAM();
-
-          used = bufferedDeletesStream.bytesUsed() + docWriter.bytesUsed();
-          if (used >= limit) {
-            return setFlushPending("ram full: " + reason, false);
-          }
-        }
-      }
-      return false;
-    }
-  }
-
-  final FlushControl flushControl = new FlushControl();
 }
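
For reference, the trigger in the removed FlushControl boiled down to a plain byte threshold: a flush becomes pending once buffered deletes plus buffered documents reach ramBufferSizeMB. A toy example with made-up usage numbers (16.0 MB is the IndexWriterConfig default):

    public class RamTriggerSketch {
      public static void main(String[] args) {
        double ramBufferSizeMB = 16.0;
        long limit = (long) (ramBufferSizeMB * 1024 * 1024);  // 16777216 bytes
        long used = 9L * 1024 * 1024 + 8L * 1024 * 1024;      // buffered deletes + docs
        System.out.println(used >= limit);                    // true -> a flush would be triggered
      }
    }
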