You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2012/03/12 13:23:15 UTC

svn commit: r1299648 - in /lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index: BufferedDeletesStream.java IndexFileDeleter.java IndexWriter.java ReadersAndLiveDocs.java StandardDirectoryReader.java

Author: mikemccand
Date: Mon Mar 12 12:23:14 2012
New Revision: 1299648

URL: http://svn.apache.org/viewvc?rev=1299648&view=rev
Log:
LUCENE-3855: fix thread safety issues in IW's livedocs/reader pool

Added:
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java   (with props)
Modified:
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/BufferedDeletesStream.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexFileDeleter.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/BufferedDeletesStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/BufferedDeletesStream.java?rev=1299648&r1=1299647&r2=1299648&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/BufferedDeletesStream.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/BufferedDeletesStream.java Mon Mar 12 12:23:14 2012
@@ -210,7 +210,7 @@ class BufferedDeletesStream {
 
         // Lock order: IW -> BD -> RP
         assert readerPool.infoIsLive(info);
-        final IndexWriter.ReadersAndLiveDocs rld = readerPool.get(info, true);
+        final ReadersAndLiveDocs rld = readerPool.get(info, true);
         final SegmentReader reader = rld.getReader(IOContext.READ);
         int delCount = 0;
         final boolean segAllDeletes;
@@ -224,11 +224,12 @@ class BufferedDeletesStream {
           // Don't delete by Term here; DocumentsWriterPerThread
           // already did that on flush:
           delCount += applyQueryDeletes(packet.queriesIterable(), rld, reader);
-          final int fullDelCount = rld.info.getDelCount() + rld.pendingDeleteCount;
+          final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount();
           assert fullDelCount <= rld.info.docCount;
           segAllDeletes = fullDelCount == rld.info.docCount;
         } finally {
-          readerPool.release(reader, false);
+          rld.release(reader);
+          readerPool.release(rld);
         }
         anyNewDeletes |= delCount > 0;
 
@@ -262,18 +263,19 @@ class BufferedDeletesStream {
         if (coalescedDeletes != null) {
           // Lock order: IW -> BD -> RP
           assert readerPool.infoIsLive(info);
-          final IndexWriter.ReadersAndLiveDocs rld = readerPool.get(info, true);
+          final ReadersAndLiveDocs rld = readerPool.get(info, true);
           final SegmentReader reader = rld.getReader(IOContext.READ);
           int delCount = 0;
           final boolean segAllDeletes;
           try {
             delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader);
             delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader);
-            final int fullDelCount = rld.info.getDelCount() + rld.pendingDeleteCount;
+            final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount();
             assert fullDelCount <= rld.info.docCount;
             segAllDeletes = fullDelCount == rld.info.docCount;
-          } finally {
-            readerPool.release(reader, false);
+          } finally {   
+            rld.release(reader);
+            readerPool.release(rld);
           }
           anyNewDeletes |= delCount > 0;
 
@@ -353,7 +355,7 @@ class BufferedDeletesStream {
   }
 
   // Delete by Term
-  private synchronized long applyTermDeletes(Iterable<Term> termsIter, IndexWriter.ReadersAndLiveDocs rld, SegmentReader reader) throws IOException {
+  private synchronized long applyTermDeletes(Iterable<Term> termsIter, ReadersAndLiveDocs rld, SegmentReader reader) throws IOException {
     long delCount = 0;
     Fields fields = reader.fields();
     if (fields == null) {
@@ -394,7 +396,7 @@ class BufferedDeletesStream {
       // System.out.println("  term=" + term);
 
       if (termsEnum.seekExact(term.bytes(), false)) {
-        DocsEnum docsEnum = termsEnum.docs(rld.liveDocs, docs, false);
+        DocsEnum docsEnum = termsEnum.docs(rld.getLiveDocs(), docs, false);
         //System.out.println("BDS: got docsEnum=" + docsEnum);
 
         if (docsEnum != null) {
@@ -434,7 +436,7 @@ class BufferedDeletesStream {
   }
 
   // Delete by query
-  private static long applyQueryDeletes(Iterable<QueryAndLimit> queriesIter, IndexWriter.ReadersAndLiveDocs rld, final SegmentReader reader) throws IOException {
+  private static long applyQueryDeletes(Iterable<QueryAndLimit> queriesIter, ReadersAndLiveDocs rld, final SegmentReader reader) throws IOException {
     long delCount = 0;
     final AtomicReaderContext readerContext = reader.getTopReaderContext();
     boolean any = false;

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexFileDeleter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexFileDeleter.java?rev=1299648&r1=1299647&r2=1299648&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexFileDeleter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexFileDeleter.java Mon Mar 12 12:23:14 2012
@@ -453,7 +453,7 @@ final class IndexFileDeleter {
     assert Thread.holdsLock(writer);
 
     if (infoStream.isEnabled("IFD")) {
-      infoStream.message("IFD", "now checkpoint \"" + writer.segString(segmentInfos) + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
+      infoStream.message("IFD", "now checkpoint \"" + writer.segString(writer.toLiveInfos(segmentInfos)) + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
     }
 
     // Try again now to delete any previously un-deletable

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java?rev=1299648&r1=1299647&r2=1299648&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java Mon Mar 12 12:23:14 2012
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.codecs.Codec;
-import org.apache.lucene.codecs.LiveDocsFormat;
 import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
 import org.apache.lucene.index.FieldInfos.FieldNumberBiMap;
 import org.apache.lucene.index.IndexWriterConfig.OpenMode;
@@ -392,260 +391,6 @@ public class IndexWriter implements Clos
     return r;
   }
 
-  // This class inherits all sync from IW:
-  class ReadersAndLiveDocs {
-    // Not final because we replace (clone) when we need to
-    // change it and it's been shared:
-    public final SegmentInfo info;
-
-    // Set once (null, and then maybe set, and never set again):
-    private SegmentReader reader;
-
-    // TODO: it's sometimes wasteful that we hold open two
-    // separate SRs (one for merging one for
-    // reading)... maybe just use a single SR?  The gains of
-    // not loading the terms index (for merging in the
-    // non-NRT case) are far less now... and if the app has
-    // any deletes it'll open real readers anyway.
-
-    // Set once (null, and then maybe set, and never set again):
-    private SegmentReader mergeReader;
-
-    // Holds the current shared (readable and writable
-    // liveDocs).  This is null when there are no deleted
-    // docs, and it's copy-on-write (cloned whenever we need
-    // to change it but it's been shared to an external NRT
-    // reader).
-    public Bits liveDocs;
-
-    // How many further deletions we've done against
-    // liveDocs vs when we loaded it or last wrote it:
-    public int pendingDeleteCount;
-
-    // True if the current liveDocs is referenced by an
-    // external NRT reader:
-    public boolean shared;
-
-    public ReadersAndLiveDocs(SegmentInfo info) {
-      this.info = info;
-      shared = true;
-    }
-
-    // Returns false if we are the only remaining refs of
-    // this reader:
-    public synchronized boolean anyOutsideRefs(SegmentReader sr) {
-      int myRefCounts = 0;
-      if (sr == reader) {
-        myRefCounts++;
-      }
-      if (sr == mergeReader) {
-        myRefCounts++;
-      }
-      final int rc = sr.getRefCount();
-      assert rc >= myRefCounts;
-      return rc > myRefCounts;
-    }
-
-    // Call only from assert!
-    public synchronized boolean verifyDocCounts() {
-      int count;
-      if (liveDocs != null) {
-        count = 0;
-        for(int docID=0;docID<info.docCount;docID++) {
-          if (liveDocs.get(docID)) {
-            count++;
-          }
-        }
-      } else {
-        count = info.docCount;
-      }
-
-      assert info.docCount - info.getDelCount() - pendingDeleteCount == count: "info.docCount=" + info.docCount + " info.getDelCount()=" + info.getDelCount() + " pendingDeleteCount=" + pendingDeleteCount + " count=" + count;;
-      return true;
-    }
-
-    // Returns true if any reader remains
-    public synchronized boolean removeReader(SegmentReader sr, boolean drop) throws IOException {
-      if (sr == reader) {
-        //System.out.println(" non-merge reader");
-        reader.decRef();
-        reader = null;
-      }
-        
-      if (sr == mergeReader) {
-        //System.out.println(" merge reader");
-        mergeReader.decRef();
-        mergeReader = null;
-        if (drop && reader != null) {
-          //System.out.println(" also release normal reader rc=" + rld.reader.getRefCount());
-          reader.decRef();
-          reader = null;
-        }
-      }
-
-      return reader != null || mergeReader != null;
-    }
-
-    // Get reader for searching/deleting
-    public synchronized SegmentReader getReader(IOContext context) throws IOException {
-      //System.out.println("  livedocs=" + rld.liveDocs);
-
-      if (reader == null) {
-        reader = new SegmentReader(info, config.getReaderTermsIndexDivisor(), context);
-        if (liveDocs == null) {
-          liveDocs = reader.getLiveDocs();
-        }
-        //System.out.println("ADD seg=" + rld.info + " isMerge=" + isMerge + " " + readerMap.size() + " in pool");
-      }
-
-      // Ref for caller
-      reader.incRef();
-      return reader;
-    }
-
-    // Get reader for merging (does not load the terms
-    // index):
-    public synchronized SegmentReader getMergeReader(IOContext context) throws IOException {
-      //System.out.println("  livedocs=" + rld.liveDocs);
-
-      if (mergeReader == null) {
-
-        if (reader != null) {
-          // Just use the already opened non-merge reader
-          // for merging.  In the NRT case this saves us
-          // pointless double-open:
-          //System.out.println("PROMOTE non-merge reader seg=" + rld.info);
-          reader.incRef();
-          mergeReader = reader;
-        } else {
-          mergeReader = new SegmentReader(info, -1, context);
-          if (liveDocs == null) {
-            liveDocs = mergeReader.getLiveDocs();
-          }
-        }
-      }
-
-      // Ref for caller
-      mergeReader.incRef();
-      return mergeReader;
-    }
-
-    public synchronized boolean delete(int docID) {
-      assert liveDocs != null;
-      assert docID >= 0 && docID < liveDocs.length() : "out of bounds: docid=" + docID + ",liveDocsLength=" + liveDocs.length();
-      assert !shared;
-      final boolean didDelete = liveDocs.get(docID);
-      if (didDelete) {
-       ((MutableBits) liveDocs).clear(docID);
-        pendingDeleteCount++;
-        //System.out.println("  new del seg=" + info + " docID=" + docID + " pendingDelCount=" + pendingDeleteCount + " totDelCount=" + (info.docCount-liveDocs.count()));
-      }
-      return didDelete;
-    }
-
-    public synchronized void dropReaders() throws IOException {
-      if (reader != null) {
-        //System.out.println("  pool.drop info=" + info + " rc=" + reader.getRefCount());
-        reader.decRef();
-        reader = null;
-      }
-      if (mergeReader != null) {
-        //System.out.println("  pool.drop info=" + info + " merge rc=" + mergeReader.getRefCount());
-        mergeReader.decRef();
-        mergeReader = null;
-      }
-    }
-
-    /**
-     * Returns a ref to a clone.  NOTE: this clone is not
-     * enrolled in the pool, so you should simply close()
-     * it when you're done (ie, do not call release()).
-     */
-    public synchronized SegmentReader getReadOnlyClone(IOContext context) throws IOException {
-      if (reader == null) {
-        getReader(context).decRef();
-        assert reader != null;
-      }
-      shared = true;
-      if (liveDocs != null) {
-        return new SegmentReader(reader.getSegmentInfo(), reader.core, liveDocs, info.docCount - info.getDelCount() - pendingDeleteCount);
-      } else {
-        reader.incRef();
-        return reader;
-      }
-    }
-
-    public synchronized void initWritableLiveDocs() throws IOException {
-      assert Thread.holdsLock(IndexWriter.this);
-      assert info.docCount > 0;
-      //System.out.println("initWritableLivedocs seg=" + info + " liveDocs=" + liveDocs + " shared=" + shared);
-      if (shared) {
-        // Copy on write: this means we've cloned a
-        // SegmentReader sharing the current liveDocs
-        // instance; must now make a private clone so we can
-        // change it:
-        LiveDocsFormat liveDocsFormat = info.getCodec().liveDocsFormat();
-        if (liveDocs == null) {
-          //System.out.println("create BV seg=" + info);
-          liveDocs = liveDocsFormat.newLiveDocs(info.docCount);
-        } else {
-          liveDocs = liveDocsFormat.newLiveDocs(liveDocs);
-        }
-        shared = false;
-      } else {
-        assert liveDocs != null;
-      }
-    }
-
-    public synchronized Bits getReadOnlyLiveDocs() {
-      //System.out.println("getROLiveDocs seg=" + info);
-      assert Thread.holdsLock(IndexWriter.this);
-      shared = true;
-      //if (liveDocs != null) {
-      //System.out.println("  liveCount=" + liveDocs.count());
-      //}
-      return liveDocs;
-    }
-
-    // Commit live docs to the directory (writes new
-    // _X_N.del files); returns true if it wrote the file
-    // and false if there were no new deletes to write:
-    public synchronized boolean writeLiveDocs(Directory dir) throws IOException {
-      //System.out.println("rld.writeLiveDocs seg=" + info + " pendingDelCount=" + pendingDeleteCount);
-      if (pendingDeleteCount != 0) {
-        // We have new deletes
-        assert liveDocs.length() == info.docCount;
-
-        // Save in case we need to rollback on failure:
-        final SegmentInfo sav = (SegmentInfo) info.clone();
-        info.advanceDelGen();
-        info.setDelCount(info.getDelCount() + pendingDeleteCount);
-
-        // We can write directly to the actual name (vs to a
-        // .tmp & renaming it) because the file is not live
-        // until segments file is written:
-        boolean success = false;
-        try {
-          info.getCodec().liveDocsFormat().writeLiveDocs((MutableBits)liveDocs, dir, info, IOContext.DEFAULT);
-          success = true;
-        } finally {
-          if (!success) {
-            info.reset(sav);
-          }
-        }
-        pendingDeleteCount = 0;
-        return true;
-      } else {
-        return false;
-      }
-    }
-
-    @Override
-    public String toString() {
-      return "SegmentLiveDocs(seg=" + info + " pendingDeleteCount=" + pendingDeleteCount + " shared=" + shared + ")";
-    }
-  }
-
   /** Holds shared SegmentReader instances. IndexWriter uses
    *  SegmentReaders for 1) applying deletes, 2) doing
    *  merges, 3) handing out a real-time reader.  This pool
@@ -665,44 +410,36 @@ public class IndexWriter implements Clos
       return true;
     }
 
-    /**
-     * Release the segment reader (i.e. decRef it and close if there
-     * are no more references).  If drop is true then we
-     * remove this entry from the pool.
-     * @param sr
-     * @throws IOException
-     */
-    public synchronized void release(SegmentReader sr, boolean drop) throws IOException {
-      // Drop caller's ref; for an external reader (not
-      // pooled), this decRef will close it
-      //System.out.println("pool.release seg=" + sr.getSegmentInfo() + " rc=" + sr.getRefCount() + " drop=" + drop);
-      sr.decRef();
-
-      final ReadersAndLiveDocs rld = readerMap.get(sr.getSegmentInfo());
-
-      if (rld != null && (drop || (!poolReaders && !rld.anyOutsideRefs(sr)))) {
-
-        // Discard (don't save) changes when we are dropping
-        // the reader; this is used only on the sub-readers
-        // after a successful merge.  If deletes had
-        // accumulated on those sub-readers while the merge
-        // is running, by now we have carried forward those
-        // deletes onto the newly merged segment, so we can
-        // discard them on the sub-readers:
+    public synchronized void drop(SegmentInfo info) throws IOException {
+      final ReadersAndLiveDocs rld = readerMap.get(info);
+      if (rld != null) {
+        assert info == rld.info;
+        readerMap.remove(info);
+        rld.dropReaders();
+      }
+    }
 
-        if (!drop) {
-          if (rld.writeLiveDocs(directory)) {
-            assert infoIsLive(sr.getSegmentInfo());
-            // Must checkpoint w/ deleter, because we just
-            // created created new _X_N.del file.
-            deleter.checkpoint(segmentInfos, false);
-          }
-        }
+    public synchronized void release(ReadersAndLiveDocs rld) throws IOException {
 
-        if (!rld.removeReader(sr, drop)) {
-          //System.out.println("DROP seg=" + rld.info + " " + readerMap.size() + " in pool");
-          readerMap.remove(sr.getSegmentInfo());
+      // Matches incRef in get:
+      rld.decRef();
+
+      // Pool still holds a ref:
+      assert rld.refCount() >= 1;
+
+      if (!poolReaders && rld.refCount() == 1) {
+        // This is the last ref to this RLD, and we're not
+        // pooling, so remove it:
+        if (rld.writeLiveDocs(directory)) {
+          // Make sure we only write del docs for a live segment:
+          assert infoIsLive(rld.info);
+          // Must checkpoint w/ deleter, because we just
+          // created new _X_N.del file.
+          deleter.checkpoint(segmentInfos, false);
         }
+
+        rld.dropReaders();
+        readerMap.remove(rld.info);
       }
     }
 
@@ -712,8 +449,8 @@ public class IndexWriter implements Clos
       final Iterator<Map.Entry<SegmentInfo,ReadersAndLiveDocs>> it = readerMap.entrySet().iterator();
       while(it.hasNext()) {
         final ReadersAndLiveDocs rld = it.next().getValue();
-        //System.out.println("pool.dropAll: seg=" + rld.info);
         if (doSave && rld.writeLiveDocs(directory)) {
+          // Make sure we only write del docs for a live segment:
           assert infoIsLive(rld.info);
           // Must checkpoint w/ deleter, because we just
           // created created new _X_N.del file.
@@ -735,13 +472,6 @@ public class IndexWriter implements Clos
       assert readerMap.size() == 0;
     }
 
-    public synchronized void drop(SegmentInfo info) throws IOException {
-      final ReadersAndLiveDocs rld = readerMap.remove(info);
-      if (rld != null) {
-        rld.dropReaders();
-      }
-    }
-
     /**
      * Commit live docs changes for the segment readers for
      * the provided infos.
@@ -751,19 +481,23 @@ public class IndexWriter implements Clos
     public synchronized void commit(SegmentInfos infos) throws IOException {
       for (SegmentInfo info : infos) {
         final ReadersAndLiveDocs rld = readerMap.get(info);
-        if (rld != null && rld.writeLiveDocs(directory)) {
-          assert infoIsLive(info);
-          // Must checkpoint w/ deleter, because we just
-          // created created new _X_N.del file.
-          deleter.checkpoint(segmentInfos, false);
+        if (rld != null) {
+          assert rld.info == info;
+          if (rld.writeLiveDocs(directory)) {
+            // Make sure we only write del docs for a live segment:
+            assert infoIsLive(info);
+            // Must checkpoint w/ deleter, because we just
+            // created new _X_N.del file.
+            deleter.checkpoint(segmentInfos, false);
+          }
         }
       }
     }
 
     /**
      * Obtain a ReadersAndLiveDocs instance from the
-     * readerPool.  If getReader is true, you must later call
-     * {@link #release(SegmentReader)}.
+     * readerPool.  If create is true, you must later call
+     * {@link #release(ReadersAndLiveDocs)}.
      * @throws IOException
      */
     public synchronized ReadersAndLiveDocs get(SegmentInfo info, boolean create) {
@@ -771,15 +505,22 @@ public class IndexWriter implements Clos
       assert info.dir == directory;
 
       ReadersAndLiveDocs rld = readerMap.get(info);
-      //System.out.println("rld.get seg=" + info + " poolReaders=" + poolReaders);
       if (rld == null) {
-        //System.out.println("  new rld");
         if (!create) {
           return null;
         }
-        rld = new ReadersAndLiveDocs(info);
+        rld = new ReadersAndLiveDocs(IndexWriter.this, info);
+        // Steal initial reference:
         readerMap.put(info, rld);
+      } else {
+        assert rld.info == info: "rld.info=" + rld.info + " info=" + info + " isLive?=" + infoIsLive(rld.info) + " vs " + infoIsLive(info);
+      }
+
+      if (create) {
+        // Return ref to caller:
+        rld.incRef();
       }
+
       return rld;
     }
   }
@@ -795,7 +536,7 @@ public class IndexWriter implements Clos
 
     final ReadersAndLiveDocs rld = readerPool.get(info, false);
     if (rld != null) {
-      delCount += rld.pendingDeleteCount;
+      delCount += rld.getPendingDeleteCount();
     }
     return delCount;
   }
@@ -1116,7 +857,6 @@ public class IndexWriter implements Clos
         finishMerges(waitForMerges);
         stopMerges = true;
       }
-
       mergeScheduler.close();
 
       if (infoStream.isEnabled("IW")) {
@@ -1160,8 +900,6 @@ public class IndexWriter implements Clos
       }
     }
   }
-  
- 
 
   /** Returns the Directory used by this index. */
   public Directory getDirectory() {
@@ -2020,6 +1758,9 @@ public class IndexWriter implements Clos
           notifyAll();
         }
 
+        // Don't bother saving any changes in our segmentInfos
+        readerPool.dropAll(false);
+
         // Keep the same segmentInfos instance but replace all
         // of its SegmentInfo instances.  This is so the next
         // attempt to commit using this instance of IndexWriter
@@ -2038,9 +1779,6 @@ public class IndexWriter implements Clos
         // them:
         deleter.checkpoint(segmentInfos, false);
         deleter.refresh();
-
-        // Don't bother saving any changes in our segmentInfos
-        readerPool.dropAll(false);
       }
 
       lastCommitChangeCount = changeCount;
@@ -3023,16 +2761,18 @@ public class IndexWriter implements Clos
       final int docCount = info.docCount;
       final Bits prevLiveDocs = merge.readerLiveDocs.get(i);
       final Bits currentLiveDocs;
-      ReadersAndLiveDocs rld = readerPool.get(info, false);
-      // We enrolled in mergeInit:
-      assert rld != null;
-      currentLiveDocs = rld.liveDocs;
+      final ReadersAndLiveDocs rld = readerPool.get(info, false);
+      // We hold a ref so it should still be in the pool:
+      assert rld != null: "seg=" + info.name;
+      currentLiveDocs = rld.getLiveDocs();
 
       if (prevLiveDocs != null) {
 
         // If we had deletions on starting the merge we must
         // still have deletions now:
         assert currentLiveDocs != null;
+        assert prevLiveDocs.length() == docCount;
+        assert currentLiveDocs.length() == docCount;
 
         // There were deletes on this segment when the merge
         // started.  The merge has collapsed away those
@@ -3066,9 +2806,10 @@ public class IndexWriter implements Clos
             }
           }
         } else {
-          docUpto += info.docCount - info.getDelCount() - rld.pendingDeleteCount;
+          docUpto += info.docCount - info.getDelCount() - rld.getPendingDeleteCount();
         }
       } else if (currentLiveDocs != null) {
+        assert currentLiveDocs.length() == docCount;
         // This segment had no deletes before but now it
         // does:
         for(int j=0; j<docCount; j++) {
@@ -3087,11 +2828,13 @@ public class IndexWriter implements Clos
       }
     }
 
+    assert docUpto == merge.info.docCount;
+
     if (infoStream.isEnabled("IW")) {
       if (mergedDeletes == null) {
         infoStream.message("IW", "no new deletes since merge started");
       } else {
-        infoStream.message("IW", mergedDeletes.pendingDeleteCount + " new deletes since merge started");
+        infoStream.message("IW", mergedDeletes.getPendingDeleteCount() + " new deletes since merge started");
       }
     }
 
@@ -3136,7 +2879,7 @@ public class IndexWriter implements Clos
 
     final ReadersAndLiveDocs mergedDeletes =  merge.info.docCount == 0 ? null : commitMergedDeletes(merge);
 
-    assert mergedDeletes == null || mergedDeletes.pendingDeleteCount != 0;
+    assert mergedDeletes == null || mergedDeletes.getPendingDeleteCount() != 0;
 
     // If the doc store we are using has been closed and
     // is in now compound format (but wasn't when we
@@ -3148,7 +2891,7 @@ public class IndexWriter implements Clos
     final boolean allDeleted = merge.segments.size() == 0 ||
       merge.info.docCount == 0 ||
       (mergedDeletes != null &&
-       mergedDeletes.pendingDeleteCount == merge.info.docCount);
+       mergedDeletes.getPendingDeleteCount() == merge.info.docCount);
 
     if (infoStream.isEnabled("IW")) {
       if (allDeleted) {
@@ -3165,15 +2908,14 @@ public class IndexWriter implements Clos
     assert merge.info.docCount != 0 || keepFullyDeletedSegments || dropSegment;
 
     segmentInfos.applyMergeChanges(merge, dropSegment);
-    
-    if (dropSegment) {
-      readerPool.drop(merge.info);
-      deleter.deleteNewFiles(merge.info.files());
-      assert !segmentInfos.contains(merge.info);
-    } else {
-      if (mergedDeletes != null && !poolReaders) {
-        mergedDeletes.writeLiveDocs(directory);
-        readerPool.drop(merge.info);
+
+    if (mergedDeletes != null) {
+      if (dropSegment) {
+        mergedDeletes.dropChanges();
+      }
+      readerPool.release(mergedDeletes);
+      if (dropSegment) {
+        readerPool.drop(mergedDeletes.info);
       }
     }
 
@@ -3289,7 +3031,6 @@ public class IndexWriter implements Clos
         infoStream.message("IW", "merge time " + (System.currentTimeMillis()-t0) + " msec for " + merge.info.docCount + " docs");
       }
     }
-    //System.out.println(Thread.currentThread().getName() + ": merge end");
   }
 
   /** Hook that's called when the specified merge is complete. */
@@ -3524,9 +3265,20 @@ public class IndexWriter implements Clos
     boolean drop = !suppressExceptions;
     
     for (int i = 0; i < numSegments; i++) {
-      if (merge.readers.get(i) != null) {
+      final SegmentReader sr = merge.readers.get(i);
+      if (sr != null) {
         try {
-          readerPool.release(merge.readers.get(i), drop);
+          final ReadersAndLiveDocs rld = readerPool.get(sr.getSegmentInfo(), false);
+          // We still hold a ref so it should not have been removed:
+          assert rld != null;
+          if (drop) {
+            rld.dropChanges();
+          }
+          rld.release(sr);
+          readerPool.release(rld);
+          if (drop) {
+            readerPool.drop(rld.info);
+          }
         } catch (Throwable t) {
           if (th == null) {
             th = t;
@@ -3589,17 +3341,20 @@ public class IndexWriter implements Clos
 
         // Carefully pull the most recent live docs:
         final Bits liveDocs;
+        final int delCount;
+
         synchronized(this) {
           // Must sync to ensure BufferedDeletesStream
           // cannot change liveDocs/pendingDeleteCount while
           // we pull a copy:
           liveDocs = rld.getReadOnlyLiveDocs();
+          delCount = rld.getPendingDeleteCount() + info.getDelCount();
 
           assert rld.verifyDocCounts();
 
           if (infoStream.isEnabled("IW")) {
-            if (rld.pendingDeleteCount != 0) {
-              infoStream.message("IW", "seg=" + info + " delCount=" + info.getDelCount() + " pendingDelCount=" + rld.pendingDeleteCount);
+            if (rld.getPendingDeleteCount() != 0) {
+              infoStream.message("IW", "seg=" + info + " delCount=" + info.getDelCount() + " pendingDelCount=" + rld.getPendingDeleteCount());
             } else if (info.getDelCount() != 0) {
               infoStream.message("IW", "seg=" + info + " delCount=" + info.getDelCount());
             } else {
@@ -3609,8 +3364,7 @@ public class IndexWriter implements Clos
         }
         merge.readerLiveDocs.add(liveDocs);
         merge.readers.add(reader);
-        final int delCount = rld.pendingDeleteCount + info.getDelCount();
-        assert delCount <= info.docCount;
+        assert delCount <= info.docCount: "delCount=" + delCount + " info.docCount=" + info.docCount + " rld.pendingDeleteCount=" + rld.getPendingDeleteCount() + " info.getDelCount()=" + info.getDelCount();
         if (delCount < info.docCount) {
           merger.add(reader, liveDocs);
         }
@@ -3708,7 +3462,8 @@ public class IndexWriter implements Clos
           mergedSegmentWarmer.warm(sr);
         } finally {
           synchronized(this) {
-            readerPool.release(sr, false);
+            rld.release(sr);
+            readerPool.release(rld);
           }
         }
       }
@@ -3762,11 +3517,11 @@ public class IndexWriter implements Clos
   /** @lucene.internal */
   public synchronized String segString(Iterable<SegmentInfo> infos) throws IOException {
     final StringBuilder buffer = new StringBuilder();
-    for(final SegmentInfo s : infos) {
+    for(final SegmentInfo info : infos) {
       if (buffer.length() > 0) {
         buffer.append(' ');
       }
-      buffer.append(segString(s));
+      buffer.append(segString(info));
     }
     return buffer.toString();
   }
@@ -3819,6 +3574,24 @@ public class IndexWriter implements Clos
     return true;
   }
 
+  // For infoStream output
+  synchronized SegmentInfos toLiveInfos(SegmentInfos sis) {
+    final SegmentInfos newSIS = new SegmentInfos();
+    final Map<SegmentInfo,SegmentInfo> liveSIS = new HashMap<SegmentInfo,SegmentInfo>();        
+    for(SegmentInfo info : segmentInfos) {
+      liveSIS.put(info, info);
+    }
+    for(SegmentInfo info : sis) {
+      SegmentInfo liveInfo = liveSIS.get(info);
+      if (liveInfo != null) {
+        info = liveInfo;
+      }
+      newSIS.add(info);
+    }
+
+    return newSIS;
+  }
+
   /** Walk through all files referenced by the current
    *  segmentInfos and ask the Directory to sync each file,
    *  if it wasn't already.  If that succeeds, then we
@@ -3853,7 +3626,7 @@ public class IndexWriter implements Clos
         }
 
         if (infoStream.isEnabled("IW")) {
-          infoStream.message("IW", "startCommit index=" + segString(toSync) + " changeCount=" + changeCount);
+          infoStream.message("IW", "startCommit index=" + segString(toLiveInfos(toSync)) + " changeCount=" + changeCount);
         }
 
         assert filesExist(toSync);

Added: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java?rev=1299648&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java (added)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java Mon Mar 12 12:23:14 2012
@@ -0,0 +1,303 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.codecs.LiveDocsFormat;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.MutableBits;
+
+// Used by IndexWriter to hold open SegmentReaders (for
+// searching or merging), plus pending deletes,
+// for a given segment
+class ReadersAndLiveDocs {
+  // The segment whose readers and pending deletes this
+  // instance tracks; assigned once in the constructor:
+  public final SegmentInfo info;
+
+  // Tracks how many consumers are using this instance:
+  private final AtomicInteger refCount = new AtomicInteger(1);
+
+  private final IndexWriter writer;
+
+  // Opened lazily (null until first requested); reset to null by dropReaders():
+  private SegmentReader reader;
+
+  // TODO: it's sometimes wasteful that we hold open two
+  // separate SRs (one for merging one for
+  // reading)... maybe just use a single SR?  The gains of
+  // not loading the terms index (for merging in the
+  // non-NRT case) are far less now... and if the app has
+  // any deletes it'll open real readers anyway.
+
+  // Opened lazily (null until first requested); reset to null by dropReaders():
+  private SegmentReader mergeReader;
+
+  // Holds the current shared (readable and writable)
+  // liveDocs.  This is null when there are no deleted
+  // docs, and it's copy-on-write (cloned whenever we need
+  // to change it but it's been shared to an external NRT
+  // reader).
+  private Bits liveDocs;
+
+  // How many further deletions we've done against
+  // liveDocs vs when we loaded it or last wrote it:
+  private int pendingDeleteCount;
+
+  // True if the current liveDocs is referenced by an
+  // external NRT reader:
+  private boolean shared;
+
+  public ReadersAndLiveDocs(IndexWriter writer, SegmentInfo info) {
+    this.info = info;
+    this.writer = writer;
+    shared = true;
+  }
+
+  public void incRef() {
+    final int rc = refCount.incrementAndGet();
+    assert rc > 1;
+  }
+
+  public void decRef() {
+    final int rc = refCount.decrementAndGet();
+    assert rc >= 0;
+  }
+
+  public int refCount() {
+    final int rc = refCount.get();
+    assert rc >= 0;
+    return rc;
+  }
+
+  public synchronized int getPendingDeleteCount() {
+    return pendingDeleteCount;
+  }
+
+  // Call only from assert!
+  public synchronized boolean verifyDocCounts() {
+    int count;
+    if (liveDocs != null) {
+      count = 0;
+      for(int docID=0;docID<info.docCount;docID++) {
+        if (liveDocs.get(docID)) {
+          count++;
+        }
+      }
+    } else {
+      count = info.docCount;
+    }
+
+    assert info.docCount - info.getDelCount() - pendingDeleteCount == count: "info.docCount=" + info.docCount + " info.getDelCount()=" + info.getDelCount() + " pendingDeleteCount=" + pendingDeleteCount + " count=" + count;;
+    return true;
+  }
+
+  // Get reader for searching/deleting
+  public synchronized SegmentReader getReader(IOContext context) throws IOException {
+    //System.out.println("  livedocs=" + rld.liveDocs);
+
+    if (reader == null) {
+      // We steal returned ref:
+      reader = new SegmentReader(info, writer.getConfig().getReaderTermsIndexDivisor(), context);
+      if (liveDocs == null) {
+        liveDocs = reader.getLiveDocs();
+      }
+      //System.out.println("ADD seg=" + rld.info + " isMerge=" + isMerge + " " + readerMap.size() + " in pool");
+      //System.out.println(Thread.currentThread().getName() + ": getReader seg=" + info.name);
+    }
+
+    // Ref for caller
+    reader.incRef();
+    return reader;
+  }
+
+  // Get reader for merging (does not load the terms
+  // index):
+  public synchronized SegmentReader getMergeReader(IOContext context) throws IOException {
+    //System.out.println("  livedocs=" + rld.liveDocs);
+
+    if (mergeReader == null) {
+
+      if (reader != null) {
+        // Just use the already opened non-merge reader
+        // for merging.  In the NRT case this saves us
+        // pointless double-open:
+        //System.out.println("PROMOTE non-merge reader seg=" + rld.info);
+        // Ref for us:
+        reader.incRef();
+        mergeReader = reader;
+        //System.out.println(Thread.currentThread().getName() + ": getMergeReader share seg=" + info.name);
+      } else {
+        //System.out.println(Thread.currentThread().getName() + ": getMergeReader seg=" + info.name);
+        // We steal returned ref:
+        mergeReader = new SegmentReader(info, -1, context);
+        if (liveDocs == null) {
+          liveDocs = mergeReader.getLiveDocs();
+        }
+      }
+    }
+
+    // Ref for caller
+    mergeReader.incRef();
+    return mergeReader;
+  }
+
+  public synchronized void release(SegmentReader sr) throws IOException {
+    assert info == sr.getSegmentInfo();
+    sr.decRef();
+  }
+
+  public synchronized boolean delete(int docID) {
+    assert liveDocs != null;
+    assert Thread.holdsLock(writer);
+    assert docID >= 0 && docID < liveDocs.length() : "out of bounds: docid=" + docID + " liveDocsLength=" + liveDocs.length() + " seg=" + info.name + " docCount=" + info.docCount;
+    assert !shared;
+    final boolean didDelete = liveDocs.get(docID);
+    if (didDelete) {
+      ((MutableBits) liveDocs).clear(docID);
+      pendingDeleteCount++;
+      //System.out.println("  new del seg=" + info + " docID=" + docID + " pendingDelCount=" + pendingDeleteCount + " totDelCount=" + (info.docCount-liveDocs.count()));
+    }
+    return didDelete;
+  }
+
+  // NOTE: removes caller's ref
+  public synchronized void dropReaders() throws IOException {
+    if (reader != null) {
+      //System.out.println("  pool.drop info=" + info + " rc=" + reader.getRefCount());
+      reader.decRef();
+      reader = null;
+    }
+    if (mergeReader != null) {
+      //System.out.println("  pool.drop info=" + info + " merge rc=" + mergeReader.getRefCount());
+      mergeReader.decRef();
+      mergeReader = null;
+    }
+    decRef();
+  }
+
+  /**
+   * Returns a ref to a clone.  NOTE: this clone is not
+   * enrolled in the pool, so you should simply close()
+   * it when you're done (ie, do not call release()).
+   */
+  public synchronized SegmentReader getReadOnlyClone(IOContext context) throws IOException {
+    if (reader == null) {
+      getReader(context).decRef();
+      assert reader != null;
+    }
+    shared = true;
+    if (liveDocs != null) {
+      return new SegmentReader(reader.getSegmentInfo(), reader.core, liveDocs, info.docCount - info.getDelCount() - pendingDeleteCount);
+    } else {
+      assert reader.getLiveDocs() == liveDocs;
+      reader.incRef();
+      return reader;
+    }
+  }
+
+  public synchronized void initWritableLiveDocs() throws IOException {
+    assert Thread.holdsLock(writer);
+    assert info.docCount > 0;
+    //System.out.println("initWritableLivedocs seg=" + info + " liveDocs=" + liveDocs + " shared=" + shared);
+    if (shared) {
+      // Copy on write: this means we've cloned a
+      // SegmentReader sharing the current liveDocs
+      // instance; must now make a private clone so we can
+      // change it:
+      LiveDocsFormat liveDocsFormat = info.getCodec().liveDocsFormat();
+      if (liveDocs == null) {
+        //System.out.println("create BV seg=" + info);
+        liveDocs = liveDocsFormat.newLiveDocs(info.docCount);
+      } else {
+        liveDocs = liveDocsFormat.newLiveDocs(liveDocs);
+      }
+      shared = false;
+    } else {
+      assert liveDocs != null;
+    }
+  }
+
+  public synchronized Bits getLiveDocs() {
+    assert Thread.holdsLock(writer);
+    return liveDocs;
+  }
+
+  public synchronized Bits getReadOnlyLiveDocs() {
+    //System.out.println("getROLiveDocs seg=" + info);
+    assert Thread.holdsLock(writer);
+    shared = true;
+    //if (liveDocs != null) {
+    //System.out.println("  liveCount=" + liveDocs.count());
+    //}
+    return liveDocs;
+  }
+
+  public synchronized void dropChanges() {
+    // Discard (don't save) changes when we are dropping
+    // the reader; this is used only on the sub-readers
+    // after a successful merge.  If deletes had
+    // accumulated on those sub-readers while the merge
+    // is running, by now we have carried forward those
+    // deletes onto the newly merged segment, so we can
+    // discard them on the sub-readers:
+    pendingDeleteCount = 0;
+  }
+
+  // Commit live docs to the directory (writes new
+  // _X_N.del files); returns true if it wrote the file
+  // and false if there were no new deletes to write:
+  public synchronized boolean writeLiveDocs(Directory dir) throws IOException {
+    //System.out.println("rld.writeLiveDocs seg=" + info + " pendingDelCount=" + pendingDeleteCount);
+    if (pendingDeleteCount != 0) {
+      // We have new deletes
+      assert liveDocs.length() == info.docCount;
+
+      // Save in case we need to rollback on failure:
+      final SegmentInfo sav = (SegmentInfo) info.clone();
+      info.advanceDelGen();
+      info.setDelCount(info.getDelCount() + pendingDeleteCount);
+
+      // We can write directly to the actual name (vs to a
+      // .tmp & renaming it) because the file is not live
+      // until segments file is written:
+      boolean success = false;
+      try {
+        info.getCodec().liveDocsFormat().writeLiveDocs((MutableBits)liveDocs, dir, info, IOContext.DEFAULT);
+        success = true;
+      } finally {
+        if (!success) {
+          info.reset(sav);
+        }
+      }
+      pendingDeleteCount = 0;
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "ReadersAndLiveDocs(seg=" + info + " pendingDeleteCount=" + pendingDeleteCount + " shared=" + shared + ")";
+  }
+}

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java?rev=1299648&r1=1299647&r2=1299648&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java Mon Mar 12 12:23:14 2012
@@ -91,21 +91,27 @@ final class StandardDirectoryReader exte
       try {
         final SegmentInfo info = infos.info(i);
         assert info.dir == dir;
-        final IndexWriter.ReadersAndLiveDocs rld = writer.readerPool.get(info, true);
-        final SegmentReader reader = rld.getReadOnlyClone(IOContext.READ);
-        if (reader.numDocs() > 0 || writer.getKeepFullyDeletedSegments()) {
-          readers.add(reader);
-          infosUpto++;
-        } else {
-          reader.close();
-          segmentInfos.remove(infosUpto);
+        final ReadersAndLiveDocs rld = writer.readerPool.get(info, true);
+        try {
+          final SegmentReader reader = rld.getReadOnlyClone(IOContext.READ);
+          if (reader.numDocs() > 0 || writer.getKeepFullyDeletedSegments()) {
+            // Steal the ref:
+            readers.add(reader);
+            infosUpto++;
+          } else {
+            reader.close();
+            segmentInfos.remove(infosUpto);
+          }
+        } finally {
+          writer.readerPool.release(rld);
         }
         success = true;
       } catch(IOException ex) {
         prior = ex;
       } finally {
-        if (!success)
+        if (!success) {
           IOUtils.closeWhileHandlingException(prior, readers);
+        }
       }
     }
     return new StandardDirectoryReader(dir, readers.toArray(new SegmentReader[readers.size()]),
@@ -219,12 +225,12 @@ final class StandardDirectoryReader exte
   }
 
   @Override
-  protected final DirectoryReader doOpenIfChanged() throws CorruptIndexException, IOException {
+  protected DirectoryReader doOpenIfChanged() throws CorruptIndexException, IOException {
     return doOpenIfChanged(null);
   }
 
   @Override
-  protected final DirectoryReader doOpenIfChanged(final IndexCommit commit) throws CorruptIndexException, IOException {
+  protected DirectoryReader doOpenIfChanged(final IndexCommit commit) throws CorruptIndexException, IOException {
     ensureOpen();
 
     // If we were obtained by writer.getReader(), re-ask the
@@ -237,7 +243,7 @@ final class StandardDirectoryReader exte
   }
 
   @Override
-  protected final DirectoryReader doOpenIfChanged(IndexWriter writer, boolean applyAllDeletes) throws CorruptIndexException, IOException {
+  protected DirectoryReader doOpenIfChanged(IndexWriter writer, boolean applyAllDeletes) throws CorruptIndexException, IOException {
     ensureOpen();
     if (writer == this.writer && applyAllDeletes == this.applyAllDeletes) {
       return doOpenFromWriter(null);
@@ -246,7 +252,7 @@ final class StandardDirectoryReader exte
     }
   }
 
-  private final DirectoryReader doOpenFromWriter(IndexCommit commit) throws CorruptIndexException, IOException {
+  private DirectoryReader doOpenFromWriter(IndexCommit commit) throws CorruptIndexException, IOException {
     if (commit != null) {
       throw new IllegalArgumentException("a reader obtained from IndexWriter.getReader() cannot currently accept a commit");
     }