You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by bu...@apache.org on 2011/02/22 02:01:11 UTC

svn commit: r1073192 [7/32] - in /lucene/dev/branches/realtime_search: ./ dev-tools/eclipse/ dev-tools/idea/.idea/ dev-tools/idea/lucene/contrib/ant/ dev-tools/idea/lucene/contrib/demo/ dev-tools/idea/lucene/contrib/highlighter/ dev-tools/idea/lucene/c...

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java?rev=1073192&r1=1073191&r2=1073192&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java Tue Feb 22 01:00:39 2011
@@ -17,440 +17,228 @@ package org.apache.lucene.index;
  * limitations under the License.
  */
 
-import java.io.IOException;
-import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.HashMap;
-import java.util.Date;
-import java.util.Map.Entry;
+import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.lucene.index.IndexReader.AtomicReaderContext;
-import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
-import org.apache.lucene.search.Scorer;
-import org.apache.lucene.search.Weight;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.index.BufferedDeletesStream.QueryAndLimit;
 
-/** Holds a {@link SegmentDeletes} for each segment in the
- *  index. */
+/* Holds buffered deletes, by docID, term or query for a
+ * single segment. This is used to hold buffered pending
+ * deletes against the to-be-flushed segment.  Once the
+ * deletes are pushed (on flush in DocumentsWriter), these
+ * deletes are converted to a FrozenBufferedDeletes instance. */
+
+// NOTE: we are sync'd by BufferedDeletes, ie, all access to
+// instances of this class is via sync'd methods on
+// BufferedDeletes
 
 class BufferedDeletes {
 
-  // Deletes for all flushed/merged segments:
-  private final Map<SegmentInfo,SegmentDeletes> deletesMap = new HashMap<SegmentInfo,SegmentDeletes>();
-
-  // used only by assert
-  private Term lastDeleteTerm;
-
-  private PrintStream infoStream;
-  private final AtomicLong bytesUsed = new AtomicLong();
-  private final AtomicInteger numTerms = new AtomicInteger();
-  private final int messageID;
-
-  public BufferedDeletes(int messageID) {
-    this.messageID = messageID;
-  }
-
-  private synchronized void message(String message) {
-    if (infoStream != null) {
-      infoStream.println("BD " + messageID + " [" + new Date() + "; " + Thread.currentThread().getName() + "]: BD " + message);
-    }
-  }
-
-  public synchronized void setInfoStream(PrintStream infoStream) {
-    this.infoStream = infoStream;
-  }
-
-  public synchronized void pushDeletes(SegmentDeletes newDeletes, SegmentInfo info) {
-    pushDeletes(newDeletes, info, false);
-  }
-
-  // Moves all pending deletes onto the provided segment,
-  // then clears the pending deletes
-  public synchronized void pushDeletes(SegmentDeletes newDeletes, SegmentInfo info, boolean noLimit) {
-    assert newDeletes.any();
-    numTerms.addAndGet(newDeletes.numTermDeletes.get());
-
-    if (!noLimit) {
-      assert !deletesMap.containsKey(info);
-      assert info != null;
-      deletesMap.put(info, newDeletes);
-      bytesUsed.addAndGet(newDeletes.bytesUsed.get());
+  /* Rough logic: HashMap has an array[Entry] w/ varying
+     load factor (say 2 * POINTER).  Entry is object w/ Term
+     key, Integer val, int hash, Entry next
+     (OBJ_HEADER + 3*POINTER + INT).  Term is object w/
+     String field and String text (OBJ_HEADER + 2*POINTER).
+     We don't count Term's field since it's interned.
+     Term's text is String (OBJ_HEADER + 4*INT + POINTER +
+     OBJ_HEADER + string.length*CHAR).  Integer is
+     OBJ_HEADER + INT. */
+  final static int BYTES_PER_DEL_TERM = 8*RamUsageEstimator.NUM_BYTES_OBJECT_REF + 5*RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + 6*RamUsageEstimator.NUM_BYTES_INT;
+
+  /* Rough logic: del docIDs are List<Integer>.  Say list
+     allocates ~2X size (2*POINTER).  Integer is OBJ_HEADER
+     + int */
+  final static int BYTES_PER_DEL_DOCID = 2*RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + RamUsageEstimator.NUM_BYTES_INT;
+
+  /* Rough logic: HashMap has an array[Entry] w/ varying
+     load factor (say 2 * POINTER).  Entry is object w/
+     Query key, Integer val, int hash, Entry next
+     (OBJ_HEADER + 3*POINTER + INT).  Query we often
+     undercount (say 24 bytes).  Integer is OBJ_HEADER + INT. */
+  final static int BYTES_PER_DEL_QUERY = 5*RamUsageEstimator.NUM_BYTES_OBJECT_REF + 2*RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + 2*RamUsageEstimator.NUM_BYTES_INT + 24;
+
+  final AtomicInteger numTermDeletes = new AtomicInteger();
+  final Map<Term,Integer> terms;
+  final Map<Query,Integer> queries = new HashMap<Query,Integer>();
+  final List<Integer> docIDs = new ArrayList<Integer>();
+
+  public static final Integer MAX_INT = Integer.valueOf(Integer.MAX_VALUE);
+
+  final AtomicLong bytesUsed = new AtomicLong();
+
+  private final static boolean VERBOSE_DELETES = false;
+
+  long gen;
+
+  public BufferedDeletes(boolean sortTerms) {
+    if (sortTerms) {
+      terms = new TreeMap<Term,Integer>();
     } else {
-      final SegmentDeletes deletes = getDeletes(info);
-      bytesUsed.addAndGet(-deletes.bytesUsed.get());
-      deletes.update(newDeletes, noLimit);
-      bytesUsed.addAndGet(deletes.bytesUsed.get());
+      terms = new HashMap<Term,Integer>();
     }
-    if (infoStream != null) {
-      message("push deletes seg=" + info + " dels=" + getDeletes(info));
-    }
-    assert checkDeleteStats();
-  }
-
-  public synchronized void clear() {
-    deletesMap.clear();
-    numTerms.set(0);
-    bytesUsed.set(0);
-  }
-
-  synchronized boolean any() {
-    return bytesUsed.get() != 0;
-  }
-
-  public int numTerms() {
-    return numTerms.get();
   }
 
-  public long bytesUsed() {
-    return bytesUsed.get();
-  }
-
-  // IW calls this on finishing a merge.  While the merge
-  // was running, it's possible new deletes were pushed onto
-  // our last (and only our last) segment.  In this case we
-  // must carry forward those deletes onto the merged
-  // segment.
-  synchronized void commitMerge(MergePolicy.OneMerge merge) {
-    assert checkDeleteStats();
-    if (infoStream != null) {
-      message("commitMerge merge.info=" + merge.info + " merge.segments=" + merge.segments);
-    }
-    final SegmentInfo lastInfo = merge.segments.lastElement();
-    final SegmentDeletes lastDeletes = deletesMap.get(lastInfo);
-    if (lastDeletes != null) {
-      deletesMap.remove(lastInfo);
-      assert !deletesMap.containsKey(merge.info);
-      deletesMap.put(merge.info, lastDeletes);
-      // don't need to update numTerms/bytesUsed since we
-      // are just moving the deletes from one info to
-      // another
-      if (infoStream != null) {
-        message("commitMerge done: new deletions=" + lastDeletes);
+  @Override
+  public String toString() {
+    if (VERBOSE_DELETES) {
+      return "gen=" + gen + " numTerms=" + numTermDeletes + ", terms=" + terms
+        + ", queries=" + queries + ", docIDs=" + docIDs + ", bytesUsed="
+        + bytesUsed;
+    } else {
+      String s = "gen=" + gen;
+      if (numTermDeletes.get() != 0) {
+        s += " " + numTermDeletes.get() + " deleted terms (unique count=" + terms.size() + ")";
       }
-    } else if (infoStream != null) {
-      message("commitMerge done: no new deletions");
-    }
-    assert !anyDeletes(merge.segments.range(0, merge.segments.size()-1));
-    assert checkDeleteStats();
-  }
-
-  synchronized void clear(SegmentDeletes deletes) {
-    deletes.clear();
-  }
-
-  public synchronized boolean applyDeletes(IndexWriter.ReaderPool readerPool, SegmentInfos segmentInfos, SegmentInfos applyInfos) throws IOException {
-    if (!any()) {
-      return false;
-    }
-    final long t0 = System.currentTimeMillis();
-
-    if (infoStream != null) {
-      message("applyDeletes: applyInfos=" + applyInfos + "; index=" + segmentInfos);
-    }
-
-    assert checkDeleteStats();
-
-    assert applyInfos.size() > 0;
-
-    boolean any = false;
-
-    final SegmentInfo lastApplyInfo = applyInfos.lastElement();
-    final int lastIdx = segmentInfos.indexOf(lastApplyInfo);
-
-    final SegmentInfo firstInfo = applyInfos.firstElement();
-    final int firstIdx = segmentInfos.indexOf(firstInfo);
-
-    // applyInfos must be a slice of segmentInfos
-    assert lastIdx - firstIdx + 1 == applyInfos.size();
-
-    // iterate over all segment infos backwards
-    // coalesceing deletes along the way
-    // when we're at or below the last of the
-    // segments to apply to, start applying the deletes
-    // we traverse up to the first apply infos
-    SegmentDeletes coalescedDeletes = null;
-    boolean hasDeletes = false;
-    for (int segIdx=segmentInfos.size()-1; segIdx >= firstIdx; segIdx--) {
-      final SegmentInfo info = segmentInfos.info(segIdx);
-      final SegmentDeletes deletes = deletesMap.get(info);
-      assert deletes == null || deletes.any();
-
-      if (deletes == null && coalescedDeletes == null) {
-        continue;
+      if (queries.size() != 0) {
+        s += " " + queries.size() + " deleted queries";
       }
-
-      if (infoStream != null) {
-        message("applyDeletes: seg=" + info + " segment's deletes=[" + (deletes == null ? "null" : deletes) + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "]");
+      if (docIDs.size() != 0) {
+        s += " " + docIDs.size() + " deleted docIDs";
+      }
+      if (bytesUsed.get() != 0) {
+        s += " bytesUsed=" + bytesUsed.get();
       }
 
-      hasDeletes |= deletes != null;
-
-      if (segIdx <= lastIdx && hasDeletes) {
-
-        final long delCountInc = applyDeletes(readerPool, info, coalescedDeletes, deletes);
+      return s;
+    }
+  }
 
-        if (delCountInc != 0) {
-          any = true;
-        }
-        if (infoStream != null) {
-          message("deletes touched " + delCountInc + " docIDs");
-        }
-
-        if (deletes != null) {
-          // we've applied doc ids, and they're only applied
-          // on the current segment
-          bytesUsed.addAndGet(-deletes.docIDs.size() * SegmentDeletes.BYTES_PER_DEL_DOCID);
-          deletes.clearDocIDs();
-        }
+  void update(BufferedDeletes in) {
+    numTermDeletes.addAndGet(in.numTermDeletes.get());
+    for (Map.Entry<Term,Integer> ent : in.terms.entrySet()) {
+      final Term term = ent.getKey();
+      if (!terms.containsKey(term)) {
+        // only incr bytesUsed if this term wasn't already buffered:
+        bytesUsed.addAndGet(BYTES_PER_DEL_TERM);
       }
+      terms.put(term, MAX_INT);
+    }
 
-      // now coalesce at the max limit
-      if (deletes != null) {
-        if (coalescedDeletes == null) {
-          coalescedDeletes = new SegmentDeletes();
-        }
-        // TODO: we could make this single pass (coalesce as
-        // we apply the deletes
-        coalescedDeletes.update(deletes, true);
+    for (Map.Entry<Query,Integer> ent : in.queries.entrySet()) {
+      final Query query = ent.getKey();
+      if (!queries.containsKey(query)) {
+        // only incr bytesUsed if this query wasn't already buffered:
+        bytesUsed.addAndGet(BYTES_PER_DEL_QUERY);
       }
+      queries.put(query, MAX_INT);
     }
 
-    // move all deletes to segment just before our merge.
-    if (firstIdx > 0) {
-
-      SegmentDeletes mergedDeletes = null;
-      // TODO: we could also make this single pass
-      for (SegmentInfo info : applyInfos) {
-        final SegmentDeletes deletes = deletesMap.get(info);
-        if (deletes != null) {
-          assert deletes.any();
-          if (mergedDeletes == null) {
-            mergedDeletes = getDeletes(segmentInfos.info(firstIdx-1));
-            numTerms.addAndGet(-mergedDeletes.numTermDeletes.get());
-            assert numTerms.get() >= 0;
-            bytesUsed.addAndGet(-mergedDeletes.bytesUsed.get());
-            assert bytesUsed.get() >= 0;
-          }
-
-          mergedDeletes.update(deletes, true);
-        }
-      }
+    // docIDs never move across segments and the docIDs
+    // should already be cleared
+  }
 
-      if (mergedDeletes != null) {
-        numTerms.addAndGet(mergedDeletes.numTermDeletes.get());
-        bytesUsed.addAndGet(mergedDeletes.bytesUsed.get());
+  void update(FrozenBufferedDeletes in) {
+    numTermDeletes.addAndGet(in.numTermDeletes);
+    for(Term term : in.terms) {
+      if (!terms.containsKey(term)) {
+        // only incr bytesUsed if this term wasn't already buffered:
+        bytesUsed.addAndGet(BYTES_PER_DEL_TERM);
       }
+      terms.put(term, MAX_INT);
+    }
 
-      if (infoStream != null) {
-        if (mergedDeletes != null) {
-          message("applyDeletes: merge all deletes into seg=" + segmentInfos.info(firstIdx-1) + ": " + mergedDeletes);
-        } else {
-          message("applyDeletes: no deletes to merge");
-        }
+    for(int queryIdx=0;queryIdx<in.queries.length;queryIdx++) {
+      final Query query = in.queries[queryIdx];
+      if (!queries.containsKey(query)) {
+        // only incr bytesUsed if this query wasn't already buffered:
+        bytesUsed.addAndGet(BYTES_PER_DEL_QUERY);
       }
-    } else {
-      // We drop the deletes in this case, because we've
-      // applied them to segment infos starting w/ the first
-      // segment.  There are no prior segments so there's no
-      // reason to keep them around.  When the applyInfos ==
-      // segmentInfos this means all deletes have been
-      // removed:
+      queries.put(query, MAX_INT);
     }
-    remove(applyInfos);
-
-    assert checkDeleteStats();
-    assert applyInfos != segmentInfos || !any();
+  }
 
-    if (infoStream != null) {
-      message("applyDeletes took " + (System.currentTimeMillis()-t0) + " msec");
+  public void addQuery(Query query, int docIDUpto) {
+    Integer current = queries.put(query, docIDUpto);
+    // increment bytes used only if the query wasn't added so far.
+    if (current == null) {
+      bytesUsed.addAndGet(BYTES_PER_DEL_QUERY);
     }
-    return any;
   }
 
-  private synchronized long applyDeletes(IndexWriter.ReaderPool readerPool,
-                                            SegmentInfo info,
-                                            SegmentDeletes coalescedDeletes,
-                                            SegmentDeletes segmentDeletes) throws IOException {
-    assert readerPool.infoIsLive(info);
-
-    assert coalescedDeletes == null || coalescedDeletes.docIDs.size() == 0;
+  public void addDocID(int docID) {
+    docIDs.add(Integer.valueOf(docID));
+    bytesUsed.addAndGet(BYTES_PER_DEL_DOCID);
+  }
 
-    long delCount = 0;
+  public void addTerm(Term term, int docIDUpto) {
+    Integer current = terms.get(term);
+    if (current != null && docIDUpto < current) {
+      // Only record the new number if it's greater than the
+      // current one.  This is important because if multiple
+      // threads are replacing the same doc at nearly the
+      // same time, it's possible that one thread that got a
+      // higher docID is scheduled before the other
+      // threads.  If we blindly replace then we can
+      // incorrectly get both docs indexed.
+      return;
+    }
 
-    // Lock order: IW -> BD -> RP
-    SegmentReader reader = readerPool.get(info, false);
-    try {
-      if (coalescedDeletes != null) {
-        delCount += applyDeletes(coalescedDeletes, reader);
-      }
-      if (segmentDeletes != null) {
-        delCount += applyDeletes(segmentDeletes, reader);
-      }
-    } finally {
-      readerPool.release(reader);
+    terms.put(term, Integer.valueOf(docIDUpto));
+    numTermDeletes.incrementAndGet();
+    if (current == null) {
+      bytesUsed.addAndGet(BYTES_PER_DEL_TERM + term.bytes.length);
     }
-    return delCount;
   }
 
-  private synchronized long applyDeletes(SegmentDeletes deletes, SegmentReader reader) throws IOException {
-
-    long delCount = 0;
-
-    assert checkDeleteTerm(null);
-
-    if (deletes.terms.size() > 0) {
-      Fields fields = reader.fields();
-      if (fields == null) {
-        // This reader has no postings
-        return 0;
+  public Iterable<Term> termsIterable() {
+    return new Iterable<Term>() {
+      // @Override -- not until Java 1.6
+      public Iterator<Term> iterator() {
+        return terms.keySet().iterator();
       }
+    };
+  }
 
-      TermsEnum termsEnum = null;
-
-      String currentField = null;
-      DocsEnum docs = null;
+  public Iterable<QueryAndLimit> queriesIterable() {
+    return new Iterable<QueryAndLimit>() {
+      
+      // @Override -- not until Java 1.6
+      public Iterator<QueryAndLimit> iterator() {
+        return new Iterator<QueryAndLimit>() {
+          private final Iterator<Map.Entry<Query,Integer>> iter = queries.entrySet().iterator();
 
-      for (Entry<Term,Integer> entry: deletes.terms.entrySet()) {
-        Term term = entry.getKey();
-        // Since we visit terms sorted, we gain performance
-        // by re-using the same TermsEnum and seeking only
-        // forwards
-        if (term.field() != currentField) {
-          assert currentField == null || currentField.compareTo(term.field()) < 0;
-          currentField = term.field();
-          Terms terms = fields.terms(currentField);
-          if (terms != null) {
-            termsEnum = terms.iterator();
-          } else {
-            termsEnum = null;
+          // @Override -- not until Java 1.6
+          public boolean hasNext() {
+            return iter.hasNext();
           }
-        }
 
-        if (termsEnum == null) {
-          continue;
-        }
-        assert checkDeleteTerm(term);
-
-        if (termsEnum.seek(term.bytes(), false) == TermsEnum.SeekStatus.FOUND) {
-          DocsEnum docsEnum = termsEnum.docs(reader.getDeletedDocs(), docs);
-
-          if (docsEnum != null) {
-            docs = docsEnum;
-            final int limit = entry.getValue();
-            while (true) {
-              final int docID = docs.nextDoc();
-              if (docID == DocsEnum.NO_MORE_DOCS || docID >= limit) {
-                break;
-              }
-              reader.deleteDocument(docID);
-              // TODO: we could/should change
-              // reader.deleteDocument to return boolean
-              // true if it did in fact delete, because here
-              // we could be deleting an already-deleted doc
-              // which makes this an upper bound:
-              delCount++;
-            }
+          // @Override -- not until Java 1.6
+          public QueryAndLimit next() {
+            final Map.Entry<Query,Integer> ent = iter.next();
+            return new QueryAndLimit(ent.getKey(), ent.getValue());
           }
-        }
-      }
-    }
-
-    // Delete by docID
-    for (Integer docIdInt : deletes.docIDs) {
-      int docID = docIdInt.intValue();
-      reader.deleteDocument(docID);
-      delCount++;
-    }
 
-    // Delete by query
-    if (deletes.queries.size() > 0) {
-      IndexSearcher searcher = new IndexSearcher(reader);
-      assert searcher.getTopReaderContext().isAtomic;
-      final AtomicReaderContext readerContext = (AtomicReaderContext) searcher.getTopReaderContext();
-      try {
-        for (Entry<Query, Integer> entry : deletes.queries.entrySet()) {
-          Query query = entry.getKey();
-          int limit = entry.getValue().intValue();
-          Weight weight = query.weight(searcher);
-          Scorer scorer = weight.scorer(readerContext, Weight.ScorerContext.def());
-          if (scorer != null) {
-            while(true)  {
-              int doc = scorer.nextDoc();
-              if (doc >= limit)
-                break;
-
-              reader.deleteDocument(doc);
-              // TODO: we could/should change
-              // reader.deleteDocument to return boolean
-              // true if it did in fact delete, because here
-              // we could be deleting an already-deleted doc
-              // which makes this an upper bound:
-              delCount++;
-            }
+          // @Override -- not until Java 1.6
+          public void remove() {
+            throw new UnsupportedOperationException();
           }
-        }
-      } finally {
-        searcher.close();
-      }
-    }
-
-    return delCount;
-  }
-
-  public synchronized SegmentDeletes getDeletes(SegmentInfo info) {
-    SegmentDeletes deletes = deletesMap.get(info);
-    if (deletes == null) {
-      deletes = new SegmentDeletes();
-      deletesMap.put(info, deletes);
-    }
-    return deletes;
-  }
-
-  public synchronized void remove(SegmentInfos infos) {
-    assert infos.size() > 0;
-    for (SegmentInfo info : infos) {
-      SegmentDeletes deletes = deletesMap.get(info);
-      if (deletes != null) {
-        bytesUsed.addAndGet(-deletes.bytesUsed.get());
-        assert bytesUsed.get() >= 0: "bytesUsed=" + bytesUsed;
-        numTerms.addAndGet(-deletes.numTermDeletes.get());
-        assert numTerms.get() >= 0: "numTerms=" + numTerms;
-        deletesMap.remove(info);
+        };
       }
-    }
-  }
-
-  // used only by assert
-  private boolean anyDeletes(SegmentInfos infos) {
-    for(SegmentInfo info : infos) {
-      if (deletesMap.containsKey(info)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  // used only by assert
-  private boolean checkDeleteTerm(Term term) {
-    if (term != null) {
-      assert lastDeleteTerm == null || term.compareTo(lastDeleteTerm) > 0: "lastTerm=" + lastDeleteTerm + " vs term=" + term;
-    }
-    lastDeleteTerm = term;
-    return true;
+    };
+  }    
+    
+  void clear() {
+    terms.clear();
+    queries.clear();
+    docIDs.clear();
+    numTermDeletes.set(0);
+    bytesUsed.set(0);
   }
-
-  // only for assert
-  private boolean checkDeleteStats() {
-    int numTerms2 = 0;
-    long bytesUsed2 = 0;
-    for(SegmentDeletes deletes : deletesMap.values()) {
-      numTerms2 += deletes.numTermDeletes.get();
-      bytesUsed2 += deletes.bytesUsed.get();
-    }
-    assert numTerms2 == numTerms.get(): "numTerms2=" + numTerms2 + " vs " + numTerms.get();
-    assert bytesUsed2 == bytesUsed.get(): "bytesUsed2=" + bytesUsed2 + " vs " + bytesUsed;
-    return true;
+  
+  void clearDocIDs() {
+    bytesUsed.addAndGet(-docIDs.size()*BYTES_PER_DEL_DOCID);
+    docIDs.clear();
+  }
+  
+  boolean any() {
+    return terms.size() > 0 || docIDs.size() > 0 || queries.size() > 0;
   }
 }

Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java?rev=1073192&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java Tue Feb 22 01:00:39 2011
@@ -0,0 +1,465 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Comparator;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.lucene.index.IndexReader.AtomicReaderContext;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.Weight;
+
+/* Tracks the stream of {@link BufferedDeletes}.
+ * When DocumentsWriter flushes, its buffered
+ * deletes are appended to this stream.  We later
+ * apply these deletes (resolve them to the actual
+ * docIDs, per segment) when a merge is started
+ * (only to the to-be-merged segments).  We
+ * also apply to all segments when NRT reader is pulled,
+ * commit/close is called, or when too many deletes are
+ * buffered and must be flushed (by RAM usage or by count).
+ *
+ * Each packet is assigned a generation, and each flushed or
+ * merged segment is also assigned a generation, so we can
+ * track which BufferedDeletes packets to apply to any given
+ * segment. */
+
+class BufferedDeletesStream {
+
+  // TODO: maybe linked list?
+  private final List<FrozenBufferedDeletes> deletes = new ArrayList<FrozenBufferedDeletes>();
+
+  // Starts at 1 so that SegmentInfos that have never had
+  // deletes applied (whose bufferedDelGen defaults to 0)
+  // will be correct:
+  private long nextGen = 1;
+
+  // used only by assert
+  private Term lastDeleteTerm;
+
+  private PrintStream infoStream;
+  private final AtomicLong bytesUsed = new AtomicLong();
+  private final AtomicInteger numTerms = new AtomicInteger();
+  private final int messageID;
+
+  public BufferedDeletesStream(int messageID) {
+    this.messageID = messageID;
+  }
+
+  private synchronized void message(String message) {
+    if (infoStream != null) {
+      infoStream.println("BD " + messageID + " [" + new Date() + "; " + Thread.currentThread().getName() + "]: " + message);
+    }
+  }
+
+  public synchronized void setInfoStream(PrintStream infoStream) {
+    this.infoStream = infoStream;
+  }
+
+  // Appends a new packet of buffered deletes to the stream,
+  // setting its generation:
+  public synchronized void push(FrozenBufferedDeletes packet) {
+    assert packet.any();
+    assert checkDeleteStats();
+    assert packet.gen < nextGen;
+    deletes.add(packet);
+    numTerms.addAndGet(packet.numTermDeletes);
+    bytesUsed.addAndGet(packet.bytesUsed);
+    if (infoStream != null) {
+      message("push deletes " + packet + " delGen=" + packet.gen + " packetCount=" + deletes.size());
+    }
+    assert checkDeleteStats();
+  }
+
+  public synchronized void clear() {
+    deletes.clear();
+    nextGen = 1;
+    numTerms.set(0);
+    bytesUsed.set(0);
+  }
+
+  public boolean any() {
+    return bytesUsed.get() != 0;
+  }
+
+  public int numTerms() {
+    return numTerms.get();
+  }
+
+  public long bytesUsed() {
+    return bytesUsed.get();
+  }
+
+  public static class ApplyDeletesResult {
+    // True if any actual deletes took place:
+    public final boolean anyDeletes;
+
+    // Current gen, for the merged segment:
+    public final long gen;
+
+    // If non-null, contains segments that are 100% deleted
+    public final SegmentInfos allDeleted;
+
+    ApplyDeletesResult(boolean anyDeletes, long gen, SegmentInfos allDeleted) {
+      this.anyDeletes = anyDeletes;
+      this.gen = gen;
+      this.allDeleted = allDeleted;
+    }
+  }
+
+  // Sorts SegmentInfos from smallest to biggest bufferedDelGen:
+  private static final Comparator<SegmentInfo> sortByDelGen = new Comparator<SegmentInfo>() {
+    // @Override -- not until Java 1.6
+    public int compare(SegmentInfo si1, SegmentInfo si2) {
+      final long cmp = si1.getBufferedDeletesGen() - si2.getBufferedDeletesGen();
+      if (cmp > 0) {
+        return 1;
+      } else if (cmp < 0) {
+        return -1;
+      } else {
+        return 0;
+      }
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      return sortByDelGen == other;
+    }
+  };
+
+  /** Resolves the buffered deleted Term/Query/docIDs, into
+   *  actual deleted docIDs in the deletedDocs BitVector for
+   *  each SegmentReader. */
+  public synchronized ApplyDeletesResult applyDeletes(IndexWriter.ReaderPool readerPool, SegmentInfos infos) throws IOException {
+    final long t0 = System.currentTimeMillis();
+
+    if (infos.size() == 0) {
+      return new ApplyDeletesResult(false, nextGen++, null);
+    }
+
+    assert checkDeleteStats();
+
+    if (!any()) {
+      message("applyDeletes: no deletes; skipping");
+      return new ApplyDeletesResult(false, nextGen++, null);
+    }
+
+    if (infoStream != null) {
+      message("applyDeletes: infos=" + infos + " packetCount=" + deletes.size());
+    }
+
+    SegmentInfos infos2 = new SegmentInfos();
+    infos2.addAll(infos);
+    Collections.sort(infos2, sortByDelGen);
+
+    BufferedDeletes coalescedDeletes = null;
+    boolean anyNewDeletes = false;
+
+    int infosIDX = infos2.size()-1;
+    int delIDX = deletes.size()-1;
+
+    SegmentInfos allDeleted = null;
+
+    while (infosIDX >= 0) {
+      //System.out.println("BD: cycle delIDX=" + delIDX + " infoIDX=" + infosIDX);
+
+      final FrozenBufferedDeletes packet = delIDX >= 0 ? deletes.get(delIDX) : null;
+      final SegmentInfo info = infos2.get(infosIDX);
+      final long segGen = info.getBufferedDeletesGen();
+
+      if (packet != null && segGen < packet.gen) {
+        //System.out.println("  coalesce");
+        if (coalescedDeletes == null) {
+          coalescedDeletes = new BufferedDeletes(true);
+        }
+        coalescedDeletes.update(packet);
+        delIDX--;
+      } else if (packet != null && segGen == packet.gen) {
+        //System.out.println("  eq");
+
+        // Lock order: IW -> BD -> RP
+        assert readerPool.infoIsLive(info);
+        SegmentReader reader = readerPool.get(info, false);
+        int delCount = 0;
+        final boolean segAllDeletes;
+        try {
+          if (coalescedDeletes != null) {
+            //System.out.println("    del coalesced");
+            delCount += applyTermDeletes(coalescedDeletes.termsIterable(), reader);
+            delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), reader);
+          }
+          //System.out.println("    del exact");
+          // Don't delete by Term here; DocumentsWriter
+          // already did that on flush:
+          delCount += applyQueryDeletes(packet.queriesIterable(), reader);
+          segAllDeletes = reader.numDocs() == 0;
+        } finally {
+          readerPool.release(reader);
+        }
+        anyNewDeletes |= delCount > 0;
+
+        if (segAllDeletes) {
+          if (allDeleted == null) {
+            allDeleted = new SegmentInfos();
+          }
+          allDeleted.add(info);
+        }
+
+        if (infoStream != null) {
+          message("seg=" + info + " segGen=" + segGen + " segDeletes=[" + packet + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] delCount=" + delCount + (segAllDeletes ? " 100% deleted" : ""));
+        }
+
+        if (coalescedDeletes == null) {
+          coalescedDeletes = new BufferedDeletes(true);
+        }
+        coalescedDeletes.update(packet);
+        delIDX--;
+        infosIDX--;
+        info.setBufferedDeletesGen(nextGen);
+
+      } else {
+        //System.out.println("  gt");
+
+        if (coalescedDeletes != null) {
+          // Lock order: IW -> BD -> RP
+          assert readerPool.infoIsLive(info);
+          SegmentReader reader = readerPool.get(info, false);
+          int delCount = 0;
+          final boolean segAllDeletes;
+          try {
+            delCount += applyTermDeletes(coalescedDeletes.termsIterable(), reader);
+            delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), reader);
+            segAllDeletes = reader.numDocs() == 0;
+          } finally {
+            readerPool.release(reader);
+          }
+          anyNewDeletes |= delCount > 0;
+
+          if (segAllDeletes) {
+            if (allDeleted == null) {
+              allDeleted = new SegmentInfos();
+            }
+            allDeleted.add(info);
+          }
+
+          if (infoStream != null) {
+            message("seg=" + info + " segGen=" + segGen + " coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] delCount=" + delCount + (segAllDeletes ? " 100% deleted" : ""));
+          }
+        }
+        info.setBufferedDeletesGen(nextGen);
+
+        infosIDX--;
+      }
+    }
+
+    assert checkDeleteStats();
+    if (infoStream != null) {
+      message("applyDeletes took " + (System.currentTimeMillis()-t0) + " msec");
+    }
+    // assert infos != segmentInfos || !any() : "infos=" + infos + " segmentInfos=" + segmentInfos + " any=" + any;
+
+    return new ApplyDeletesResult(anyNewDeletes, nextGen++, allDeleted);
+  }
+
+  /* Returns the current value of nextGen and then
+   * increments it, all while holding this object's
+   * monitor. */
+  public synchronized long getNextGen() {
+    final long gen = nextGen;
+    nextGen++;
+    return gen;
+  }
+
+  // Lock order IW -> BD
+  /* Drops the leading run of buffered delete packets whose
+   * gen is below the smallest buffered-deletes gen found
+   * on any segment in segmentInfos; we no longer need to
+   * store those because all segments in the index have
+   * had the deletes applied. */
+  public synchronized void prune(SegmentInfos segmentInfos) {
+    assert checkDeleteStats();
+
+    // Smallest buffered-deletes gen over all segments
+    // (stays Long.MAX_VALUE when there are no segments):
+    long minGen = Long.MAX_VALUE;
+    for(SegmentInfo info : segmentInfos) {
+      final long gen = info.getBufferedDeletesGen();
+      if (gen < minGen) {
+        minGen = gen;
+      }
+    }
+
+    if (infoStream != null) {
+      message("prune sis=" + segmentInfos + " minGen=" + minGen + " packetCount=" + deletes.size());
+    }
+
+    // Find the first packet that must be kept, i.e. the
+    // first one whose gen is >= minGen:
+    final int packetCount = deletes.size();
+    int keepFrom = 0;
+    while (keepFrom < packetCount && deletes.get(keepFrom).gen < minGen) {
+      keepFrom++;
+    }
+
+    prune(keepFrom);
+    if (keepFrom == packetCount) {
+      // All deletes pruned
+      assert !any();
+    }
+    assert checkDeleteStats();
+  }
+
+  /* Removes the first count packets from deletes, after
+   * subtracting each removed packet's numTermDeletes and
+   * bytesUsed from the running totals.  Does nothing when
+   * count is not positive. */
+  private synchronized void prune(int count) {
+    if (count <= 0) {
+      return;
+    }
+
+    if (infoStream != null) {
+      message("pruneDeletes: prune " + count + " packets; " + (deletes.size() - count) + " packets remain");
+    }
+
+    // Update the totals first; the packets themselves are
+    // dropped in one shot afterwards:
+    for(int i=0;i<count;i++) {
+      final FrozenBufferedDeletes doomed = deletes.get(i);
+      numTerms.addAndGet(-doomed.numTermDeletes);
+      assert numTerms.get() >= 0;
+      bytesUsed.addAndGet(-doomed.bytesUsed);
+      assert bytesUsed.get() >= 0;
+    }
+    deletes.subList(0, count).clear();
+  }
+
+  /* Delete by Term: for every term in termsIter that is
+   * found in reader, calls reader.deleteDocument on each
+   * doc ID enumerated for that term.
+   *
+   * Terms must arrive in strictly increasing order (this
+   * is asserted through checkDeleteTerm), which lets us
+   * keep one TermsEnum per field and only seek forwards.
+   *
+   * Returns the number of deleteDocument calls made; see
+   * the TODO below for why that is only an upper bound on
+   * the number of newly deleted docs.  Returns 0 when the
+   * reader has no postings. */
+  private synchronized long applyTermDeletes(Iterable<Term> termsIter, SegmentReader reader) throws IOException {
+    long delCount = 0;
+    Fields fields = reader.fields();
+    if (fields == null) {
+      // This reader has no postings
+      return 0;
+    }
+
+    TermsEnum termsEnum = null;
+
+    String currentField = null;
+
+    // Carried from one term to the next and handed back to
+    // termsEnum.docs as the reuse candidate.  Previously
+    // this was never assigned, so null was always passed
+    // and the reuse never happened:
+    DocsEnum docs = null;
+
+    // Reset the sort-order check for this pass:
+    assert checkDeleteTerm(null);
+
+    for (Term term : termsIter) {
+      // Since we visit terms sorted, we gain performance
+      // by re-using the same TermsEnum and seeking only
+      // forwards
+      // NOTE(review): identity comparison on the field
+      // name; presumably Term field names are interned --
+      // confirm
+      if (term.field() != currentField) {
+        assert currentField == null || currentField.compareTo(term.field()) < 0;
+        currentField = term.field();
+        Terms terms = fields.terms(currentField);
+        if (terms != null) {
+          termsEnum = terms.iterator();
+        } else {
+          termsEnum = null;
+        }
+      }
+
+      if (termsEnum == null) {
+        // Current field has no terms in this reader
+        continue;
+      }
+      assert checkDeleteTerm(term);
+
+      // System.out.println("  term=" + term);
+
+      if (termsEnum.seek(term.bytes(), false) == TermsEnum.SeekStatus.FOUND) {
+        docs = termsEnum.docs(reader.getDeletedDocs(), docs);
+
+        if (docs != null) {
+          while (true) {
+            final int docID = docs.nextDoc();
+            if (docID == DocsEnum.NO_MORE_DOCS) {
+              break;
+            }
+            reader.deleteDocument(docID);
+            // TODO: we could/should change
+            // reader.deleteDocument to return boolean
+            // true if it did in fact delete, because here
+            // we could be deleting an already-deleted doc
+            // which makes this an upper bound:
+            delCount++;
+          }
+        }
+      }
+    }
+
+    return delCount;
+  }
+
+  /** Immutable pairing of a delete-by-query {@link Query}
+   *  with an int limit; applyQueryDeletes only deletes doc
+   *  IDs that are below that limit. */
+  public static class QueryAndLimit {
+    public final Query query;
+    public final int limit;
+
+    public QueryAndLimit(final Query query, final int limit) {
+      this.limit = limit;
+      this.query = query;
+    }
+  }
+
+  /* Delete by query: runs each query against reader and
+   * calls reader.deleteDocument on every doc ID the scorer
+   * returns, stopping at the first doc ID that is >= the
+   * limit paired with that query.
+   *
+   * Returns the number of deleteDocument calls made; see
+   * the TODO below for why that is only an upper bound on
+   * the number of newly deleted docs.  The searcher opened
+   * here is closed again before returning. */
+  private synchronized long applyQueryDeletes(Iterable<QueryAndLimit> queriesIter, SegmentReader reader) throws IOException {
+    final IndexSearcher searcher = new IndexSearcher(reader);
+    assert searcher.getTopReaderContext().isAtomic;
+    final AtomicReaderContext readerContext = (AtomicReaderContext) searcher.getTopReaderContext();
+
+    long delCount = 0;
+    try {
+      for (QueryAndLimit ent : queriesIter) {
+        final Weight weight = ent.query.weight(searcher);
+        final Scorer scorer = weight.scorer(readerContext, Weight.ScorerContext.def());
+        if (scorer == null) {
+          // No scorer for this query on this reader
+          continue;
+        }
+
+        final int limit = ent.limit;
+        for (int doc = scorer.nextDoc(); doc < limit; doc = scorer.nextDoc()) {
+          reader.deleteDocument(doc);
+          // TODO: we could/should change
+          // reader.deleteDocument to return boolean
+          // true if it did in fact delete, because here
+          // we could be deleting an already-deleted doc
+          // which makes this an upper bound:
+          delCount++;
+        }
+      }
+    } finally {
+      searcher.close();
+    }
+
+    return delCount;
+  }
+
+  // used only by assert
+  /* Checks that a non-null term sorts strictly after the
+   * term remembered from the previous call (if any), then
+   * remembers term for the next call; passing null just
+   * resets the remembered term.  Always returns true so
+   * the call can sit inside an assert statement. */
+  private boolean checkDeleteTerm(Term term) {
+    assert term == null || lastDeleteTerm == null || term.compareTo(lastDeleteTerm) > 0: "lastTerm=" + lastDeleteTerm + " vs term=" + term;
+    lastDeleteTerm = term;
+    return true;
+  }
+
+  // only for assert
+  /* Re-adds numTermDeletes and bytesUsed over every packet
+   * in deletes and asserts that the sums equal the running
+   * numTerms and bytesUsed counters.  Always returns true
+   * so the call can sit inside an assert statement. */
+  private boolean checkDeleteStats() {
+    int termTotal = 0;
+    long byteTotal = 0;
+    for(FrozenBufferedDeletes packet : deletes) {
+      byteTotal += packet.bytesUsed;
+      termTotal += packet.numTermDeletes;
+    }
+    assert termTotal == numTerms.get(): "numTerms2=" + termTotal + " vs " + numTerms.get();
+    assert byteTotal == bytesUsed.get(): "bytesUsed2=" + byteTotal + " vs " + bytesUsed;
+    return true;
+  }
+}

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java?rev=1073192&r1=1073191&r2=1073192&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java Tue Feb 22 01:00:39 2011
@@ -145,7 +145,7 @@ public class ConcurrentMergeScheduler ex
   /**
    * Called whenever the running merges have changed, to pause & unpause
    * threads. This method sorts the merge threads by their merge size in
-   * descending order and then pauses/unpauses threads from first to lsat --
+   * descending order and then pauses/unpauses threads from first to last --
    * that way, smaller merges are guaranteed to run before larger ones.
    */
   protected synchronized void updateMergeThreads() {
@@ -308,10 +308,31 @@ public class ConcurrentMergeScheduler ex
     // pending merges, until it's empty:
     while (true) {
 
+      synchronized(this) {
+        long startStallTime = 0;
+        while (mergeThreadCount() >= 1+maxMergeCount) {
+          startStallTime = System.currentTimeMillis();
+          if (verbose()) {
+            message("    too many merges; stalling...");
+          }
+          try {
+            wait();
+          } catch (InterruptedException ie) {
+            throw new ThreadInterruptedException(ie);
+          }
+        }
+
+        if (verbose()) {
+          if (startStallTime != 0) {
+            message("  stalled for " + (System.currentTimeMillis()-startStallTime) + " msec");
+          }
+        }
+      }
+
+
       // TODO: we could be careful about which merges to do in
       // the BG (eg maybe the "biggest" ones) vs FG, which
       // merges to do first (the easiest ones?), etc.
-
       MergePolicy.OneMerge merge = writer.getNextMerge();
       if (merge == null) {
         if (verbose())
@@ -326,32 +347,11 @@ public class ConcurrentMergeScheduler ex
       boolean success = false;
       try {
         synchronized(this) {
-          final MergeThread merger;
-          long startStallTime = 0;
-          while (mergeThreadCount() >= maxMergeCount) {
-            startStallTime = System.currentTimeMillis();
-            if (verbose()) {
-              message("    too many merges; stalling...");
-            }
-            try {
-              wait();
-            } catch (InterruptedException ie) {
-              throw new ThreadInterruptedException(ie);
-            }
-          }
-
-          if (verbose()) {
-            if (startStallTime != 0) {
-              message("  stalled for " + (System.currentTimeMillis()-startStallTime) + " msec");
-            }
-            message("  consider merge " + merge.segString(dir));
-          }
-
-          assert mergeThreadCount() < maxMergeCount;
+          message("  consider merge " + merge.segString(dir));
 
           // OK to spawn a new merge thread to handle this
           // merge:
-          merger = getMergeThread(writer, merge);
+          final MergeThread merger = getMergeThread(writer, merge);
           mergeThreads.add(merger);
           if (verbose()) {
             message("    launch new thread [" + merger.getName() + "]");

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DirectoryReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DirectoryReader.java?rev=1073192&r1=1073191&r2=1073192&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DirectoryReader.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DirectoryReader.java Tue Feb 22 01:00:39 2011
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.FieldSelector;
@@ -36,8 +37,7 @@ import org.apache.lucene.store.LockObtai
 import org.apache.lucene.index.codecs.CodecProvider;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
-
-import org.apache.lucene.search.FieldCache; // not great (circular); used only to purge FieldCache entry on close
+import org.apache.lucene.util.MapBackedSet;
 
 /** 
  * An IndexReader which reads indexes with multiple segments.
@@ -70,6 +70,8 @@ class DirectoryReader extends IndexReade
   // opened on a past IndexCommit:
   private long maxIndexVersion;
 
+  private final boolean applyAllDeletes;
+
 //  static IndexReader open(final Directory directory, final IndexDeletionPolicy deletionPolicy, final IndexCommit commit, final boolean readOnly,
 //      final int termInfosIndexDivisor) throws CorruptIndexException, IOException {
 //    return open(directory, deletionPolicy, commit, readOnly, termInfosIndexDivisor, null);
@@ -106,6 +108,8 @@ class DirectoryReader extends IndexReade
     } else {
       this.codecs = codecs;
     }
+    readerFinishedListeners = new MapBackedSet<ReaderFinishedListener>(new ConcurrentHashMap<ReaderFinishedListener,Boolean>());
+    applyAllDeletes = false;
 
     // To reduce the chance of hitting FileNotFound
     // (and having to retry), we open segments in
@@ -117,6 +121,7 @@ class DirectoryReader extends IndexReade
       boolean success = false;
       try {
         readers[i] = SegmentReader.get(readOnly, sis.info(i), termInfosIndexDivisor);
+        readers[i].readerFinishedListeners = readerFinishedListeners;
         success = true;
       } finally {
         if (!success) {
@@ -136,37 +141,50 @@ class DirectoryReader extends IndexReade
   }
 
   // Used by near real-time search
-  DirectoryReader(IndexWriter writer, SegmentInfos infos, int termInfosIndexDivisor, CodecProvider codecs) throws IOException {
+  DirectoryReader(IndexWriter writer, SegmentInfos infos, int termInfosIndexDivisor, CodecProvider codecs, boolean applyAllDeletes) throws IOException {
     this.directory = writer.getDirectory();
     this.readOnly = true;
-    segmentInfos = (SegmentInfos) infos.clone();// make sure we clone otherwise we share mutable state with IW
+    this.applyAllDeletes = applyAllDeletes;       // saved for reopen
+
     this.termInfosIndexDivisor = termInfosIndexDivisor;
     if (codecs == null) {
       this.codecs = CodecProvider.getDefault();
     } else {
       this.codecs = codecs;
     }
+    readerFinishedListeners = writer.getReaderFinishedListeners();
 
     // IndexWriter synchronizes externally before calling
     // us, which ensures infos will not change; so there's
     // no need to process segments in reverse order
     final int numSegments = infos.size();
-    SegmentReader[] readers = new SegmentReader[numSegments];
+
+    List<SegmentReader> readers = new ArrayList<SegmentReader>();
     final Directory dir = writer.getDirectory();
 
+    segmentInfos = (SegmentInfos) infos.clone();
+    int infosUpto = 0;
     for (int i=0;i<numSegments;i++) {
       boolean success = false;
       try {
         final SegmentInfo info = infos.info(i);
         assert info.dir == dir;
-        readers[i] = writer.readerPool.getReadOnlyClone(info, true, termInfosIndexDivisor);
+        final SegmentReader reader = writer.readerPool.getReadOnlyClone(info, true, termInfosIndexDivisor);
+        if (reader.numDocs() > 0 || writer.getKeepFullyDeletedSegments()) {
+          reader.readerFinishedListeners = readerFinishedListeners;
+          readers.add(reader);
+          infosUpto++;
+        } else {
+          reader.close();
+          segmentInfos.remove(infosUpto);
+        }
         success = true;
       } finally {
         if (!success) {
           // Close all readers we had opened:
-          for(i--;i>=0;i--) {
+          for(SegmentReader reader : readers) {
             try {
-              readers[i].close();
+              reader.close();
             } catch (Throwable ignore) {
               // keep going - we want to clean up as much as possible
             }
@@ -177,16 +195,20 @@ class DirectoryReader extends IndexReade
 
     this.writer = writer;
 
-    initialize(readers);
+    initialize(readers.toArray(new SegmentReader[readers.size()]));
   }
 
   /** This constructor is only used for {@link #reopen()} */
   DirectoryReader(Directory directory, SegmentInfos infos, SegmentReader[] oldReaders, int[] oldStarts,
-                  boolean readOnly, boolean doClone, int termInfosIndexDivisor, CodecProvider codecs) throws IOException {
+                  boolean readOnly, boolean doClone, int termInfosIndexDivisor, CodecProvider codecs,
+                  Collection<ReaderFinishedListener> readerFinishedListeners) throws IOException {
     this.directory = directory;
     this.readOnly = readOnly;
     this.segmentInfos = infos;
     this.termInfosIndexDivisor = termInfosIndexDivisor;
+    this.readerFinishedListeners = readerFinishedListeners;
+    applyAllDeletes = false;
+
     if (codecs == null) {
       this.codecs = CodecProvider.getDefault();
     } else {
@@ -232,8 +254,10 @@ class DirectoryReader extends IndexReade
 
           // this is a new reader; in case we hit an exception we can close it safely
           newReader = SegmentReader.get(readOnly, infos.info(i), termInfosIndexDivisor);
+          newReader.readerFinishedListeners = readerFinishedListeners;
         } else {
           newReader = newReaders[i].reopenSegment(infos.info(i), doClone, readOnly);
+          assert newReader.readerFinishedListeners == readerFinishedListeners;
         }
         if (newReader == newReaders[i]) {
           // this reader will be shared between the old and the new one,
@@ -357,6 +381,7 @@ class DirectoryReader extends IndexReade
       writeLock = null;
       hasChanges = false;
     }
+    assert newReader.readerFinishedListeners != null;
 
     return newReader;
   }
@@ -391,7 +416,9 @@ class DirectoryReader extends IndexReade
     // TODO: right now we *always* make a new reader; in
     // the future we could have write make some effort to
     // detect that no changes have occurred
-    return writer.getReader();
+    IndexReader reader = writer.getReader(applyAllDeletes);
+    reader.readerFinishedListeners = readerFinishedListeners;
+    return reader;
   }
 
   private IndexReader doReopen(final boolean openReadOnly, IndexCommit commit) throws CorruptIndexException, IOException {
@@ -458,7 +485,7 @@ class DirectoryReader extends IndexReade
 
   private synchronized DirectoryReader doReopen(SegmentInfos infos, boolean doClone, boolean openReadOnly) throws CorruptIndexException, IOException {
     DirectoryReader reader;
-    reader = new DirectoryReader(directory, infos, subReaders, starts, openReadOnly, doClone, termInfosIndexDivisor, codecs);
+    reader = new DirectoryReader(directory, infos, subReaders, starts, openReadOnly, doClone, termInfosIndexDivisor, codecs, readerFinishedListeners);
     return reader;
   }
 
@@ -705,11 +732,18 @@ class DirectoryReader extends IndexReade
       // case we have to roll back:
       startCommit();
 
+      final SegmentInfos rollbackSegmentInfos = new SegmentInfos();
+      rollbackSegmentInfos.addAll(segmentInfos);
+
       boolean success = false;
       try {
         for (int i = 0; i < subReaders.length; i++)
           subReaders[i].commit();
 
+        // Remove segments that contain only 100% deleted
+        // docs:
+        segmentInfos.pruneDeletedSegments();
+
         // Sync all files we just wrote
         directory.sync(segmentInfos.files(directory, false));
         segmentInfos.commit(directory);
@@ -729,6 +763,10 @@ class DirectoryReader extends IndexReade
           // partially written .del files, etc, are
           // removed):
           deleter.refresh();
+
+          // Restore all SegmentInfos (in case we pruned some)
+          segmentInfos.clear();
+          segmentInfos.addAll(rollbackSegmentInfos);
         }
       }
 
@@ -805,11 +843,6 @@ class DirectoryReader extends IndexReade
       }
     }
 
-    // NOTE: only needed in case someone had asked for
-    // FieldCache for top-level reader (which is generally
-    // not a good idea):
-    FieldCache.DEFAULT.purge(this);
-
     if (writer != null) {
       // Since we just closed, writer may now be able to
       // delete unused files:

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java?rev=1073192&r1=1073191&r2=1073192&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java Tue Feb 22 01:00:39 2011
@@ -20,9 +20,10 @@ package org.apache.lucene.index;
 import java.io.IOException;
 
 abstract class DocConsumer {
-  abstract void processDocument() throws IOException;
+  abstract void processDocument(FieldInfos fieldInfos) throws IOException;
   abstract void finishDocument() throws IOException;
   abstract void flush(final SegmentWriteState state) throws IOException;
   abstract void abort();
   abstract boolean freeRAM();
+  abstract void doAfterFlush();
 }

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java?rev=1073192&r1=1073191&r2=1073192&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java Tue Feb 22 01:00:39 2011
@@ -21,9 +21,6 @@ import java.io.IOException;
 import java.util.Map;
 
 abstract class DocFieldConsumer {
-
-  FieldInfos fieldInfos;
-
   /** Called when DocumentsWriter decides to create a new
    *  segment */
   abstract void flush(Map<FieldInfo, DocFieldConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException;
@@ -42,7 +39,4 @@ abstract class DocFieldConsumer {
 
   abstract void finishDocument() throws IOException;
 
-  void setFieldInfos(FieldInfos fieldInfos) {
-    this.fieldInfos = fieldInfos;
-  }
 }

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java?rev=1073192&r1=1073191&r2=1073192&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java Tue Feb 22 01:00:39 2011
@@ -36,13 +36,6 @@ final class DocFieldConsumers extends Do
   }
 
   @Override
-  void setFieldInfos(FieldInfos fieldInfos) {
-    super.setFieldInfos(fieldInfos);
-    one.setFieldInfos(fieldInfos);
-    two.setFieldInfos(fieldInfos);
-  }
-
-  @Override
   public void flush(Map<FieldInfo, DocFieldConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException {
 
     Map<FieldInfo, DocFieldConsumerPerField> oneFieldsToFlush = new HashMap<FieldInfo, DocFieldConsumerPerField>();
@@ -79,16 +72,16 @@ final class DocFieldConsumers extends Do
     try {
       one.finishDocument();
     } finally {
-      two.finishDocument();  
+      two.finishDocument();
     }
   }
-  
+
   @Override
   public void startDocument() throws IOException {
     one.startDocument();
     two.startDocument();
   }
-  
+
   @Override
   public DocFieldConsumerPerField addField(FieldInfo fi) {
     return new DocFieldConsumersPerField(this, fi, one.addField(fi), two.addField(fi));

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java?rev=1073192&r1=1073191&r2=1073192&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java Tue Feb 22 01:00:39 2011
@@ -38,7 +38,6 @@ import org.apache.lucene.document.Fielda
 
 final class DocFieldProcessor extends DocConsumer {
 
-  final FieldInfos fieldInfos;
   final DocFieldConsumer consumer;
   final StoredFieldsWriter fieldsWriter;
 
@@ -60,9 +59,7 @@ final class DocFieldProcessor extends Do
   public DocFieldProcessor(DocumentsWriterPerThread docWriter, DocFieldConsumer consumer) {
     this.docState = docWriter.docState;
     this.consumer = consumer;
-    fieldInfos = docWriter.getFieldInfos();
-    consumer.setFieldInfos(fieldInfos);
-    fieldsWriter = new StoredFieldsWriter(docWriter, fieldInfos);
+    fieldsWriter = new StoredFieldsWriter(docWriter);
   }
 
   @Override
@@ -73,7 +70,6 @@ final class DocFieldProcessor extends Do
     for (DocFieldConsumerPerField f : fields) {
       childFields.put(f.getFieldInfo(), f);
     }
-    trimFields(state);
 
     fieldsWriter.flush(state);
     consumer.flush(childFields, state);
@@ -83,7 +79,7 @@ final class DocFieldProcessor extends Do
     // FreqProxTermsWriter does this with
     // FieldInfo.storePayload.
     final String fileName = IndexFileNames.segmentFileName(state.segmentName, "", IndexFileNames.FIELD_INFOS_EXTENSION);
-    fieldInfos.write(state.directory, fileName);
+    state.fieldInfos.write(state.directory, fileName);
   }
 
   @Override
@@ -122,43 +118,13 @@ final class DocFieldProcessor extends Do
     return fields;
   }
 
-  /** If there are fields we've seen but did not see again
-   *  in the last run, then free them up. */
-
-  void trimFields(SegmentWriteState state) {
-
-    for(int i=0;i<fieldHash.length;i++) {
-      DocFieldProcessorPerField perField = fieldHash[i];
-      DocFieldProcessorPerField lastPerField = null;
-
-      while (perField != null) {
-
-        if (perField.lastGen == -1) {
-
-          // This field was not seen since the previous
-          // flush, so, free up its resources now
-
-          // Unhash
-          if (lastPerField == null)
-            fieldHash[i] = perField.next;
-          else
-            lastPerField.next = perField.next;
-
-          if (state.infoStream != null) {
-            state.infoStream.println("  purge field=" + perField.fieldInfo.name);
-          }
-
-          totalFieldCount--;
-
-        } else {
-          // Reset
-          perField.lastGen = -1;
-          lastPerField = perField;
-        }
-
-        perField = perField.next;
-      }
-    }
+  /** After a flush, swap in an empty two-slot fieldHash (mask 1, count 0)
+   *  so that no per-field state is maintained across segments */
+  @Override
+  void doAfterFlush() {
+    totalFieldCount = 0;
+    hashMask = 1;
+    fieldHash = new DocFieldProcessorPerField[2];
   }
 
   private void rehash() {
@@ -185,7 +151,7 @@ final class DocFieldProcessor extends Do
   }
 
   @Override
-  public void processDocument() throws IOException {
+  public void processDocument(FieldInfos fieldInfos) throws IOException {
 
     consumer.startDocument();
     fieldsWriter.startDocument();

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java?rev=1073192&r1=1073191&r2=1073192&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java Tue Feb 22 01:00:39 2011
@@ -66,13 +66,6 @@ final class DocInverter extends DocField
   }
 
   @Override
-  void setFieldInfos(FieldInfos fieldInfos) {
-    super.setFieldInfos(fieldInfos);
-    consumer.setFieldInfos(fieldInfos);
-    endConsumer.setFieldInfos(fieldInfos);
-  }
-
-  @Override
   void flush(Map<FieldInfo, DocFieldConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException {
 
     Map<FieldInfo, InvertedDocConsumerPerField> childFieldsToFlush = new HashMap<FieldInfo, InvertedDocConsumerPerField>();

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1073192&r1=1073191&r2=1073192&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Tue Feb 22 01:00:39 2011
@@ -33,6 +33,7 @@ import org.apache.lucene.search.Query;
 import org.apache.lucene.search.SimilarityProvider;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BitVector;
 
 /**
  * This class accepts multiple added documents and directly
@@ -126,24 +127,21 @@ final class DocumentsWriter {
   // non-zero we will flush by RAM usage instead.
   private int maxBufferedDocs = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DOCS;
 
-  private final FieldInfos fieldInfos;
-
-  final BufferedDeletes bufferedDeletes;
-  SegmentDeletes pendingDeletes;
+  final BufferedDeletesStream bufferedDeletesStream;
+  // TODO: cutover to BytesRefHash
+  private BufferedDeletes pendingDeletes = new BufferedDeletes(false);
   final IndexingChain chain;
 
   final DocumentsWriterPerThreadPool perThreadPool;
 
-  DocumentsWriter(Directory directory, IndexWriter writer, IndexingChain chain, DocumentsWriterPerThreadPool indexerThreadPool, FieldInfos fieldInfos, BufferedDeletes bufferedDeletes) throws IOException {
+  DocumentsWriter(Directory directory, IndexWriter writer, IndexingChain chain, DocumentsWriterPerThreadPool indexerThreadPool, FieldInfos fieldInfos, BufferedDeletesStream bufferedDeletesStream) throws IOException {
     this.directory = directory;
     this.indexWriter = writer;
     this.similarityProvider = writer.getConfig().getSimilarityProvider();
-    this.fieldInfos = fieldInfos;
-    this.bufferedDeletes = bufferedDeletes;
+    this.bufferedDeletesStream = bufferedDeletesStream;
     this.perThreadPool = indexerThreadPool;
-    this.pendingDeletes = new SegmentDeletes();
     this.chain = chain;
-    this.perThreadPool.initialize(this);
+    this.perThreadPool.initialize(this, fieldInfos);
   }
 
   boolean deleteQueries(final Query... queries) throws IOException {
@@ -164,7 +162,7 @@ final class DocumentsWriter {
     if (!deleted) {
       synchronized(this) {
         for (Query query : queries) {
-          pendingDeletes.addQuery(query, SegmentDeletes.MAX_INT);
+          pendingDeletes.addQuery(query, BufferedDeletes.MAX_INT);
         }
       }
     }
@@ -194,7 +192,7 @@ final class DocumentsWriter {
     if (!deleted) {
       synchronized(this) {
         for (Term term : terms) {
-          pendingDeletes.addTerm(term, SegmentDeletes.MAX_INT);
+          pendingDeletes.addTerm(term, BufferedDeletes.MAX_INT);
         }
       }
     }
@@ -202,6 +200,9 @@ final class DocumentsWriter {
     return false;
   }
 
+  // TODO: we could check w/ FreqProxTermsWriter: if the
+  // term doesn't exist, don't bother buffering into the
+  // per-DWPT map (but still must go into the global map)
   boolean deleteTerm(final Term term) throws IOException {
     return deleteTerms(term);
   }
@@ -226,16 +227,6 @@ final class DocumentsWriter {
     return deleted;
   }
 
-  public FieldInfos getFieldInfos() {
-    return fieldInfos;
-  }
-
-  /** Returns true if any of the fields in the current
-   *  buffered docs have omitTermFreqAndPositions==false */
-  boolean hasProx() {
-    return fieldInfos.hasProx();
-  }
-
   /** If non-null, various details of indexing are printed
    *  here. */
   synchronized void setInfoStream(PrintStream infoStream) {
@@ -396,7 +387,8 @@ final class DocumentsWriter {
     ensureOpen();
 
     SegmentInfo newSegment = null;
-    SegmentDeletes segmentDeletes = null;
+    BufferedDeletes segmentDeletes = null;
+    BitVector deletedDocs = null;
 
     ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this, doc);
     try {
@@ -407,10 +399,10 @@ final class DocumentsWriter {
 
       newSegment = finishAddDocument(dwpt, perThreadRAMUsedBeforeAdd);
       if (newSegment != null) {
-        fieldInfos.update(dwpt.getFieldInfos());
+        deletedDocs = dwpt.flushState.deletedDocs;
         if (dwpt.pendingDeletes.any()) {
           segmentDeletes = dwpt.pendingDeletes;
-          dwpt.pendingDeletes = new SegmentDeletes();
+          dwpt.pendingDeletes = new BufferedDeletes(false);
         }
       }
     } finally {
@@ -423,7 +415,7 @@ final class DocumentsWriter {
 
     if (newSegment != null) {
       perThreadPool.clearThreadBindings(perThread);
-      indexWriter.addFlushedSegment(newSegment);
+      indexWriter.addFlushedSegment(newSegment, deletedDocs);
       return true;
     }
 
@@ -460,17 +452,23 @@ final class DocumentsWriter {
     }
   }
 
-  private final void pushDeletes(SegmentInfo segmentInfo, SegmentDeletes segmentDeletes) {
+  private final void pushDeletes(SegmentInfo segmentInfo, BufferedDeletes segmentDeletes) {
     synchronized(indexWriter) {
       // Lock order: DW -> BD
+      final long delGen = bufferedDeletesStream.getNextGen();
       if (segmentDeletes.any()) {
-        if (segmentInfo != null) {
-          bufferedDeletes.pushDeletes(segmentDeletes, segmentInfo);
-        } else if (indexWriter.segmentInfos.size() > 0) {
+        if (indexWriter.segmentInfos.size() > 0 || segmentInfo != null) {
+          final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(segmentDeletes, delGen);
+          if (infoStream != null) {
+            message("flush: push buffered deletes");
+          }
+          bufferedDeletesStream.push(packet);
           if (infoStream != null) {
-            message("flush: push buffered deletes to previously flushed segment " + indexWriter.segmentInfos.lastElement());
+            message("flush: delGen=" + packet.gen);
+          }
+          if (segmentInfo != null) {
+            segmentInfo.setBufferedDeletesGen(packet.gen);
           }
-          bufferedDeletes.pushDeletes(segmentDeletes, indexWriter.segmentInfos.lastElement(), true);
         } else {
           if (infoStream != null) {
             message("flush: drop buffered deletes: no segments");
@@ -479,6 +477,8 @@ final class DocumentsWriter {
           // there are no segments, the deletions cannot
           // affect anything.
         }
+      } else if (segmentInfo != null) {
+        segmentInfo.setBufferedDeletesGen(delGen);
       }
     }
   }
@@ -489,7 +489,7 @@ final class DocumentsWriter {
     if (flushDeletes) {
       synchronized (this) {
         pushDeletes(null, pendingDeletes);
-        pendingDeletes = new SegmentDeletes();
+        pendingDeletes = new BufferedDeletes(false);
       }
     }
 
@@ -498,7 +498,8 @@ final class DocumentsWriter {
 
     while (threadsIterator.hasNext()) {
       SegmentInfo newSegment = null;
-      SegmentDeletes segmentDeletes = null;
+      BufferedDeletes segmentDeletes = null;
+      BitVector deletedDocs = null;
 
       ThreadState perThread = threadsIterator.next();
       perThread.lock();
@@ -519,17 +520,17 @@ final class DocumentsWriter {
           newSegment = dwpt.flush();
 
           if (newSegment != null) {
-            fieldInfos.update(dwpt.getFieldInfos());
             anythingFlushed = true;
+            deletedDocs = dwpt.flushState.deletedDocs;
             perThreadPool.clearThreadBindings(perThread);
             if (dwpt.pendingDeletes.any()) {
               segmentDeletes = dwpt.pendingDeletes;
-              dwpt.pendingDeletes = new SegmentDeletes();
+              dwpt.pendingDeletes = new BufferedDeletes(false);
             }
           }
         } else if (flushDeletes && dwpt.pendingDeletes.any()) {
           segmentDeletes = dwpt.pendingDeletes;
-          dwpt.pendingDeletes = new SegmentDeletes();
+          dwpt.pendingDeletes = new BufferedDeletes(false);
         }
       } finally {
         perThread.unlock();
@@ -543,7 +544,7 @@ final class DocumentsWriter {
       if (newSegment != null) {
         // important do unlock the perThread before finishFlushedSegment
         // is called to prevent deadlock on IndexWriter mutex
-        indexWriter.addFlushedSegment(newSegment);
+        indexWriter.addFlushedSegment(newSegment, deletedDocs);
       }
     }
 

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=1073192&r1=1073191&r2=1073192&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Tue Feb 22 01:00:39 2011
@@ -30,6 +30,7 @@ import org.apache.lucene.document.Docume
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.SimilarityProvider;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BitVector;
 import org.apache.lucene.util.ByteBlockPool.DirectAllocator;
 import org.apache.lucene.util.RamUsageEstimator;
 
@@ -81,7 +82,7 @@ public class DocumentsWriterPerThread {
   };
 
   // Deletes for our still-in-RAM (to be flushed next) segment
-  SegmentDeletes pendingDeletes = new SegmentDeletes();
+  BufferedDeletes pendingDeletes = new BufferedDeletes(false);
 
   static class DocState {
     final DocumentsWriterPerThread docWriter;
@@ -147,13 +148,13 @@ public class DocumentsWriterPerThread {
 
   final AtomicLong bytesUsed = new AtomicLong(0);
 
-  private final FieldInfos fieldInfos;
+  private FieldInfos fieldInfos;
 
-  public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent, IndexingChain indexingChain) {
+  public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent, FieldInfos fieldInfos, IndexingChain indexingChain) {
     this.directory = directory;
     this.parent = parent;
+    this.fieldInfos = fieldInfos.newFieldInfosWithGlobalFieldNumberMap();
     this.writer = parent.indexWriter;
-    this.fieldInfos = new FieldInfos();
     this.infoStream = parent.indexWriter.getInfoStream();
     this.docState = new DocState(this);
     this.docState.similarityProvider = parent.indexWriter.getConfig().getSimilarityProvider();
@@ -185,7 +186,7 @@ public class DocumentsWriterPerThread {
 
     boolean success = false;
     try {
-      consumer.processDocument();
+      consumer.processDocument(fieldInfos);
 
       success = true;
     } finally {
@@ -244,13 +245,6 @@ public class DocumentsWriterPerThread {
     return numDocsInRAM;
   }
 
-  /** Returns true if any of the fields in the current
-  *  buffered docs have omitTermFreqAndPositions==false */
-  boolean hasProx() {
-    return (docFieldProcessor != null) ? docFieldProcessor.fieldInfos.hasProx()
-                                      : true;
-  }
-
   SegmentCodecs getCodec() {
     return flushState.segmentCodecs;
   }
@@ -258,6 +252,8 @@ public class DocumentsWriterPerThread {
   /** Reset after a flush */
   private void doAfterFlush() throws IOException {
     segment = null;
+    consumer.doAfterFlush();
+    fieldInfos = fieldInfos.newFieldInfosWithGlobalFieldNumberMap();
     parent.substractFlushedNumDocs(numDocsInRAM);
     numDocsInRAM = 0;
   }
@@ -268,7 +264,19 @@ public class DocumentsWriterPerThread {
 
     flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
         numDocsInRAM, writer.getConfig().getTermIndexInterval(),
-        SegmentCodecs.build(fieldInfos, writer.codecs));
+        SegmentCodecs.build(fieldInfos, writer.codecs), pendingDeletes);
+
+    // Apply delete-by-docID now (delete-by-docID only
+    // happens when an exception is hit processing that
+    // doc, e.g. if the analyzer has a problem w/ the text):
+    if (pendingDeletes.docIDs.size() > 0) {
+      flushState.deletedDocs = new BitVector(numDocsInRAM);
+      for(int delDocID : pendingDeletes.docIDs) {
+        flushState.deletedDocs.set(delDocID);
+      }
+      pendingDeletes.bytesUsed.addAndGet(-pendingDeletes.docIDs.size() * BufferedDeletes.BYTES_PER_DEL_DOCID);
+      pendingDeletes.docIDs.clear();
+    }
 
     if (infoStream != null) {
       message("flush postings as segment " + flushState.segmentName + " numDocs=" + numDocsInRAM);
@@ -285,11 +293,12 @@ public class DocumentsWriterPerThread {
 
     try {
 
-      SegmentInfo newSegment = new SegmentInfo(segment, flushState.numDocs, directory, false, fieldInfos.hasProx(), flushState.segmentCodecs, false);
+      SegmentInfo newSegment = new SegmentInfo(segment, flushState.numDocs, directory, false, flushState.segmentCodecs, fieldInfos);
       consumer.flush(flushState);
-      newSegment.setHasVectors(flushState.hasVectors);
+      newSegment.clearFilesCache();
 
       if (infoStream != null) {
+        message("new segment has " + flushState.deletedDocs.count() + " deleted docs");
         message("new segment has " + (flushState.hasVectors ? "vectors" : "no vectors"));
         message("flushedFiles=" + newSegment.files());
         message("flushed codecs=" + newSegment.getSegmentCodecs());

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java?rev=1073192&r1=1073191&r2=1073192&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java Tue Feb 22 01:00:39 2011
@@ -24,9 +24,9 @@ public abstract class DocumentsWriterPer
     numThreadStatesActive = 0;
   }
 
-  public void initialize(DocumentsWriter documentsWriter) {
+  public void initialize(DocumentsWriter documentsWriter, FieldInfos fieldInfos) {
     for (int i = 0; i < perThreads.length; i++) {
-      perThreads[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, documentsWriter.chain));
+      perThreads[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, fieldInfos, documentsWriter.chain));
     }
   }
 

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FieldInfo.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FieldInfo.java?rev=1073192&r1=1073191&r2=1073192&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FieldInfo.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FieldInfo.java Tue Feb 22 01:00:39 2011
@@ -32,7 +32,7 @@ public final class FieldInfo {
   public boolean omitTermFreqAndPositions;
 
   public boolean storePayloads; // whether this field stores payloads together with term positions
-  int codecId = 0; // set inside SegmentCodecs#build() during segment flush - this is used to identify the codec used to write this field 
+  private int codecId = -1; // set inside SegmentCodecs#build() during segment flush - this is used to identify the codec used to write this field
 
   FieldInfo(String na, boolean tk, int nu, boolean storeTermVector, 
             boolean storePositionWithTermVector,  boolean storeOffsetWithTermVector, 
@@ -57,10 +57,21 @@ public final class FieldInfo {
     }
   }
 
+  public void setCodecId(int codecId) {
+    assert this.codecId == -1 : "CodecId can only be set once.";
+    this.codecId = codecId;
+  }
+
+  public int getCodecId() {
+    return codecId;
+  }
+
   @Override
   public Object clone() {
-    return new FieldInfo(name, isIndexed, number, storeTermVector, storePositionWithTermVector,
+    FieldInfo clone = new FieldInfo(name, isIndexed, number, storeTermVector, storePositionWithTermVector,
                          storeOffsetWithTermVector, omitNorms, storePayloads, omitTermFreqAndPositions);
+    clone.codecId = this.codecId;
+    return clone;
   }
 
   void update(boolean isIndexed, boolean storeTermVector, boolean storePositionWithTermVector,