Posted to commits@lucene.apache.org by mi...@apache.org on 2010/12/11 12:07:02 UTC

svn commit: r1044635 [1/2] - in /lucene/dev/trunk/lucene/src: java/org/apache/lucene/index/ test/org/apache/lucene/ test/org/apache/lucene/index/ test/org/apache/lucene/search/ test/org/apache/lucene/store/ test/org/apache/lucene/util/

Author: mikemccand
Date: Sat Dec 11 11:07:01 2010
New Revision: 1044635

URL: http://svn.apache.org/viewvc?rev=1044635&view=rev
Log:
LUCENE-2680: buffer pending deletes by segment
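
Before this change, deletes were buffered globally (by docID, term, or query) and remapped whenever a merge completed; with this change each flushed segment carries its own SegmentDeletes, and DocumentsWriter pushes its in-RAM pending deletes onto whichever segment it flushes. A minimal, self-contained sketch of that idea (illustrative names only, plain Strings instead of SegmentInfo/Term; not the committed code):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: a segment is a plain String, a delete is a term string.
class PerSegmentDeletesSketch {
  // deletes buffered for the still-in-RAM (not yet flushed) segment
  private final List<String> pendingDeletes = new ArrayList<String>();
  // deletes already flushed, keyed by the segment they were flushed with
  final Map<String,List<String>> deletesBySegment = new HashMap<String,List<String>>();

  void bufferDelete(String term) {
    pendingDeletes.add(term);
  }

  // called when the buffered docs are flushed as newSegment: the pending deletes
  // travel with that segment instead of joining one global pool
  void pushDeletes(String newSegment) {
    if (!pendingDeletes.isEmpty()) {
      deletesBySegment.put(newSegment, new ArrayList<String>(pendingDeletes));
      pendingDeletes.clear();
    }
  }

  public static void main(String[] args) {
    PerSegmentDeletesSketch bd = new PerSegmentDeletesSketch();
    bd.bufferDelete("id:1");
    bd.pushDeletes("_0");            // "id:1" now belongs to segment _0
    bd.bufferDelete("id:2");
    bd.pushDeletes("_1");            // "id:2" belongs to segment _1
    System.out.println(bd.deletesBySegment);
  }
}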

Added:
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentDeletes.java   (with props)
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestPerSegmentDeletes.java   (with props)
Modified:
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/CompoundFileReader.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadState.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/ParallelPostingsArray.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentInfo.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/TestDemo.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/TestSearchForDuplicates.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestStressIndexing2.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/search/TestTermVectors.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/store/MockIndexOutputWrapper.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/util/LuceneTestCase.java
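
The new SegmentDeletes class listed under Added above is not included in this part of the diff. Judging only from its call sites below (numTermDeletes, bytesUsed, terms, queries, docIDs, addTerm, addQuery, addDocID, update, any, clearDocIDs, BYTES_PER_DEL_DOCID), its shape is roughly the following outline; the bodies and types here are simplified stand-ins, not the committed file:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Inferred outline; String stands in for Term/Query and the byte accounting is only indicative.
class SegmentDeletesOutline {
  static final int BYTES_PER_DEL_DOCID = 8;             // placeholder value

  final AtomicLong bytesUsed = new AtomicLong();        // read by BufferedDeletes for RAM accounting
  final AtomicInteger numTermDeletes = new AtomicInteger();

  // each buffered delete records the doc count ("docIDUpto") it applies up to
  final Map<String,Integer> terms = new TreeMap<String,Integer>();
  final Map<String,Integer> queries = new HashMap<String,Integer>();
  final List<Integer> docIDs = new ArrayList<Integer>();

  void addTerm(String term, int docIDUpto) {
    terms.put(term, docIDUpto);
    numTermDeletes.incrementAndGet();
  }

  void addQuery(String query, int docIDUpto) {
    queries.put(query, docIDUpto);
  }

  void addDocID(int docID) {
    docIDs.add(docID);
    bytesUsed.addAndGet(BYTES_PER_DEL_DOCID);
  }

  boolean any() {
    return !terms.isEmpty() || !queries.isEmpty() || !docIDs.isEmpty();
  }

  // merge another segment's deletes into this one (the real update(in, noLimit)
  // also reconciles the docIDUpto limits; elided here)
  void update(SegmentDeletesOutline in, boolean noLimit) {
    terms.putAll(in.terms);
    queries.putAll(in.queries);
    numTermDeletes.addAndGet(in.numTermDeletes.get());
  }

  void clearDocIDs() {
    docIDs.clear();
  }

  void clear() {
    terms.clear();
    queries.clear();
    docIDs.clear();
    numTermDeletes.set(0);
    bytesUsed.set(0);
  }
}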

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java?rev=1044635&r1=1044634&r2=1044635&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java Sat Dec 11 11:07:01 2010
@@ -17,153 +17,415 @@ package org.apache.lucene.index;
  * limitations under the License.
  */
 
+import java.io.IOException;
+import java.io.PrintStream;
 import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Date;
 import java.util.Map.Entry;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.Weight;
+
+/** Holds a {@link SegmentDeletes} for each segment in the
+ *  index. */
 
-/** Holds buffered deletes, by docID, term or query.  We
- *  hold two instances of this class: one for the deletes
- *  prior to the last flush, the other for deletes after
- *  the last flush.  This is so if we need to abort
- *  (discard all buffered docs) we can also discard the
- *  buffered deletes yet keep the deletes done during
- *  previously flushed segments. */
 class BufferedDeletes {
-  int numTerms;
-  Map<Term,Num> terms;
-  Map<Query,Integer> queries = new HashMap<Query,Integer>();
-  List<Integer> docIDs = new ArrayList<Integer>();
-  long bytesUsed;
-  private final boolean doTermSort;
-
-  public BufferedDeletes(boolean doTermSort) {
-    this.doTermSort = doTermSort;
-    if (doTermSort) {
-      terms = new TreeMap<Term,Num>();
-    } else {
-      terms = new HashMap<Term,Num>();
-    }
-  }
 
-  // Number of documents a delete term applies to.
-  final static class Num {
-    private int num;
+  // Deletes for all flushed/merged segments:
+  private final Map<SegmentInfo,SegmentDeletes> deletesMap = new HashMap<SegmentInfo,SegmentDeletes>();
 
-    Num(int num) {
-      this.num = num;
-    }
+  // used only by assert
+  private Term lastDeleteTerm;
+  
+  private PrintStream infoStream;
+  private final AtomicLong bytesUsed = new AtomicLong();
+  private final AtomicInteger numTerms = new AtomicInteger();
+  private final int messageID;
 
-    int getNum() {
-      return num;
+  public BufferedDeletes(int messageID) {
+    this.messageID = messageID;
+  }
+
+  private synchronized void message(String message) {
+    if (infoStream != null) {
+      infoStream.println("BD " + messageID + " [" + new Date() + "; " + Thread.currentThread().getName() + "]: BD " + message);
     }
+  }
+  
+  public synchronized void setInfoStream(PrintStream infoStream) {
+    this.infoStream = infoStream;
+  }
+
+  public synchronized void pushDeletes(SegmentDeletes newDeletes, SegmentInfo info) {
+    pushDeletes(newDeletes, info, false);
+  }
 
-    void setNum(int num) {
-      // Only record the new number if it's greater than the
-      // current one.  This is important because if multiple
-      // threads are replacing the same doc at nearly the
-      // same time, it's possible that one thread that got a
-      // higher docID is scheduled before the other
-      // threads.
-      if (num > this.num)
-        this.num = num;
+  // Moves all pending deletes onto the provided segment,
+  // then clears the pending deletes
+  public synchronized void pushDeletes(SegmentDeletes newDeletes, SegmentInfo info, boolean noLimit) {
+    assert newDeletes.any();
+    numTerms.addAndGet(newDeletes.numTermDeletes.get());
+
+    if (!noLimit) {
+      assert !deletesMap.containsKey(info);
+      assert info != null;
+      deletesMap.put(info, newDeletes);
+      bytesUsed.addAndGet(newDeletes.bytesUsed.get());
+    } else {
+      final SegmentDeletes deletes = getDeletes(info);
+      bytesUsed.addAndGet(-deletes.bytesUsed.get());
+      deletes.update(newDeletes, noLimit);
+      bytesUsed.addAndGet(deletes.bytesUsed.get());
+    }    
+    if (infoStream != null) {
+      message("push deletes seg=" + info + " dels=" + getDeletes(info));
     }
+    assert checkDeleteStats();    
   }
 
-  int size() {
-    // We use numTerms not terms.size() intentionally, so
-    // that deletes by the same term multiple times "count",
-    // ie if you ask to flush every 1000 deletes then even
-    // dup'd terms are counted towards that 1000
-    return numTerms + queries.size() + docIDs.size();
+  public synchronized void clear() {
+    deletesMap.clear();
+    numTerms.set(0);
+    bytesUsed.set(0);
   }
 
-  void update(BufferedDeletes in) {
-    numTerms += in.numTerms;
-    bytesUsed += in.bytesUsed;
-    terms.putAll(in.terms);
-    queries.putAll(in.queries);
-    docIDs.addAll(in.docIDs);
-    in.clear();
+  synchronized boolean any() {
+    return bytesUsed.get() != 0;
   }
-    
-  void clear() {
-    terms.clear();
-    queries.clear();
-    docIDs.clear();
-    numTerms = 0;
-    bytesUsed = 0;
+
+  public int numTerms() {
+    return numTerms.get();
+  }
+
+  public long bytesUsed() {
+    return bytesUsed.get();
   }
 
-  void addBytesUsed(long b) {
-    bytesUsed += b;
+  // IW calls this on finishing a merge.  While the merge
+  // was running, it's possible new deletes were pushed onto
+  // our last (and only our last) segment.  In this case we
+  // must carry forward those deletes onto the merged
+  // segment.
+  synchronized void commitMerge(MergePolicy.OneMerge merge) {
+    assert checkDeleteStats();
+    if (infoStream != null) {
+      message("commitMerge merge.info=" + merge.info + " merge.segments=" + merge.segments);
+    }
+    final SegmentInfo lastInfo = merge.segments.lastElement();
+    final SegmentDeletes lastDeletes = deletesMap.get(lastInfo);
+    if (lastDeletes != null) {
+      deletesMap.remove(lastInfo);
+      assert !deletesMap.containsKey(merge.info);
+      deletesMap.put(merge.info, lastDeletes);
+      // don't need to update numTerms/bytesUsed since we
+      // are just moving the deletes from one info to
+      // another
+      if (infoStream != null) {
+        message("commitMerge done: new deletions=" + lastDeletes);
+      }
+    } else if (infoStream != null) {
+      message("commitMerge done: no new deletions");
+    }
+    assert !anyDeletes(merge.segments.range(0, merge.segments.size()-1));
+    assert checkDeleteStats();
   }
 
-  boolean any() {
-    return terms.size() > 0 || docIDs.size() > 0 || queries.size() > 0;
+  synchronized void clear(SegmentDeletes deletes) {
+    deletes.clear();
   }
+  
+  public synchronized boolean applyDeletes(IndexWriter.ReaderPool readerPool, SegmentInfos segmentInfos, SegmentInfos applyInfos) throws IOException {
+    if (!any()) {
+      return false;
+    }
+    final long t0 = System.currentTimeMillis();
+
+    if (infoStream != null) {
+      message("applyDeletes: applyInfos=" + applyInfos + "; index=" + segmentInfos);
+    }
 
-  // Remaps all buffered deletes based on a completed
-  // merge
-  synchronized void remap(MergeDocIDRemapper mapper,
-                          SegmentInfos infos,
-                          int[][] docMaps,
-                          int[] delCounts,
-                          MergePolicy.OneMerge merge,
-                          int mergeDocCount) {
+    assert checkDeleteStats();
 
-    final Map<Term,Num> newDeleteTerms;
+    assert applyInfos.size() > 0;
+
+    boolean any = false;
+    
+    final SegmentInfo lastApplyInfo = applyInfos.lastElement();
+    final int lastIdx = segmentInfos.indexOf(lastApplyInfo);
+    
+    final SegmentInfo firstInfo = applyInfos.firstElement();
+    final int firstIdx = segmentInfos.indexOf(firstInfo);
+
+    // applyInfos must be a slice of segmentInfos
+    assert lastIdx - firstIdx + 1 == applyInfos.size();
+    
+    // Iterate over all segment infos backwards, coalescing
+    // deletes along the way.  Once we're at or below the last
+    // of the segments to apply to, start applying the deletes.
+    // We traverse up to the first of the apply infos.
+    SegmentDeletes coalescedDeletes = null;
+    boolean hasDeletes = false;
+    for (int segIdx=segmentInfos.size()-1; segIdx >= firstIdx; segIdx--) {
+      final SegmentInfo info = segmentInfos.info(segIdx);
+      final SegmentDeletes deletes = deletesMap.get(info);
+      assert deletes == null || deletes.any();
 
-    // Remap delete-by-term
-    if (terms.size() > 0) {
-      if (doTermSort) {
-        newDeleteTerms = new TreeMap<Term,Num>();
-      } else {
-        newDeleteTerms = new HashMap<Term,Num>();
+      if (deletes == null && coalescedDeletes == null) {
+        continue;
       }
-      for(Entry<Term,Num> entry : terms.entrySet()) {
-        Num num = entry.getValue();
-        newDeleteTerms.put(entry.getKey(),
-                           new Num(mapper.remap(num.getNum())));
+
+      if (infoStream != null) {
+        message("applyDeletes: seg=" + info + " segment's deletes=[" + (deletes == null ? "null" : deletes) + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "]");
       }
-    } else 
-      newDeleteTerms = null;
-    
 
-    // Remap delete-by-docID
-    final List<Integer> newDeleteDocIDs;
+      hasDeletes |= deletes != null;
+
+      if (segIdx <= lastIdx && hasDeletes) {
+
+        any |= applyDeletes(readerPool, info, coalescedDeletes, deletes);
+      
+        if (deletes != null) {
+          // we've applied doc ids, and they're only applied
+          // on the current segment
+          bytesUsed.addAndGet(-deletes.docIDs.size() * SegmentDeletes.BYTES_PER_DEL_DOCID);
+          deletes.clearDocIDs();
+        }
+      }
+      
+      // now coalesce at the max limit
+      if (deletes != null) {
+        if (coalescedDeletes == null) {
+          coalescedDeletes = new SegmentDeletes();
+        }
+        // TODO: we could make this single pass (coalesce as
+        // we apply the deletes)
+        coalescedDeletes.update(deletes, true);
+      }
+    }
+
+    // move all deletes to the segment just before our merge.
+    if (firstIdx > 0) {
+
+      SegmentDeletes mergedDeletes = null;
+      // TODO: we could also make this single pass
+      for (SegmentInfo info : applyInfos) {
+        final SegmentDeletes deletes = deletesMap.get(info);
+        if (deletes != null) {
+          assert deletes.any();
+          if (mergedDeletes == null) {
+            mergedDeletes = getDeletes(segmentInfos.info(firstIdx-1));
+            numTerms.addAndGet(-mergedDeletes.numTermDeletes.get());
+            bytesUsed.addAndGet(-mergedDeletes.bytesUsed.get());
+          }
 
-    if (docIDs.size() > 0) {
-      newDeleteDocIDs = new ArrayList<Integer>(docIDs.size());
-      for (Integer num : docIDs) {
-        newDeleteDocIDs.add(Integer.valueOf(mapper.remap(num.intValue())));
+          mergedDeletes.update(deletes, true);
+        }
       }
-    } else 
-      newDeleteDocIDs = null;
+
+      if (mergedDeletes != null) {
+        numTerms.addAndGet(mergedDeletes.numTermDeletes.get());
+        bytesUsed.addAndGet(mergedDeletes.bytesUsed.get());
+      }
+
+      if (infoStream != null) {
+        if (mergedDeletes != null) {
+          message("applyDeletes: merge all deletes into seg=" + segmentInfos.info(firstIdx-1) + ": " + mergedDeletes);
+        } else {
+          message("applyDeletes: no deletes to merge");
+        }
+      }
+    } else {
+      // We drop the deletes in this case, because we've
+      // applied them to segment infos starting w/ the first
+      // segment.  There are no prior segments so there's no
+      // reason to keep them around.  When the applyInfos ==
+      // segmentInfos this means all deletes have been
+      // removed:
+    }
+    remove(applyInfos);
+
+    assert checkDeleteStats();
+    assert applyInfos != segmentInfos || !any();
+    
+    if (infoStream != null) {
+      message("applyDeletes took " + (System.currentTimeMillis()-t0) + " msec");
+    }
+    return any;
+  }
+  
+  private synchronized boolean applyDeletes(IndexWriter.ReaderPool readerPool,
+                                            SegmentInfo info, 
+                                            SegmentDeletes coalescedDeletes,
+                                            SegmentDeletes segmentDeletes) throws IOException {    
+    assert readerPool.infoIsLive(info);
     
+    assert coalescedDeletes == null || coalescedDeletes.docIDs.size() == 0;
+    
+    boolean any = false;
 
-    // Remap delete-by-query
-    final HashMap<Query,Integer> newDeleteQueries;
+    // Lock order: IW -> BD -> RP
+    SegmentReader reader = readerPool.get(info, false);
+    try {
+      if (coalescedDeletes != null) {
+        any |= applyDeletes(coalescedDeletes, reader);
+      }
+      if (segmentDeletes != null) {
+        any |= applyDeletes(segmentDeletes, reader);
+      }
+    } finally {
+      readerPool.release(reader);
+    }
+    return any;
+  }
+  
+  private synchronized boolean applyDeletes(SegmentDeletes deletes, SegmentReader reader) throws IOException {
+    boolean any = false;
+
+    assert checkDeleteTerm(null);
     
-    if (queries.size() > 0) {
-      newDeleteQueries = new HashMap<Query, Integer>(queries.size());
-      for(Entry<Query,Integer> entry: queries.entrySet()) {
-        Integer num = entry.getValue();
-        newDeleteQueries.put(entry.getKey(),
-                             Integer.valueOf(mapper.remap(num.intValue())));
+    if (deletes.terms.size() > 0) {
+      Fields fields = reader.fields();
+      if (fields == null) {
+        // This reader has no postings
+        return false;
+      }
+
+      TermsEnum termsEnum = null;
+        
+      String currentField = null;
+      DocsEnum docs = null;
+        
+      for (Entry<Term,Integer> entry: deletes.terms.entrySet()) {
+        Term term = entry.getKey();
+        // Since we visit terms sorted, we gain performance
+        // by re-using the same TermsEnum and seeking only
+        // forwards
+        if (term.field() != currentField) {
+          assert currentField == null || currentField.compareTo(term.field()) < 0;
+          currentField = term.field();
+          Terms terms = fields.terms(currentField);
+          if (terms != null) {
+            termsEnum = terms.iterator();
+          } else {
+            termsEnum = null;
+          }
+        }
+          
+        if (termsEnum == null) {
+          continue;
+        }
+        assert checkDeleteTerm(term);
+          
+        if (termsEnum.seek(term.bytes(), false) == TermsEnum.SeekStatus.FOUND) {
+          DocsEnum docsEnum = termsEnum.docs(reader.getDeletedDocs(), docs);
+            
+          if (docsEnum != null) {
+            docs = docsEnum;
+            final int limit = entry.getValue();
+            while (true) {
+              final int docID = docs.nextDoc();
+              if (docID == DocsEnum.NO_MORE_DOCS || docID >= limit) {
+                break;
+              }
+              reader.deleteDocument(docID);
+              any = true;
+            }
+          }
+        }
       }
-    } else
-      newDeleteQueries = null;
+    }
 
-    if (newDeleteTerms != null)
-      terms = newDeleteTerms;
-    if (newDeleteDocIDs != null)
-      docIDs = newDeleteDocIDs;
-    if (newDeleteQueries != null)
-      queries = newDeleteQueries;
+    // Delete by docID
+    for (Integer docIdInt : deletes.docIDs) {
+      int docID = docIdInt.intValue();
+      reader.deleteDocument(docID);
+      any = true;
+    }
+
+    // Delete by query
+    if (deletes.queries.size() > 0) {
+      IndexSearcher searcher = new IndexSearcher(reader);
+      try {
+        for (Entry<Query, Integer> entry : deletes.queries.entrySet()) {
+          Query query = entry.getKey();
+          int limit = entry.getValue().intValue();
+          Weight weight = query.weight(searcher);
+          Scorer scorer = weight.scorer(reader, true, false);
+          if (scorer != null) {
+            while(true)  {
+              int doc = scorer.nextDoc();
+              if (doc >= limit)
+                break;
+              reader.deleteDocument(doc);
+              any = true;
+            }
+          }
+        }
+      } finally {
+        searcher.close();
+      }
+    }
+    return any;
+  }
+  
+  public synchronized SegmentDeletes getDeletes(SegmentInfo info) {
+    SegmentDeletes deletes = deletesMap.get(info);
+    if (deletes == null) {
+      deletes = new SegmentDeletes();
+      deletesMap.put(info, deletes);
+    }
+    return deletes;
+  }
+  
+  public synchronized void remove(SegmentInfos infos) {
+    assert infos.size() > 0;
+    for (SegmentInfo info : infos) {
+      SegmentDeletes deletes = deletesMap.get(info);
+      if (deletes != null) {
+        bytesUsed.addAndGet(-deletes.bytesUsed.get());
+        assert bytesUsed.get() >= 0: "bytesUsed=" + bytesUsed;
+        numTerms.addAndGet(-deletes.numTermDeletes.get());
+        assert numTerms.get() >= 0: "numTerms=" + numTerms;
+        deletesMap.remove(info);
+      }
+    }
+  }
+
+  // used only by assert
+  private boolean anyDeletes(SegmentInfos infos) {
+    for(SegmentInfo info : infos) {
+      if (deletesMap.containsKey(info)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  // used only by assert
+  private boolean checkDeleteTerm(Term term) {
+    if (term != null) {
+      assert lastDeleteTerm == null || term.compareTo(lastDeleteTerm) > 0: "lastTerm=" + lastDeleteTerm + " vs term=" + term;
+    }
+    lastDeleteTerm = term;
+    return true;
+  }
+  
+  // only for assert
+  private boolean checkDeleteStats() {
+    int numTerms2 = 0;
+    long bytesUsed2 = 0;
+    for(SegmentDeletes deletes : deletesMap.values()) {
+      numTerms2 += deletes.numTermDeletes.get();
+      bytesUsed2 += deletes.bytesUsed.get();
+    }
+    assert numTerms2 == numTerms.get(): "numTerms2=" + numTerms2 + " vs " + numTerms.get();
+    assert bytesUsed2 == bytesUsed.get(): "bytesUsed2=" + bytesUsed2 + " vs " + bytesUsed;
+    return true;
   }
-}
\ No newline at end of file
+}
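
A note on the Integer stored per term/query above: it is the number of documents that were buffered in RAM when the delete arrived, and applyDeletes only removes documents whose docID is below that limit, so a document added after the delete was buffered is not affected by it. A tiny standalone illustration of just that limit check (plain ints, and the buffered term "matches" every doc here; not Lucene code):

import java.util.LinkedHashMap;
import java.util.Map;

// Standalone illustration of the docID "limit" check used by applyDeletes above.
// A segment is just a range of docIDs here, and the buffered term matches every doc.
public class DeleteLimitDemo {
  public static void main(String[] args) {
    // the delete was buffered when 3 docs (ids 0..2) were in RAM, so its limit is 3
    Map<String,Integer> bufferedTermDeletes = new LinkedHashMap<String,Integer>();
    bufferedTermDeletes.put("id:7", 3);

    int maxDoc = 5;                          // docs 3 and 4 arrived after the delete
    boolean[] deleted = new boolean[maxDoc];

    for (Map.Entry<String,Integer> entry : bufferedTermDeletes.entrySet()) {
      int limit = entry.getValue();
      for (int docID = 0; docID < maxDoc; docID++) {
        if (docID >= limit) {
          break;                             // same guard as applyDeletes: docID >= limit stops
        }
        deleted[docID] = true;
      }
    }

    for (int docID = 0; docID < maxDoc; docID++) {
      System.out.println("doc " + docID + " deleted=" + deleted[docID]);
    }
    // prints deleted=true for docs 0..2 and deleted=false for docs 3..4
  }
}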

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/CompoundFileReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/CompoundFileReader.java?rev=1044635&r1=1044634&r2=1044635&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/CompoundFileReader.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/CompoundFileReader.java Sat Dec 11 11:07:01 2010
@@ -160,7 +160,7 @@ public class CompoundFileReader extends 
         id = IndexFileNames.stripSegmentName(id);
         FileEntry entry = entries.get(id);
         if (entry == null)
-            throw new IOException("No sub-file with id " + id + " found");
+          throw new IOException("No sub-file with id " + id + " found (files: " + entries.keySet() + ")");
 
         return new CSIndexInput(stream, entry.offset, entry.length, readBufferSize);
     }
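
The DocumentsWriter changes below replace the old bufferDeleteTerm/bufferDeleteQuery entry points with deleteTerm/deleteQuery methods that record into a single in-RAM SegmentDeletes (pendingDeletes), using the current numDocsInRAM as each delete's limit, and ask the new IndexWriter.FlushControl whether the caller should flush. A rough, self-contained sketch of that shape (the flush decision here is a hypothetical counter-based stand-in for FlushControl.waitUpdate, and String replaces Term; not the committed code):

import java.util.HashMap;
import java.util.Map;

// Rough sketch of the new delete entry points; not the committed code.
class DeleteEntryPointSketch {
  // pending deletes for the still-in-RAM segment: term -> docIDUpto limit
  private final Map<String,Integer> pendingDeletes = new HashMap<String,Integer>();
  private int numDocsInRAM;
  private int bufferedDeleteCount;
  private final int maxBufferedDeleteTerms;

  DeleteEntryPointSketch(int maxBufferedDeleteTerms) {
    this.maxBufferedDeleteTerms = maxBufferedDeleteTerms;
  }

  // hypothetical stand-in for FlushControl.waitUpdate(0, 1): count the delete and
  // tell the caller whether it should flush now
  private synchronized boolean waitUpdate(int delCount) {
    bufferedDeleteCount += delCount;
    return bufferedDeleteCount >= maxBufferedDeleteTerms;
  }

  // mirrors the shape of DocumentsWriter.deleteTerm: decide on flushing first, then
  // buffer the delete under the lock with the current in-RAM doc count as its limit
  boolean deleteTerm(String term) {
    final boolean doFlush = waitUpdate(1);
    synchronized (this) {
      pendingDeletes.put(term, numDocsInRAM);
    }
    return doFlush;
  }

  // a document was added to the in-RAM buffer
  synchronized void addedDocument() {
    numDocsInRAM++;
  }
}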

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1044635&r1=1044634&r2=1044635&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Sat Dec 11 11:07:01 2010
@@ -23,24 +23,18 @@ import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Map;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.document.Document;
-import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
-import org.apache.lucene.search.Scorer;
 import org.apache.lucene.search.Similarity;
-import org.apache.lucene.search.Weight;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.RAMFile;
 import org.apache.lucene.util.ArrayUtil;
-import org.apache.lucene.util.Constants;
 import org.apache.lucene.util.RecyclingByteBlockAllocator;
 import org.apache.lucene.util.ThreadInterruptedException;
 import org.apache.lucene.util.RamUsageEstimator;
@@ -115,7 +109,6 @@ import static org.apache.lucene.util.Byt
  */
 
 final class DocumentsWriter {
-
   final AtomicLong bytesUsed = new AtomicLong(0);
   IndexWriter writer;
   Directory directory;
@@ -133,9 +126,6 @@ final class DocumentsWriter {
   private DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];
   private final HashMap<Thread,DocumentsWriterThreadState> threadBindings = new HashMap<Thread,DocumentsWriterThreadState>();
 
-  private int pauseThreads;               // Non-zero when we need all threads to
-                                          // pause (eg to flush)
-  boolean flushPending;                   // True when a thread has decided to flush
   boolean bufferIsFull;                   // True when it's time to write segment
   private boolean aborting;               // True if an abort is pending
 
@@ -151,6 +141,9 @@ final class DocumentsWriter {
 
   List<String> newFiles;
 
+  // Deletes for our still-in-RAM (to be flushed next) segment
+  private SegmentDeletes pendingDeletes = new SegmentDeletes();
+  
   static class DocState {
     DocumentsWriter docWriter;
     Analyzer analyzer;
@@ -276,18 +269,6 @@ final class DocumentsWriter {
 
   final DocConsumer consumer;
 
-  // Deletes done after the last flush; these are discarded
-  // on abort
-  private BufferedDeletes deletesInRAM = new BufferedDeletes(false);
-
-  // Deletes done before the last flush; these are still
-  // kept on abort
-  private BufferedDeletes deletesFlushed = new BufferedDeletes(true);
-
-  // The max number of delete terms that can be buffered before
-  // they must be flushed to disk.
-  private int maxBufferedDeleteTerms = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DELETE_TERMS;
-
   // How much RAM we can use before flushing.  This is 0 if
   // we are flushing by doc count instead.
   private long ramBufferSize = (long) (IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024);
@@ -302,28 +283,20 @@ final class DocumentsWriter {
   // non-zero we will flush by RAM usage instead.
   private int maxBufferedDocs = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DOCS;
 
-  private int flushedDocCount;                      // How many docs already flushed to index
-
-  synchronized void updateFlushedDocCount(int n) {
-    flushedDocCount += n;
-  }
-  synchronized int getFlushedDocCount() {
-    return flushedDocCount;
-  }
-  synchronized void setFlushedDocCount(int n) {
-    flushedDocCount = n;
-  }
-
   private boolean closed;
   private final FieldInfos fieldInfos;
 
-  DocumentsWriter(Directory directory, IndexWriter writer, IndexingChain indexingChain, int maxThreadStates, FieldInfos fieldInfos) throws IOException {
+  private final BufferedDeletes bufferedDeletes;
+  private final IndexWriter.FlushControl flushControl;
+
+  DocumentsWriter(Directory directory, IndexWriter writer, IndexingChain indexingChain, int maxThreadStates, FieldInfos fieldInfos, BufferedDeletes bufferedDeletes) throws IOException {
     this.directory = directory;
     this.writer = writer;
     this.similarity = writer.getConfig().getSimilarity();
     this.maxThreadStates = maxThreadStates;
-    flushedDocCount = writer.maxDoc();
     this.fieldInfos = fieldInfos;
+    this.bufferedDeletes = bufferedDeletes;
+    flushControl = writer.flushControl;
 
     consumer = indexingChain.getChain(this);
     if (consumer instanceof DocFieldProcessor) {
@@ -331,6 +304,57 @@ final class DocumentsWriter {
     }
   }
 
+  // Buffer a specific docID for deletion.  Currently only
+  // used when we hit an exception when adding a document
+  synchronized void deleteDocID(int docIDUpto) {
+    pendingDeletes.addDocID(docIDUpto);
+    // NOTE: we do not trigger flush here.  This is
+    // potentially a RAM leak if an app adds docs and every
+    // single doc always hits a non-aborting exception.
+    // Allowing a flush here would get very messy, though:
+    // we are only invoked while handling an exception, so we
+    // would have to go off and flush new deletes in the
+    // middle of that, which is risky (we would likely hit
+    // some other confounding exception).
+  }
+  
+  boolean deleteQueries(Query... queries) {
+    final boolean doFlush = flushControl.waitUpdate(0, queries.length);
+    synchronized(this) {
+      for (Query query : queries) {
+        pendingDeletes.addQuery(query, numDocsInRAM);
+      }
+    }
+    return doFlush;
+  }
+  
+  boolean deleteQuery(Query query) { 
+    final boolean doFlush = flushControl.waitUpdate(0, 1);
+    synchronized(this) {
+      pendingDeletes.addQuery(query, numDocsInRAM);
+    }
+    return doFlush;
+  }
+  
+  boolean deleteTerms(Term... terms) {
+    final boolean doFlush = flushControl.waitUpdate(0, terms.length);
+    synchronized(this) {
+      for (Term term : terms) {
+        pendingDeletes.addTerm(term, numDocsInRAM);
+      }
+    }
+    return doFlush;
+  }
+
+  boolean deleteTerm(Term term, boolean skipWait) {
+    final boolean doFlush = flushControl.waitUpdate(0, 1, skipWait);
+    synchronized(this) {
+      pendingDeletes.addTerm(term, numDocsInRAM);
+    }
+    return doFlush;
+  }
+
   public FieldInfos getFieldInfos() {
     return fieldInfos;
   }
@@ -395,12 +419,12 @@ final class DocumentsWriter {
   }
 
   /** Get current segment name we are writing. */
-  String getSegment() {
+  synchronized String getSegment() {
     return segment;
   }
 
   /** Returns how many docs are currently buffered in RAM. */
-  int getNumDocsInRAM() {
+  synchronized int getNumDocsInRAM() {
     return numDocsInRAM;
   }
 
@@ -412,46 +436,86 @@ final class DocumentsWriter {
 
   /** Returns the doc offset into the shared doc store for
    *  the current buffered docs. */
-  int getDocStoreOffset() {
+  synchronized int getDocStoreOffset() {
     return docStoreOffset;
   }
 
-  /** Closes the current open doc stores an returns the doc
-   *  store segment name.  This returns null if there are *
-   *  no buffered documents. */
-  synchronized String closeDocStore() throws IOException {
+  /** Closes the currently open doc stores and sets the
+   *  docStoreSegment and docStoreUseCFS on the provided
+   *  SegmentInfo. */
+  synchronized void closeDocStore(SegmentWriteState flushState, IndexWriter writer, IndexFileDeleter deleter, SegmentInfo newSegment, MergePolicy mergePolicy, SegmentInfos segmentInfos) throws IOException {
     
-    assert allThreadsIdle();
+    final boolean isSeparate = numDocsInRAM == 0 || !segment.equals(docStoreSegment);
 
-    if (infoStream != null)
-      message("closeDocStore: " + openFiles.size() + " files to flush to segment " + docStoreSegment + " numDocs=" + numDocsInStore);
-    
-    boolean success = false;
+    assert docStoreSegment != null;
 
-    try {
-      initFlushState(true);
-      closedFiles.clear();
+    if (infoStream != null) {
+      message("closeDocStore: files=" + openFiles + "; segment=" + docStoreSegment + "; docStoreOffset=" + docStoreOffset + "; numDocsInStore=" + numDocsInStore + "; isSeparate=" + isSeparate);
+    }
 
-      consumer.closeDocStore(flushState);
-      assert 0 == openFiles.size();
+    closedFiles.clear();
+    consumer.closeDocStore(flushState);
+    flushState.numDocsInStore = 0;
+    assert 0 == openFiles.size();
 
-      String s = docStoreSegment;
-      docStoreSegment = null;
-      docStoreOffset = 0;
-      numDocsInStore = 0;
-      success = true;
-      return s;
-    } finally {
-      if (!success) {
-        abort();
+    if (isSeparate) {
+      flushState.flushedFiles.clear();
+
+      if (mergePolicy.useCompoundDocStore(segmentInfos)) {
+
+        final String compoundFileName = IndexFileNames.segmentFileName(docStoreSegment, "", IndexFileNames.COMPOUND_FILE_STORE_EXTENSION);
+
+        if (infoStream != null) {
+          message("closeDocStore: create compound file " + compoundFileName);
+        }
+
+        boolean success = false;
+        try {
+
+          CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, compoundFileName);
+          for (final String file : closedFiles) {
+            cfsWriter.addFile(file);
+          }
+      
+          // Perform the merge
+          cfsWriter.close();
+
+          success = true;
+        } finally {
+          if (!success) {
+            deleter.deleteFile(compoundFileName);
+          }
+        }
+
+        // In case the files we just merged into a CFS were
+        // not registered w/ IFD:
+        deleter.deleteNewFiles(closedFiles);
+
+        final int numSegments = segmentInfos.size();
+        for(int i=0;i<numSegments;i++) {
+          SegmentInfo si = segmentInfos.info(i);
+          if (si.getDocStoreOffset() != -1 &&
+              si.getDocStoreSegment().equals(docStoreSegment)) {
+            si.setDocStoreIsCompoundFile(true);
+          }
+        }
+
+        newSegment.setDocStoreIsCompoundFile(true);
+        if (infoStream != null) {
+          message("closeDocStore: after compound file index=" + segmentInfos);
+        }
+
+        writer.checkpoint();
       }
     }
+
+    docStoreSegment = null;
+    docStoreOffset = 0;
+    numDocsInStore = 0;
   }
 
   private Collection<String> abortedFiles;               // List of files that were written before last abort()
 
-  private SegmentWriteState flushState;
-
   Collection<String> abortedFiles() {
     return abortedFiles;
   }
@@ -471,11 +535,6 @@ final class DocumentsWriter {
     return (List<String>) ((ArrayList<String>) openFiles).clone();
   }
 
-  @SuppressWarnings("unchecked")
-  synchronized List<String> closedFiles() {
-    return (List<String>) ((ArrayList<String>) closedFiles).clone();
-  }
-
   synchronized void addOpenFile(String name) {
     assert !openFiles.contains(name);
     openFiles.add(name);
@@ -488,6 +547,9 @@ final class DocumentsWriter {
   }
 
   synchronized void setAborting() {
+    if (infoStream != null) {
+      message("setAborting");
+    }
     aborting = true;
   }
 
@@ -497,61 +559,62 @@ final class DocumentsWriter {
    *  discarding any docs added since last flush. */
   synchronized void abort() throws IOException {
 
+    if (infoStream != null) {
+      message("docWriter: abort");
+    }
+
+    boolean success = false;
+
     try {
-      if (infoStream != null) {
-        message("docWriter: now abort");
-      }
 
       // Forcefully remove waiting ThreadStates from line
       waitQueue.abort();
 
       // Wait for all other threads to finish with
       // DocumentsWriter:
-      pauseAllThreads();
-
-      try {
+      waitIdle();
 
-        assert 0 == waitQueue.numWaiting;
+      if (infoStream != null) {
+        message("docWriter: abort waitIdle done");
+      }
 
-        waitQueue.waitingBytes = 0;
+      assert 0 == waitQueue.numWaiting: "waitQueue.numWaiting=" + waitQueue.numWaiting;
 
-        try {
-          abortedFiles = openFiles();
-        } catch (Throwable t) {
-          abortedFiles = null;
-        }
+      waitQueue.waitingBytes = 0;
 
-        deletesInRAM.clear();
-        deletesFlushed.clear();
-
-        openFiles.clear();
+      try {
+        abortedFiles = openFiles();
+      } catch (Throwable t) {
+        abortedFiles = null;
+      }
 
-        for(int i=0;i<threadStates.length;i++)
-          try {
-            threadStates[i].consumer.abort();
-          } catch (Throwable t) {
-          }
+      pendingDeletes.clear();
+        
+      openFiles.clear();
 
+      for(int i=0;i<threadStates.length;i++)
         try {
-          consumer.abort();
+          threadStates[i].consumer.abort();
         } catch (Throwable t) {
         }
 
-        docStoreSegment = null;
-        numDocsInStore = 0;
-        docStoreOffset = 0;
+      try {
+        consumer.abort();
+      } catch (Throwable t) {
+      }
 
-        // Reset all postings data
-        doAfterFlush();
+      docStoreSegment = null;
+      numDocsInStore = 0;
+      docStoreOffset = 0;
 
-      } finally {
-        resumeAllThreads();
-      }
+      // Reset all postings data
+      doAfterFlush();
+      success = true;
     } finally {
       aborting = false;
       notifyAll();
       if (infoStream != null) {
-        message("docWriter: done abort; abortedFiles=" + abortedFiles);
+        message("docWriter: done abort; abortedFiles=" + abortedFiles + " success=" + success);
       }
     }
   }
@@ -566,32 +629,10 @@ final class DocumentsWriter {
     numDocsInRAM = 0;
     nextDocID = 0;
     bufferIsFull = false;
-    flushPending = false;
     for(int i=0;i<threadStates.length;i++)
       threadStates[i].doAfterFlush();
   }
 
-  // Returns true if an abort is in progress
-  synchronized boolean pauseAllThreads() {
-    pauseThreads++;
-    while(!allThreadsIdle()) {
-      try {
-        wait();
-      } catch (InterruptedException ie) {
-        throw new ThreadInterruptedException(ie);
-      }
-    }
-
-    return aborting;
-  }
-
-  synchronized void resumeAllThreads() {
-    pauseThreads--;
-    assert pauseThreads >= 0;
-    if (0 == pauseThreads)
-      notifyAll();
-  }
-
   private synchronized boolean allThreadsIdle() {
     for(int i=0;i<threadStates.length;i++)
       if (!threadStates[i].isIdle)
@@ -600,126 +641,173 @@ final class DocumentsWriter {
   }
 
   synchronized boolean anyChanges() {
-    return numDocsInRAM != 0 ||
-      deletesInRAM.numTerms != 0 ||
-      deletesInRAM.docIDs.size() != 0 ||
-      deletesInRAM.queries.size() != 0;
-  }
-
-  synchronized private void initFlushState(boolean onlyDocStore) {
-    initSegmentName(onlyDocStore);
-    final SegmentCodecs info = SegmentCodecs.build(fieldInfos, writer.codecs);
-    flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
-                                       docStoreSegment, numDocsInRAM, numDocsInStore, writer.getConfig().getTermIndexInterval(), info);
-  }
-
-  /** Returns the SegmentCodecs used to flush the last segment */
-  SegmentCodecs getSegmentCodecs() {
-    return flushState.segmentCodecs;
+    return numDocsInRAM != 0 || pendingDeletes.any();
   }
-  
-  /** Flush all pending docs to a new segment */
-  synchronized int flush(boolean closeDocStore) throws IOException {
 
-    assert allThreadsIdle();
+  // for testing
+  public SegmentDeletes getPendingDeletes() {
+    return pendingDeletes;
+  }
 
-    assert numDocsInRAM > 0;
+  private void pushDeletes(SegmentInfo newSegment, SegmentInfos segmentInfos) {
+    // Lock order: DW -> BD
+    if (pendingDeletes.any()) {
+      if (newSegment != null) {
+        if (infoStream != null) {
+          message("flush: push buffered deletes to newSegment");
+        }
+        bufferedDeletes.pushDeletes(pendingDeletes, newSegment);
+      } else if (segmentInfos.size() > 0) {
+        if (infoStream != null) {
+          message("flush: push buffered deletes to previously flushed segment " + segmentInfos.lastElement());
+        }
+        bufferedDeletes.pushDeletes(pendingDeletes, segmentInfos.lastElement(), true);
+      } else {
+        if (infoStream != null) {
+          message("flush: drop buffered deletes: no segments");
+        }
+        // We can safely discard these deletes: since
+        // there are no segments, the deletions cannot
+        // affect anything.
+      }
+      pendingDeletes = new SegmentDeletes();
+    }
+  }
 
-    assert nextDocID == numDocsInRAM;
-    assert waitQueue.numWaiting == 0;
-    assert waitQueue.waitingBytes == 0;
+  public boolean anyDeletions() {
+    return pendingDeletes.any();
+  }
 
-    initFlushState(false);
+  /** Flush all pending docs to a new segment */
+  // Lock order: IW -> DW
+  synchronized SegmentInfo flush(IndexWriter writer, boolean closeDocStore, IndexFileDeleter deleter, MergePolicy mergePolicy, SegmentInfos segmentInfos) throws IOException {
 
-    docStoreOffset = numDocsInStore;
+    // We change writer's segmentInfos:
+    assert Thread.holdsLock(writer);
+
+    waitIdle();
+
+    if (numDocsInRAM == 0 && numDocsInStore == 0) {
+      // nothing to do!
+      if (infoStream != null) {
+        message("flush: no docs; skipping");
+      }
+      // Lock order: IW -> DW -> BD
+      pushDeletes(null, segmentInfos);
+      return null;
+    }
+
+    if (aborting) {
+      if (infoStream != null) {
+        message("flush: skip because aborting is set");
+      }
+      return null;
+    }
 
-    if (infoStream != null)
-      message("flush postings as segment " + flushState.segmentName + " numDocs=" + numDocsInRAM);
-    
     boolean success = false;
 
+    SegmentInfo newSegment;
+
     try {
 
+      assert waitQueue.waitingBytes == 0;
+
+      assert docStoreSegment != null || numDocsInRAM == 0: "dss=" + docStoreSegment + " numDocsInRAM=" + numDocsInRAM;
+
+      assert numDocsInStore >= numDocsInRAM: "numDocsInStore=" + numDocsInStore + " numDocsInRAM=" + numDocsInRAM;
+
+      final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
+                                                                 docStoreSegment, numDocsInRAM, numDocsInStore, writer.getConfig().getTermIndexInterval(),
+                                                                 SegmentCodecs.build(fieldInfos, writer.codecs));
+
+      newSegment = new SegmentInfo(segment, numDocsInRAM, directory, false, -1, null, false, hasProx(), flushState.segmentCodecs);
+
+      if (!closeDocStore || docStoreOffset != 0) {
+        newSegment.setDocStoreSegment(docStoreSegment);
+        newSegment.setDocStoreOffset(docStoreOffset);
+      }
+
       if (closeDocStore) {
-        assert flushState.docStoreSegmentName != null;
-        assert flushState.docStoreSegmentName.equals(flushState.segmentName);
-        closeDocStore();
-        flushState.numDocsInStore = 0;
+        closeDocStore(flushState, writer, deleter, newSegment, mergePolicy, segmentInfos);
       }
 
-      Collection<DocConsumerPerThread> threads = new HashSet<DocConsumerPerThread>();
-      for(int i=0;i<threadStates.length;i++)
-        threads.add(threadStates[i].consumer);
+      if (numDocsInRAM > 0) {
 
-      final long startNumBytesUsed = bytesUsed();
-      consumer.flush(threads, flushState);
+        assert nextDocID == numDocsInRAM;
+        assert waitQueue.numWaiting == 0;
+        assert waitQueue.waitingBytes == 0;
 
-      if (infoStream != null) {
-        SegmentInfo si = new SegmentInfo(flushState.segmentName,
-            flushState.numDocs, directory, false, -1, flushState.segmentName,
-            false, hasProx(), flushState.segmentCodecs);
-        final long newSegmentSize = si.sizeInBytes();
-        String message = "  ramUsed=" + nf.format(startNumBytesUsed/1024./1024.) + " MB" +
-          " newFlushedSize=" + newSegmentSize +
-          " docs/MB=" + nf.format(numDocsInRAM/(newSegmentSize/1024./1024.)) +
-          " new/old=" + nf.format(100.0*newSegmentSize/startNumBytesUsed) + "%";
-        message(message);
-      }
+        if (infoStream != null) {
+          message("flush postings as segment " + segment + " numDocs=" + numDocsInRAM);
+        }
+    
+        final Collection<DocConsumerPerThread> threads = new HashSet<DocConsumerPerThread>();
+        for(int i=0;i<threadStates.length;i++) {
+          threads.add(threadStates[i].consumer);
+        }
 
-      flushedDocCount += flushState.numDocs;
+        final long startNumBytesUsed = bytesUsed();
+        consumer.flush(threads, flushState);
 
-      doAfterFlush();
+        if (infoStream != null) {
+          message("flushedFiles=" + flushState.flushedFiles);
+          message("flushed codecs=" + newSegment.getSegmentCodecs());
+        }
 
-      success = true;
+        if (mergePolicy.useCompoundFile(segmentInfos, newSegment)) {
+
+          final String cfsFileName = IndexFileNames.segmentFileName(segment, "", IndexFileNames.COMPOUND_FILE_EXTENSION);
+
+          if (infoStream != null) {
+            message("flush: create compound file \"" + cfsFileName + "\"");
+          }
+
+          CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, cfsFileName);
+          for(String fileName : flushState.flushedFiles) {
+            cfsWriter.addFile(fileName);
+          }
+          cfsWriter.close();
+          deleter.deleteNewFiles(flushState.flushedFiles);
+
+          newSegment.setUseCompoundFile(true);
+        }
+
+        if (infoStream != null) {
+          message("flush: segment=" + newSegment);
+          final long newSegmentSize = newSegment.sizeInBytes();
+          String message = "  ramUsed=" + nf.format(startNumBytesUsed/1024./1024.) + " MB" +
+            " newFlushedSize=" + nf.format(newSegmentSize/1024/1024) + " MB" +
+            " docs/MB=" + nf.format(numDocsInRAM/(newSegmentSize/1024./1024.)) +
+            " new/old=" + nf.format(100.0*newSegmentSize/startNumBytesUsed) + "%";
+          message(message);
+        }
+
+      } else {
+        if (infoStream != null) {
+          message("skip flushing segment: no docs");
+        }
+        newSegment = null;
+      }
 
+      success = true;
     } finally {
+      notifyAll();
       if (!success) {
+        if (segment != null) {
+          deleter.refresh(segment);
+        }
         abort();
       }
     }
 
-    assert waitQueue.waitingBytes == 0;
+    doAfterFlush();
 
-    return flushState.numDocs;
-  }
+    // Lock order: IW -> DW -> BD
+    pushDeletes(newSegment, segmentInfos);
 
-  Collection<String> getFlushedFiles() {
-    return flushState.flushedFiles;
-  }
-
-  /** Build compound file for the segment we just flushed */
-  void createCompoundFile(String segment) throws IOException {
-    
-    CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, 
-        IndexFileNames.segmentFileName(segment, "", IndexFileNames.COMPOUND_FILE_EXTENSION));
-    for(String fileName : flushState.flushedFiles) {
-      cfsWriter.addFile(fileName);
-    }
-      
-    // Perform the merge
-    cfsWriter.close();
-  }
-
-  /** Set flushPending if it is not already set and returns
-   *  whether it was set. This is used by IndexWriter to
-   *  trigger a single flush even when multiple threads are
-   *  trying to do so. */
-  synchronized boolean setFlushPending() {
-    if (flushPending)
-      return false;
-    else {
-      flushPending = true;
-      return true;
-    }
-  }
-
-  synchronized void clearFlushPending() {
-    bufferIsFull = false;
-    flushPending = false;
-  }
+    docStoreOffset = numDocsInStore;
 
-  synchronized void pushDeletes() {
-    deletesFlushed.update(deletesInRAM);
+    return newSegment;
   }
 
   synchronized void close() {
@@ -746,6 +834,7 @@ final class DocumentsWriter {
   synchronized DocumentsWriterThreadState getThreadState(Document doc, Term delTerm) throws IOException {
 
     final Thread currentThread = Thread.currentThread();
+    assert !Thread.holdsLock(writer);
 
     // First, find a thread state.  If this thread already
     // has affinity to a specific ThreadState, use that one
@@ -776,73 +865,35 @@ final class DocumentsWriter {
     }
 
     // Next, wait until my thread state is idle (in case
-    // it's shared with other threads) and for threads to
-    // not be paused nor a flush pending:
+    // it's shared with other threads), and no flush/abort
+    // pending 
     waitReady(state);
 
     // Allocate segment name if this is the first doc since
     // last flush:
     initSegmentName(false);
 
-    state.isIdle = false;
-
-    boolean success = false;
-    try {
-      state.docState.docID = nextDocID;
-
-      assert writer.testPoint("DocumentsWriter.ThreadState.init start");
-
-      if (delTerm != null) {
-        addDeleteTerm(delTerm, state.docState.docID);
-        state.doFlushAfter = timeToFlushDeletes();
-      }
-
-      assert writer.testPoint("DocumentsWriter.ThreadState.init after delTerm");
+    state.docState.docID = nextDocID++;
 
-      nextDocID++;
-      numDocsInRAM++;
-
-      // We must at this point commit to flushing to ensure we
-      // always get N docs when we flush by doc count, even if
-      // > 1 thread is adding documents:
-      if (!flushPending &&
-          maxBufferedDocs != IndexWriterConfig.DISABLE_AUTO_FLUSH
-          && numDocsInRAM >= maxBufferedDocs) {
-        flushPending = true;
-        state.doFlushAfter = true;
-      }
-
-      success = true;
-    } finally {
-      if (!success) {
-        // Forcefully idle this ThreadState:
-        state.isIdle = true;
-        notifyAll();
-        if (state.doFlushAfter) {
-          state.doFlushAfter = false;
-          flushPending = false;
-        }
-      }
+    if (delTerm != null) {
+      pendingDeletes.addTerm(delTerm, state.docState.docID);
     }
 
+    numDocsInRAM++;
+    state.isIdle = false;
     return state;
   }
-
-  /** Returns true if the caller (IndexWriter) should now
-   * flush. */
-  boolean addDocument(Document doc, Analyzer analyzer)
-    throws CorruptIndexException, IOException {
+  
+  boolean addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException {
     return updateDocument(doc, analyzer, null);
   }
-
-  boolean updateDocument(Term t, Document doc, Analyzer analyzer)
-    throws CorruptIndexException, IOException {
-    return updateDocument(doc, analyzer, t);
-  }
-
+  
   boolean updateDocument(Document doc, Analyzer analyzer, Term delTerm)
     throws CorruptIndexException, IOException {
-    
+
+    // Possibly trigger a flush, or wait until any running flush completes:
+    boolean doFlush = flushControl.waitUpdate(1, delTerm != null ? 1 : 0);
+
     // This call is synchronized but fast
     final DocumentsWriterThreadState state = getThreadState(doc, delTerm);
 
@@ -867,11 +918,23 @@ final class DocumentsWriter {
       success = true;
     } finally {
       if (!success) {
+
+        // If this thread state had decided to flush, we
+        // must clear it so another thread can flush
+        if (doFlush) {
+          flushControl.clearFlushPending();
+        }
+
+        if (infoStream != null) {
+          message("exception in updateDocument aborting=" + aborting);
+        }
+
         synchronized(this) {
 
+          state.isIdle = true;
+          notifyAll();
+            
           if (aborting) {
-            state.isIdle = true;
-            notifyAll();
             abort();
           } else {
             skipDocWriter.docID = docState.docID;
@@ -881,323 +944,48 @@ final class DocumentsWriter {
               success2 = true;
             } finally {
               if (!success2) {
-                state.isIdle = true;
-                notifyAll();
                 abort();
                 return false;
               }
             }
 
-            state.isIdle = true;
-            notifyAll();
-
-            // If this thread state had decided to flush, we
-            // must clear it so another thread can flush
-            if (state.doFlushAfter) {
-              state.doFlushAfter = false;
-              flushPending = false;
-              notifyAll();
-            }
-
             // Immediately mark this document as deleted
             // since likely it was partially added.  This
             // keeps indexing as "all or none" (atomic) when
             // adding a document:
-            addDeleteDocID(state.docState.docID);
+            deleteDocID(state.docState.docID);
           }
         }
       }
     }
 
-    return state.doFlushAfter || timeToFlushDeletes();
-  }
-
-  // for testing
-  synchronized int getNumBufferedDeleteTerms() {
-    return deletesInRAM.numTerms;
-  }
-
-  // for testing
-  synchronized Map<Term,BufferedDeletes.Num> getBufferedDeleteTerms() {
-    return deletesInRAM.terms;
-  }
+    doFlush |= flushControl.flushByRAMUsage("new document");
 
-  /** Called whenever a merge has completed and the merged segments had deletions */
-  synchronized void remapDeletes(SegmentInfos infos, int[][] docMaps, int[] delCounts, MergePolicy.OneMerge merge, int mergeDocCount) {
-    if (docMaps == null)
-      // The merged segments had no deletes so docIDs did not change and we have nothing to do
-      return;
-    MergeDocIDRemapper mapper = new MergeDocIDRemapper(infos, docMaps, delCounts, merge, mergeDocCount);
-    deletesInRAM.remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount);
-    deletesFlushed.remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount);
-    flushedDocCount -= mapper.docShift;
+    return doFlush;
   }
 
-  synchronized private void waitReady(DocumentsWriterThreadState state) {
-
-    while (!closed && ((state != null && !state.isIdle) || pauseThreads != 0 || flushPending || aborting)) {
+  public synchronized void waitIdle() {
+    while (!allThreadsIdle()) {
       try {
         wait();
       } catch (InterruptedException ie) {
         throw new ThreadInterruptedException(ie);
       }
     }
-
-    if (closed)
-      throw new AlreadyClosedException("this IndexWriter is closed");
   }
 
-  boolean bufferDeleteTerms(Term[] terms) throws IOException {
-    synchronized(this) {
-      waitReady(null);
-      for (int i = 0; i < terms.length; i++)
-        addDeleteTerm(terms[i], numDocsInRAM);
-    }
-    return timeToFlushDeletes();
-  }
-
-  boolean bufferDeleteTerm(Term term) throws IOException {
-    synchronized(this) {
-      waitReady(null);
-      addDeleteTerm(term, numDocsInRAM);
-    }
-    return timeToFlushDeletes();
-  }
-
-  boolean bufferDeleteQueries(Query[] queries) throws IOException {
-    synchronized(this) {
-      waitReady(null);
-      for (int i = 0; i < queries.length; i++)
-        addDeleteQuery(queries[i], numDocsInRAM);
-    }
-    return timeToFlushDeletes();
-  }
-
-  boolean bufferDeleteQuery(Query query) throws IOException {
-    synchronized(this) {
-      waitReady(null);
-      addDeleteQuery(query, numDocsInRAM);
-    }
-    return timeToFlushDeletes();
-  }
-
-  synchronized boolean deletesFull() {
-    return (ramBufferSize != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
-            (deletesInRAM.bytesUsed + deletesFlushed.bytesUsed + bytesUsed()) >= ramBufferSize) ||
-      (maxBufferedDeleteTerms != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
-       ((deletesInRAM.size() + deletesFlushed.size()) >= maxBufferedDeleteTerms));
-  }
-
-  synchronized boolean doApplyDeletes() {
-    // Very similar to deletesFull(), except we don't count
-    // numBytesUsed, because we are checking whether
-    // deletes (alone) are consuming too many resources now
-    // and thus should be applied.  We apply deletes if RAM
-    // usage is > 1/2 of our allowed RAM buffer, to prevent
-    // too-frequent flushing of a long tail of tiny segments
-    // when merges (which always apply deletes) are
-    // infrequent.
-    return (ramBufferSize != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
-            (deletesInRAM.bytesUsed + deletesFlushed.bytesUsed) >= ramBufferSize/2) ||
-      (maxBufferedDeleteTerms != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
-       ((deletesInRAM.size() + deletesFlushed.size()) >= maxBufferedDeleteTerms));
-  }
-
-  private boolean timeToFlushDeletes() {
-    balanceRAM();
-    synchronized(this) {
-      return (bufferIsFull || deletesFull()) && setFlushPending();
-    }
-  }
-
-  void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
-    this.maxBufferedDeleteTerms = maxBufferedDeleteTerms;
-  }
-
-  int getMaxBufferedDeleteTerms() {
-    return maxBufferedDeleteTerms;
-  }
-
-  synchronized boolean hasDeletes() {
-    return deletesFlushed.any();
-  }
-
-  synchronized boolean applyDeletes(SegmentInfos infos) throws IOException {
-
-    if (!hasDeletes())
-      return false;
-
-    final long t0 = System.currentTimeMillis();
-
-    if (infoStream != null)
-      message("apply " + deletesFlushed.numTerms + " buffered deleted terms and " +
-              deletesFlushed.docIDs.size() + " deleted docIDs and " +
-              deletesFlushed.queries.size() + " deleted queries on " +
-              + infos.size() + " segments.");
-
-    final int infosEnd = infos.size();
-
-    int docStart = 0;
-    boolean any = false;
-    for (int i = 0; i < infosEnd; i++) {
-
-      // Make sure we never attempt to apply deletes to
-      // segment in external dir
-      assert infos.info(i).dir == directory;
-
-      SegmentReader reader = writer.readerPool.get(infos.info(i), false);
+  synchronized void waitReady(DocumentsWriterThreadState state) {
+    while (!closed && (!state.isIdle || aborting)) {
       try {
-        any |= applyDeletes(reader, docStart);
-        docStart += reader.maxDoc();
-      } finally {
-        writer.readerPool.release(reader);
-      }
-    }
-
-    deletesFlushed.clear();
-    if (infoStream != null) {
-      message("apply deletes took " + (System.currentTimeMillis()-t0) + " msec");
-    }
-
-    return any;
-  }
-
-  // used only by assert
-  private Term lastDeleteTerm;
-
-  // used only by assert
-  private boolean checkDeleteTerm(Term term) {
-    if (term != null) {
-      assert lastDeleteTerm == null || term.compareTo(lastDeleteTerm) > 0: "lastTerm=" + lastDeleteTerm + " vs term=" + term;
-    }
-    lastDeleteTerm = term;
-    return true;
-  }
-
-  // Apply buffered delete terms, queries and docIDs to the
-  // provided reader
-  private final synchronized boolean applyDeletes(IndexReader reader, int docIDStart)
-    throws CorruptIndexException, IOException {
-
-    final int docEnd = docIDStart + reader.maxDoc();
-    boolean any = false;
-
-    assert checkDeleteTerm(null);
-
-    // Delete by term
-    if (deletesFlushed.terms.size() > 0) {
-      Fields fields = reader.fields();
-      if (fields == null) {
-        // This reader has no postings
-        return false;
-      }
-
-      TermsEnum termsEnum = null;
-        
-      String currentField = null;
-      DocsEnum docs = null;
-        
-      for (Entry<Term, BufferedDeletes.Num> entry: deletesFlushed.terms.entrySet()) {
-        Term term = entry.getKey();
-        // Since we visit terms sorted, we gain performance
-        // by re-using the same TermsEnum and seeking only
-        // forwards
-        if (term.field() != currentField) {
-          assert currentField == null || currentField.compareTo(term.field()) < 0;
-          currentField = term.field();
-          Terms terms = fields.terms(currentField);
-          if (terms != null) {
-            termsEnum = terms.iterator();
-          } else {
-            termsEnum = null;
-          }
-        }
-          
-        if (termsEnum == null) {
-          continue;
-        }
-        assert checkDeleteTerm(term);
-          
-        if (termsEnum.seek(term.bytes(), false) == TermsEnum.SeekStatus.FOUND) {
-          DocsEnum docsEnum = termsEnum.docs(reader.getDeletedDocs(), docs);
-            
-          if (docsEnum != null) {
-            docs = docsEnum;
-            int limit = entry.getValue().getNum();
-            while (true) {
-              final int docID = docs.nextDoc();
-              if (docID == DocsEnum.NO_MORE_DOCS || docIDStart+docID >= limit) {
-                break;
-              }
-              reader.deleteDocument(docID);
-              any = true;
-            }
-          }
-        }
-      }
-    }
-
-    // Delete by docID
-    for (Integer docIdInt : deletesFlushed.docIDs) {
-      int docID = docIdInt.intValue();
-      if (docID >= docIDStart && docID < docEnd) {
-        reader.deleteDocument(docID-docIDStart);
-        any = true;
+        wait();
+      } catch (InterruptedException ie) {
+        throw new ThreadInterruptedException(ie);
       }
     }
 
-    // Delete by query
-    if (deletesFlushed.queries.size() > 0) {
-      IndexSearcher searcher = new IndexSearcher(reader);
-      try {
-        for (Entry<Query, Integer> entry : deletesFlushed.queries.entrySet()) {
-          Query query = entry.getKey();
-          int limit = entry.getValue().intValue();
-          Weight weight = query.weight(searcher);
-          Scorer scorer = weight.scorer(reader, true, false);
-          if (scorer != null) {
-            while(true)  {
-              int doc = scorer.nextDoc();
-              if (((long) docIDStart) + doc >= limit)
-                break;
-              reader.deleteDocument(doc);
-              any = true;
-            }
-          }
-        }
-      } finally {
-        searcher.close();
-      }
+    if (closed) {
+      throw new AlreadyClosedException("this IndexWriter is closed");
     }
-    return any;
-  }
-
-  // Buffer a term in bufferedDeleteTerms, which records the
-  // current number of documents buffered in ram so that the
-  // delete term will be applied to those documents as well
-  // as the disk segments.
-  synchronized private void addDeleteTerm(Term term, int docCount) {
-    BufferedDeletes.Num num = deletesInRAM.terms.get(term);
-    final int docIDUpto = flushedDocCount + docCount;
-    if (num == null)
-      deletesInRAM.terms.put(term, new BufferedDeletes.Num(docIDUpto));
-    else
-      num.setNum(docIDUpto);
-    deletesInRAM.numTerms++;
-
-    deletesInRAM.addBytesUsed(BYTES_PER_DEL_TERM + term.bytes.length);
-  }
-
-  // Buffer a specific docID for deletion.  Currently only
-  // used when we hit an exception when adding a document
-  synchronized private void addDeleteDocID(int docID) {
-    deletesInRAM.docIDs.add(Integer.valueOf(flushedDocCount+docID));
-    deletesInRAM.addBytesUsed(BYTES_PER_DEL_DOCID);
-  }
-
-  synchronized private void addDeleteQuery(Query query, int docID) {
-    deletesInRAM.queries.put(query, Integer.valueOf(flushedDocCount + docID));
-    deletesInRAM.addBytesUsed(BYTES_PER_DEL_QUERY);
   }
 
   /** Does the synchronized work to finish/flush the
@@ -1218,14 +1006,18 @@ final class DocumentsWriter {
         // waiting for me to become idle.  We just forcefully
         // idle this threadState; it will be fully reset by
         // abort()
-        if (docWriter != null)
+        if (docWriter != null) {
           try {
             docWriter.abort();
           } catch (Throwable t) {
           }
+        }
 
         perThread.isIdle = true;
+
+        // wakes up any threads waiting on the wait queue
         notifyAll();
+
         return;
       }
 
@@ -1241,12 +1033,9 @@ final class DocumentsWriter {
       if (doPause)
         waitForWaitQueue();
 
-      if (bufferIsFull && !flushPending) {
-        flushPending = true;
-        perThread.doFlushAfter = true;
-      }
-
       perThread.isIdle = true;
+
+      // wakes up any threads waiting on the wait queue
       notifyAll();
     }
   }
@@ -1275,42 +1064,8 @@ final class DocumentsWriter {
   }
   final SkipDocWriter skipDocWriter = new SkipDocWriter();
 
-  long getRAMUsed() {
-    return bytesUsed() + deletesInRAM.bytesUsed + deletesFlushed.bytesUsed;
-  }
-
   NumberFormat nf = NumberFormat.getInstance();
 
-  // Coarse estimates used to measure RAM usage of buffered deletes
-  final static int OBJECT_HEADER_BYTES = 8;
-  final static int POINTER_NUM_BYTE = Constants.JRE_IS_64BIT ? 8 : 4;
-  final static int INT_NUM_BYTE = 4;
-  final static int CHAR_NUM_BYTE = 2;
-
-  /* Rough logic: HashMap has an array[Entry] w/ varying
-     load factor (say 2 * POINTER).  Entry is object w/ Term
-     key, BufferedDeletes.Num val, int hash, Entry next
-     (OBJ_HEADER + 3*POINTER + INT).  Term is object w/
-     String field and String text (OBJ_HEADER + 2*POINTER).
-     We don't count Term's field since it's interned.
-     Term's text is String (OBJ_HEADER + 4*INT + POINTER +
-     OBJ_HEADER + string.length*CHAR).  BufferedDeletes.num is
-     OBJ_HEADER + INT. */
- 
-  final static int BYTES_PER_DEL_TERM = 8*POINTER_NUM_BYTE + 5*OBJECT_HEADER_BYTES + 6*INT_NUM_BYTE;
-
-  /* Rough logic: del docIDs are List<Integer>.  Say list
-     allocates ~2X size (2*POINTER).  Integer is OBJ_HEADER
-     + int */
-  final static int BYTES_PER_DEL_DOCID = 2*POINTER_NUM_BYTE + OBJECT_HEADER_BYTES + INT_NUM_BYTE;
-
-  /* Rough logic: HashMap has an array[Entry] w/ varying
-     load factor (say 2 * POINTER).  Entry is object w/
-     Query key, Integer val, int hash, Entry next
-     (OBJ_HEADER + 3*POINTER + INT).  Query we often
-     undercount (say 24 bytes).  Integer is OBJ_HEADER + INT. */
-  final static int BYTES_PER_DEL_QUERY = 5*POINTER_NUM_BYTE + 2*OBJECT_HEADER_BYTES + 2*INT_NUM_BYTE + 24;
-
   /* Initial chunks size of the shared byte[] blocks used to
      store postings data */
   final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
@@ -1333,14 +1088,14 @@ final class DocumentsWriter {
     final int[] b;
     if (0 == size) {
       b = new int[INT_BLOCK_SIZE];
-      bytesUsed.addAndGet(INT_BLOCK_SIZE*INT_NUM_BYTE);
+      bytesUsed.addAndGet(INT_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_INT);
     } else
       b = freeIntBlocks.remove(size-1);
     return b;
   }
 
-  private long bytesUsed() {
-    return bytesUsed.get();
+  long bytesUsed() {
+    return bytesUsed.get() + pendingDeletes.bytesUsed.get();
   }
 
   /* Return int[]s to the pool */
@@ -1376,19 +1131,20 @@ final class DocumentsWriter {
     final boolean doBalance;
     final long deletesRAMUsed;
 
+    deletesRAMUsed = bufferedDeletes.bytesUsed();
+
     synchronized(this) {
       if (ramBufferSize == IndexWriterConfig.DISABLE_AUTO_FLUSH || bufferIsFull) {
         return;
       }
     
-      deletesRAMUsed = deletesInRAM.bytesUsed+deletesFlushed.bytesUsed;
-      doBalance = bytesUsed() +deletesRAMUsed >= ramBufferSize;
+      doBalance = bytesUsed() + deletesRAMUsed >= ramBufferSize;
     }
 
     if (doBalance) {
 
       if (infoStream != null)
-        message("  RAM: now balance allocations: usedMB=" + toMB(bytesUsed()) +
+        message("  RAM: balance allocations: usedMB=" + toMB(bytesUsed()) +
                 " vs trigger=" + toMB(ramBufferSize) +
                 " deletesMB=" + toMB(deletesRAMUsed) +
                 " byteBlockFree=" + toMB(byteBlockAllocator.bytesUsed()) +
@@ -1414,7 +1170,7 @@ final class DocumentsWriter {
             bufferIsFull = bytesUsed()+deletesRAMUsed > ramBufferSize;
             if (infoStream != null) {
               if (bytesUsed()+deletesRAMUsed > ramBufferSize)
-                message("    nothing to free; now set bufferIsFull");
+                message("    nothing to free; set bufferIsFull");
               else
                 message("    nothing to free");
             }
@@ -1426,7 +1182,7 @@ final class DocumentsWriter {
           }
           if ((1 == iter % 4) && freeIntBlocks.size() > 0) {
             freeIntBlocks.remove(freeIntBlocks.size()-1);
-            bytesUsed.addAndGet(-INT_BLOCK_SIZE * INT_NUM_BYTE);
+            bytesUsed.addAndGet(-INT_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_INT);
           }
           if ((2 == iter % 4) && perDocAllocator.numBufferedBlocks() > 0) {
             perDocAllocator.freeBlocks(32); // Remove upwards of 32 blocks (each block is 1K)
@@ -1501,8 +1257,9 @@ final class DocumentsWriter {
           nextWriteLoc = 0;
         success = true;
       } finally {
-        if (!success)
+        if (!success) {
           setAborting();
+        }
       }
     }
 
@@ -1519,8 +1276,9 @@ final class DocumentsWriter {
             waiting[nextWriteLoc] = null;
             waitingBytes -= doc.sizeInBytes();
             writeDocument(doc);
-          } else
+          } else {
             break;
+          }
         }
       } else {
 

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadState.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadState.java?rev=1044635&r1=1044634&r2=1044635&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadState.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadState.java Sat Dec 11 11:07:01 2010
@@ -27,7 +27,6 @@ final class DocumentsWriterThreadState {
 
   boolean isIdle = true;                          // false if this is currently in use by a thread
   int numThreads = 1;                             // Number of threads that share this instance
-  boolean doFlushAfter;                           // true if we should flush after processing current doc
   final DocConsumerPerThread consumer;
   final DocumentsWriter.DocState docState;
 
@@ -45,6 +44,5 @@ final class DocumentsWriterThreadState {
 
   void doAfterFlush() {
     numThreads = 0;
-    doFlushAfter = false;
   }
 }
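
Note: the isIdle flag in this class pairs with the waitReady()/notifyAll() additions in DocumentsWriter above: a thread blocks while the state is busy (or an abort is running), and every transition back to idle is followed by notifyAll(). A minimal, generic sketch of that handshake, using a hypothetical ThreadStateGate rather than the real DocumentsWriterThreadState:

/** Sketch: the wait()/notifyAll() handshake around an idle/busy per-thread state. */
public class ThreadStateGate {

  private boolean closed;
  private boolean aborting;
  private boolean idle = true;

  /** Block until the state is idle and no abort is running; fail if the writer was closed meanwhile. */
  synchronized void acquire() {
    while (!closed && (!idle || aborting)) {
      try {
        wait();
      } catch (InterruptedException ie) {
        // DocumentsWriter rethrows this as ThreadInterruptedException; a plain sketch
        // just restores the interrupt flag and gives up.
        Thread.currentThread().interrupt();
        throw new RuntimeException(ie);
      }
    }
    if (closed) {
      throw new IllegalStateException("already closed");
    }
    idle = false;  // caller now owns this state
  }

  /** Mark the state idle again and wake any threads parked in acquire(). */
  synchronized void release() {
    idle = true;
    notifyAll();
  }

  /** Toggle aborting; clearing it also wakes waiters so they can re-check the condition. */
  synchronized void setAborting(boolean value) {
    aborting = value;
    if (!value) {
      notifyAll();
    }
  }

  /** Close the gate and wake everyone so blocked callers fail fast. */
  synchronized void close() {
    closed = true;
    notifyAll();
  }
}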

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java?rev=1044635&r1=1044634&r2=1044635&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java Sat Dec 11 11:07:01 2010
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.lucene.analysis.tokenattributes.PayloadAttribute;
 import org.apache.lucene.document.Fieldable;
+import org.apache.lucene.util.RamUsageEstimator;
 
 // TODO: break into separate freq and prox writers as
 // codecs; make separate container (tii/tis/skip/*) that can
@@ -88,7 +89,7 @@ final class FreqProxTermsWriterPerField 
     }
   }
 
-  final void writeProx(final int termID, int proxCode) {
+  void writeProx(final int termID, int proxCode) {
     final Payload payload;
     if (payloadAttribute == null) {
       payload = null;
@@ -110,7 +111,7 @@ final class FreqProxTermsWriterPerField 
   }
 
   @Override
-  final void newTerm(final int termID) {
+  void newTerm(final int termID) {
     // First time we're seeing this term since the last
     // flush
     assert docState.testPoint("FreqProxTermsWriterPerField.newTerm start");
@@ -127,7 +128,7 @@ final class FreqProxTermsWriterPerField 
   }
 
   @Override
-  final void addTerm(final int termID) {
+  void addTerm(final int termID) {
 
     assert docState.testPoint("FreqProxTermsWriterPerField.addTerm start");
     
@@ -205,7 +206,7 @@ final class FreqProxTermsWriterPerField 
 
     @Override
     int bytesPerPosting() {
-      return ParallelPostingsArray.BYTES_PER_POSTING + 4 * DocumentsWriter.INT_NUM_BYTE;
+      return ParallelPostingsArray.BYTES_PER_POSTING + 4 * RamUsageEstimator.NUM_BYTES_INT;
     }
   }
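
Note: the last hunk replaces the local INT_NUM_BYTE constant with RamUsageEstimator.NUM_BYTES_INT when sizing the freq/prox parallel postings arrays (four extra int slots per term on top of ParallelPostingsArray.BYTES_PER_POSTING). A rough sketch of what that sizing amounts to; the class and the three-int base assumption are illustrative, not the actual ParallelPostingsArray layout.

/** Sketch: estimating per-term RAM for the freq/prox parallel postings arrays. */
public class PostingsSizingSketch {

  // Mirrors RamUsageEstimator.NUM_BYTES_INT in the Lucene version this commit targets.
  static final int NUM_BYTES_INT = 4;

  // Assumed base cost per posting slot (a few parallel int[]s per term); treat this
  // as a placeholder, not the real ParallelPostingsArray.BYTES_PER_POSTING value.
  static final int BYTES_PER_POSTING = 3 * NUM_BYTES_INT;

  /** Freq/prox keeps four more parallel int[]s per term, hence the "+ 4 * NUM_BYTES_INT" in the hunk above. */
  static int bytesPerFreqProxPosting() {
    return BYTES_PER_POSTING + 4 * NUM_BYTES_INT;
  }

  /** Growing the parallel arrays by this many term slots costs roughly slots * bytesPerPosting() bytes. */
  static long growthCostBytes(int additionalSlots) {
    return (long) additionalSlots * bytesPerFreqProxPosting();
  }

  public static void main(String[] args) {
    System.out.println(growthCostBytes(4096) + " bytes for 4096 new term slots");
  }
}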