You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2012/03/12 13:23:15 UTC
svn commit: r1299648 - in
/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index:
BufferedDeletesStream.java IndexFileDeleter.java IndexWriter.java
ReadersAndLiveDocs.java StandardDirectoryReader.java
Author: mikemccand
Date: Mon Mar 12 12:23:14 2012
New Revision: 1299648
URL: http://svn.apache.org/viewvc?rev=1299648&view=rev
Log:
LUCENE-3855: fix thread safety issues in IW's livedocs/reader pool
Added:
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java (with props)
Modified:
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/BufferedDeletesStream.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexFileDeleter.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/BufferedDeletesStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/BufferedDeletesStream.java?rev=1299648&r1=1299647&r2=1299648&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/BufferedDeletesStream.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/BufferedDeletesStream.java Mon Mar 12 12:23:14 2012
@@ -210,7 +210,7 @@ class BufferedDeletesStream {
// Lock order: IW -> BD -> RP
assert readerPool.infoIsLive(info);
- final IndexWriter.ReadersAndLiveDocs rld = readerPool.get(info, true);
+ final ReadersAndLiveDocs rld = readerPool.get(info, true);
final SegmentReader reader = rld.getReader(IOContext.READ);
int delCount = 0;
final boolean segAllDeletes;
@@ -224,11 +224,12 @@ class BufferedDeletesStream {
// Don't delete by Term here; DocumentsWriterPerThread
// already did that on flush:
delCount += applyQueryDeletes(packet.queriesIterable(), rld, reader);
- final int fullDelCount = rld.info.getDelCount() + rld.pendingDeleteCount;
+ final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount();
assert fullDelCount <= rld.info.docCount;
segAllDeletes = fullDelCount == rld.info.docCount;
} finally {
- readerPool.release(reader, false);
+ rld.release(reader);
+ readerPool.release(rld);
}
anyNewDeletes |= delCount > 0;
@@ -262,18 +263,19 @@ class BufferedDeletesStream {
if (coalescedDeletes != null) {
// Lock order: IW -> BD -> RP
assert readerPool.infoIsLive(info);
- final IndexWriter.ReadersAndLiveDocs rld = readerPool.get(info, true);
+ final ReadersAndLiveDocs rld = readerPool.get(info, true);
final SegmentReader reader = rld.getReader(IOContext.READ);
int delCount = 0;
final boolean segAllDeletes;
try {
delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader);
delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader);
- final int fullDelCount = rld.info.getDelCount() + rld.pendingDeleteCount;
+ final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount();
assert fullDelCount <= rld.info.docCount;
segAllDeletes = fullDelCount == rld.info.docCount;
- } finally {
- readerPool.release(reader, false);
+ } finally {
+ rld.release(reader);
+ readerPool.release(rld);
}
anyNewDeletes |= delCount > 0;
@@ -353,7 +355,7 @@ class BufferedDeletesStream {
}
// Delete by Term
- private synchronized long applyTermDeletes(Iterable<Term> termsIter, IndexWriter.ReadersAndLiveDocs rld, SegmentReader reader) throws IOException {
+ private synchronized long applyTermDeletes(Iterable<Term> termsIter, ReadersAndLiveDocs rld, SegmentReader reader) throws IOException {
long delCount = 0;
Fields fields = reader.fields();
if (fields == null) {
@@ -394,7 +396,7 @@ class BufferedDeletesStream {
// System.out.println(" term=" + term);
if (termsEnum.seekExact(term.bytes(), false)) {
- DocsEnum docsEnum = termsEnum.docs(rld.liveDocs, docs, false);
+ DocsEnum docsEnum = termsEnum.docs(rld.getLiveDocs(), docs, false);
//System.out.println("BDS: got docsEnum=" + docsEnum);
if (docsEnum != null) {
@@ -434,7 +436,7 @@ class BufferedDeletesStream {
}
// Delete by query
- private static long applyQueryDeletes(Iterable<QueryAndLimit> queriesIter, IndexWriter.ReadersAndLiveDocs rld, final SegmentReader reader) throws IOException {
+ private static long applyQueryDeletes(Iterable<QueryAndLimit> queriesIter, ReadersAndLiveDocs rld, final SegmentReader reader) throws IOException {
long delCount = 0;
final AtomicReaderContext readerContext = reader.getTopReaderContext();
boolean any = false;
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexFileDeleter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexFileDeleter.java?rev=1299648&r1=1299647&r2=1299648&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexFileDeleter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexFileDeleter.java Mon Mar 12 12:23:14 2012
@@ -453,7 +453,7 @@ final class IndexFileDeleter {
assert Thread.holdsLock(writer);
if (infoStream.isEnabled("IFD")) {
- infoStream.message("IFD", "now checkpoint \"" + writer.segString(segmentInfos) + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
+ infoStream.message("IFD", "now checkpoint \"" + writer.segString(writer.toLiveInfos(segmentInfos)) + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
}
// Try again now to delete any previously un-deletable
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java?rev=1299648&r1=1299647&r2=1299648&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java Mon Mar 12 12:23:14 2012
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.Atomi
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
-import org.apache.lucene.codecs.LiveDocsFormat;
import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
import org.apache.lucene.index.FieldInfos.FieldNumberBiMap;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
@@ -392,260 +391,6 @@ public class IndexWriter implements Clos
return r;
}
- // This class inherits all sync from IW:
- class ReadersAndLiveDocs {
- // Not final because we replace (clone) when we need to
- // change it and it's been shared:
- public final SegmentInfo info;
-
- // Set once (null, and then maybe set, and never set again):
- private SegmentReader reader;
-
- // TODO: it's sometimes wasteful that we hold open two
- // separate SRs (one for merging one for
- // reading)... maybe just use a single SR? The gains of
- // not loading the terms index (for merging in the
- // non-NRT case) are far less now... and if the app has
- // any deletes it'll open real readers anyway.
-
- // Set once (null, and then maybe set, and never set again):
- private SegmentReader mergeReader;
-
- // Holds the current shared (readable and writable
- // liveDocs). This is null when there are no deleted
- // docs, and it's copy-on-write (cloned whenever we need
- // to change it but it's been shared to an external NRT
- // reader).
- public Bits liveDocs;
-
- // How many further deletions we've done against
- // liveDocs vs when we loaded it or last wrote it:
- public int pendingDeleteCount;
-
- // True if the current liveDocs is referenced by an
- // external NRT reader:
- public boolean shared;
-
- public ReadersAndLiveDocs(SegmentInfo info) {
- this.info = info;
- shared = true;
- }
-
- // Returns false if we are the only remaining refs of
- // this reader:
- public synchronized boolean anyOutsideRefs(SegmentReader sr) {
- int myRefCounts = 0;
- if (sr == reader) {
- myRefCounts++;
- }
- if (sr == mergeReader) {
- myRefCounts++;
- }
- final int rc = sr.getRefCount();
- assert rc >= myRefCounts;
- return rc > myRefCounts;
- }
-
- // Call only from assert!
- public synchronized boolean verifyDocCounts() {
- int count;
- if (liveDocs != null) {
- count = 0;
- for(int docID=0;docID<info.docCount;docID++) {
- if (liveDocs.get(docID)) {
- count++;
- }
- }
- } else {
- count = info.docCount;
- }
-
- assert info.docCount - info.getDelCount() - pendingDeleteCount == count: "info.docCount=" + info.docCount + " info.getDelCount()=" + info.getDelCount() + " pendingDeleteCount=" + pendingDeleteCount + " count=" + count;;
- return true;
- }
-
- // Returns true if any reader remains
- public synchronized boolean removeReader(SegmentReader sr, boolean drop) throws IOException {
- if (sr == reader) {
- //System.out.println(" non-merge reader");
- reader.decRef();
- reader = null;
- }
-
- if (sr == mergeReader) {
- //System.out.println(" merge reader");
- mergeReader.decRef();
- mergeReader = null;
- if (drop && reader != null) {
- //System.out.println(" also release normal reader rc=" + rld.reader.getRefCount());
- reader.decRef();
- reader = null;
- }
- }
-
- return reader != null || mergeReader != null;
- }
-
- // Get reader for searching/deleting
- public synchronized SegmentReader getReader(IOContext context) throws IOException {
- //System.out.println(" livedocs=" + rld.liveDocs);
-
- if (reader == null) {
- reader = new SegmentReader(info, config.getReaderTermsIndexDivisor(), context);
- if (liveDocs == null) {
- liveDocs = reader.getLiveDocs();
- }
- //System.out.println("ADD seg=" + rld.info + " isMerge=" + isMerge + " " + readerMap.size() + " in pool");
- }
-
- // Ref for caller
- reader.incRef();
- return reader;
- }
-
- // Get reader for merging (does not load the terms
- // index):
- public synchronized SegmentReader getMergeReader(IOContext context) throws IOException {
- //System.out.println(" livedocs=" + rld.liveDocs);
-
- if (mergeReader == null) {
-
- if (reader != null) {
- // Just use the already opened non-merge reader
- // for merging. In the NRT case this saves us
- // pointless double-open:
- //System.out.println("PROMOTE non-merge reader seg=" + rld.info);
- reader.incRef();
- mergeReader = reader;
- } else {
- mergeReader = new SegmentReader(info, -1, context);
- if (liveDocs == null) {
- liveDocs = mergeReader.getLiveDocs();
- }
- }
- }
-
- // Ref for caller
- mergeReader.incRef();
- return mergeReader;
- }
-
- public synchronized boolean delete(int docID) {
- assert liveDocs != null;
- assert docID >= 0 && docID < liveDocs.length() : "out of bounds: docid=" + docID + ",liveDocsLength=" + liveDocs.length();
- assert !shared;
- final boolean didDelete = liveDocs.get(docID);
- if (didDelete) {
- ((MutableBits) liveDocs).clear(docID);
- pendingDeleteCount++;
- //System.out.println(" new del seg=" + info + " docID=" + docID + " pendingDelCount=" + pendingDeleteCount + " totDelCount=" + (info.docCount-liveDocs.count()));
- }
- return didDelete;
- }
-
- public synchronized void dropReaders() throws IOException {
- if (reader != null) {
- //System.out.println(" pool.drop info=" + info + " rc=" + reader.getRefCount());
- reader.decRef();
- reader = null;
- }
- if (mergeReader != null) {
- //System.out.println(" pool.drop info=" + info + " merge rc=" + mergeReader.getRefCount());
- mergeReader.decRef();
- mergeReader = null;
- }
- }
-
- /**
- * Returns a ref to a clone. NOTE: this clone is not
- * enrolled in the pool, so you should simply close()
- * it when you're done (ie, do not call release()).
- */
- public synchronized SegmentReader getReadOnlyClone(IOContext context) throws IOException {
- if (reader == null) {
- getReader(context).decRef();
- assert reader != null;
- }
- shared = true;
- if (liveDocs != null) {
- return new SegmentReader(reader.getSegmentInfo(), reader.core, liveDocs, info.docCount - info.getDelCount() - pendingDeleteCount);
- } else {
- reader.incRef();
- return reader;
- }
- }
-
- public synchronized void initWritableLiveDocs() throws IOException {
- assert Thread.holdsLock(IndexWriter.this);
- assert info.docCount > 0;
- //System.out.println("initWritableLivedocs seg=" + info + " liveDocs=" + liveDocs + " shared=" + shared);
- if (shared) {
- // Copy on write: this means we've cloned a
- // SegmentReader sharing the current liveDocs
- // instance; must now make a private clone so we can
- // change it:
- LiveDocsFormat liveDocsFormat = info.getCodec().liveDocsFormat();
- if (liveDocs == null) {
- //System.out.println("create BV seg=" + info);
- liveDocs = liveDocsFormat.newLiveDocs(info.docCount);
- } else {
- liveDocs = liveDocsFormat.newLiveDocs(liveDocs);
- }
- shared = false;
- } else {
- assert liveDocs != null;
- }
- }
-
- public synchronized Bits getReadOnlyLiveDocs() {
- //System.out.println("getROLiveDocs seg=" + info);
- assert Thread.holdsLock(IndexWriter.this);
- shared = true;
- //if (liveDocs != null) {
- //System.out.println(" liveCount=" + liveDocs.count());
- //}
- return liveDocs;
- }
-
- // Commit live docs to the directory (writes new
- // _X_N.del files); returns true if it wrote the file
- // and false if there were no new deletes to write:
- public synchronized boolean writeLiveDocs(Directory dir) throws IOException {
- //System.out.println("rld.writeLiveDocs seg=" + info + " pendingDelCount=" + pendingDeleteCount);
- if (pendingDeleteCount != 0) {
- // We have new deletes
- assert liveDocs.length() == info.docCount;
-
- // Save in case we need to rollback on failure:
- final SegmentInfo sav = (SegmentInfo) info.clone();
- info.advanceDelGen();
- info.setDelCount(info.getDelCount() + pendingDeleteCount);
-
- // We can write directly to the actual name (vs to a
- // .tmp & renaming it) because the file is not live
- // until segments file is written:
- boolean success = false;
- try {
- info.getCodec().liveDocsFormat().writeLiveDocs((MutableBits)liveDocs, dir, info, IOContext.DEFAULT);
- success = true;
- } finally {
- if (!success) {
- info.reset(sav);
- }
- }
- pendingDeleteCount = 0;
- return true;
- } else {
- return false;
- }
- }
-
- @Override
- public String toString() {
- return "SegmentLiveDocs(seg=" + info + " pendingDeleteCount=" + pendingDeleteCount + " shared=" + shared + ")";
- }
- }
-
/** Holds shared SegmentReader instances. IndexWriter uses
* SegmentReaders for 1) applying deletes, 2) doing
* merges, 3) handing out a real-time reader. This pool
@@ -665,44 +410,36 @@ public class IndexWriter implements Clos
return true;
}
- /**
- * Release the segment reader (i.e. decRef it and close if there
- * are no more references). If drop is true then we
- * remove this entry from the pool.
- * @param sr
- * @throws IOException
- */
- public synchronized void release(SegmentReader sr, boolean drop) throws IOException {
- // Drop caller's ref; for an external reader (not
- // pooled), this decRef will close it
- //System.out.println("pool.release seg=" + sr.getSegmentInfo() + " rc=" + sr.getRefCount() + " drop=" + drop);
- sr.decRef();
-
- final ReadersAndLiveDocs rld = readerMap.get(sr.getSegmentInfo());
-
- if (rld != null && (drop || (!poolReaders && !rld.anyOutsideRefs(sr)))) {
-
- // Discard (don't save) changes when we are dropping
- // the reader; this is used only on the sub-readers
- // after a successful merge. If deletes had
- // accumulated on those sub-readers while the merge
- // is running, by now we have carried forward those
- // deletes onto the newly merged segment, so we can
- // discard them on the sub-readers:
+ public synchronized void drop(SegmentInfo info) throws IOException {
+ final ReadersAndLiveDocs rld = readerMap.get(info);
+ if (rld != null) {
+ assert info == rld.info;
+ readerMap.remove(info);
+ rld.dropReaders();
+ }
+ }
- if (!drop) {
- if (rld.writeLiveDocs(directory)) {
- assert infoIsLive(sr.getSegmentInfo());
- // Must checkpoint w/ deleter, because we just
- // created created new _X_N.del file.
- deleter.checkpoint(segmentInfos, false);
- }
- }
+ public synchronized void release(ReadersAndLiveDocs rld) throws IOException {
- if (!rld.removeReader(sr, drop)) {
- //System.out.println("DROP seg=" + rld.info + " " + readerMap.size() + " in pool");
- readerMap.remove(sr.getSegmentInfo());
+ // Matches incRef in get:
+ rld.decRef();
+
+ // Pool still holds a ref:
+ assert rld.refCount() >= 1;
+
+ if (!poolReaders && rld.refCount() == 1) {
+ // This is the last ref to this RLD, and we're not
+ // pooling, so remove it:
+ if (rld.writeLiveDocs(directory)) {
+ // Make sure we only write del docs for a live segment:
+ assert infoIsLive(rld.info);
+ // Must checkpoint w/ deleter, because we just
+ // created a new _X_N.del file.
+ deleter.checkpoint(segmentInfos, false);
}
+
+ rld.dropReaders();
+ readerMap.remove(rld.info);
}
}
@@ -712,8 +449,8 @@ public class IndexWriter implements Clos
final Iterator<Map.Entry<SegmentInfo,ReadersAndLiveDocs>> it = readerMap.entrySet().iterator();
while(it.hasNext()) {
final ReadersAndLiveDocs rld = it.next().getValue();
- //System.out.println("pool.dropAll: seg=" + rld.info);
if (doSave && rld.writeLiveDocs(directory)) {
+ // Make sure we only write del docs for a live segment:
assert infoIsLive(rld.info);
// Must checkpoint w/ deleter, because we just
// created created new _X_N.del file.
@@ -735,13 +472,6 @@ public class IndexWriter implements Clos
assert readerMap.size() == 0;
}
- public synchronized void drop(SegmentInfo info) throws IOException {
- final ReadersAndLiveDocs rld = readerMap.remove(info);
- if (rld != null) {
- rld.dropReaders();
- }
- }
-
/**
* Commit live docs changes for the segment readers for
* the provided infos.
@@ -751,19 +481,23 @@ public class IndexWriter implements Clos
public synchronized void commit(SegmentInfos infos) throws IOException {
for (SegmentInfo info : infos) {
final ReadersAndLiveDocs rld = readerMap.get(info);
- if (rld != null && rld.writeLiveDocs(directory)) {
- assert infoIsLive(info);
- // Must checkpoint w/ deleter, because we just
- // created created new _X_N.del file.
- deleter.checkpoint(segmentInfos, false);
+ if (rld != null) {
+ assert rld.info == info;
+ if (rld.writeLiveDocs(directory)) {
+ // Make sure we only write del docs for a live segment:
+ assert infoIsLive(info);
+ // Must checkpoint w/ deleter, because we just
+ // created a new _X_N.del file.
+ deleter.checkpoint(segmentInfos, false);
+ }
}
}
}
/**
* Obtain a ReadersAndLiveDocs instance from the
- * readerPool. If getReader is true, you must later call
- * {@link #release(SegmentReader)}.
+ * readerPool. If create is true, you must later call
+ * {@link #release(ReadersAndLiveDocs)}.
* @throws IOException
*/
public synchronized ReadersAndLiveDocs get(SegmentInfo info, boolean create) {
@@ -771,15 +505,22 @@ public class IndexWriter implements Clos
assert info.dir == directory;
ReadersAndLiveDocs rld = readerMap.get(info);
- //System.out.println("rld.get seg=" + info + " poolReaders=" + poolReaders);
if (rld == null) {
- //System.out.println(" new rld");
if (!create) {
return null;
}
- rld = new ReadersAndLiveDocs(info);
+ rld = new ReadersAndLiveDocs(IndexWriter.this, info);
+ // Steal initial reference:
readerMap.put(info, rld);
+ } else {
+ assert rld.info == info: "rld.info=" + rld.info + " info=" + info + " isLive?=" + infoIsLive(rld.info) + " vs " + infoIsLive(info);
+ }
+
+ if (create) {
+ // Return ref to caller:
+ rld.incRef();
}
+
return rld;
}
}
@@ -795,7 +536,7 @@ public class IndexWriter implements Clos
final ReadersAndLiveDocs rld = readerPool.get(info, false);
if (rld != null) {
- delCount += rld.pendingDeleteCount;
+ delCount += rld.getPendingDeleteCount();
}
return delCount;
}
@@ -1116,7 +857,6 @@ public class IndexWriter implements Clos
finishMerges(waitForMerges);
stopMerges = true;
}
-
mergeScheduler.close();
if (infoStream.isEnabled("IW")) {
@@ -1160,8 +900,6 @@ public class IndexWriter implements Clos
}
}
}
-
-
/** Returns the Directory used by this index. */
public Directory getDirectory() {
@@ -2020,6 +1758,9 @@ public class IndexWriter implements Clos
notifyAll();
}
+ // Don't bother saving any changes in our segmentInfos
+ readerPool.dropAll(false);
+
// Keep the same segmentInfos instance but replace all
// of its SegmentInfo instances. This is so the next
// attempt to commit using this instance of IndexWriter
@@ -2038,9 +1779,6 @@ public class IndexWriter implements Clos
// them:
deleter.checkpoint(segmentInfos, false);
deleter.refresh();
-
- // Don't bother saving any changes in our segmentInfos
- readerPool.dropAll(false);
}
lastCommitChangeCount = changeCount;
@@ -3023,16 +2761,18 @@ public class IndexWriter implements Clos
final int docCount = info.docCount;
final Bits prevLiveDocs = merge.readerLiveDocs.get(i);
final Bits currentLiveDocs;
- ReadersAndLiveDocs rld = readerPool.get(info, false);
- // We enrolled in mergeInit:
- assert rld != null;
- currentLiveDocs = rld.liveDocs;
+ final ReadersAndLiveDocs rld = readerPool.get(info, false);
+ // We hold a ref so it should still be in the pool:
+ assert rld != null: "seg=" + info.name;
+ currentLiveDocs = rld.getLiveDocs();
if (prevLiveDocs != null) {
// If we had deletions on starting the merge we must
// still have deletions now:
assert currentLiveDocs != null;
+ assert prevLiveDocs.length() == docCount;
+ assert currentLiveDocs.length() == docCount;
// There were deletes on this segment when the merge
// started. The merge has collapsed away those
@@ -3066,9 +2806,10 @@ public class IndexWriter implements Clos
}
}
} else {
- docUpto += info.docCount - info.getDelCount() - rld.pendingDeleteCount;
+ docUpto += info.docCount - info.getDelCount() - rld.getPendingDeleteCount();
}
} else if (currentLiveDocs != null) {
+ assert currentLiveDocs.length() == docCount;
// This segment had no deletes before but now it
// does:
for(int j=0; j<docCount; j++) {
@@ -3087,11 +2828,13 @@ public class IndexWriter implements Clos
}
}
+ assert docUpto == merge.info.docCount;
+
if (infoStream.isEnabled("IW")) {
if (mergedDeletes == null) {
infoStream.message("IW", "no new deletes since merge started");
} else {
- infoStream.message("IW", mergedDeletes.pendingDeleteCount + " new deletes since merge started");
+ infoStream.message("IW", mergedDeletes.getPendingDeleteCount() + " new deletes since merge started");
}
}
@@ -3136,7 +2879,7 @@ public class IndexWriter implements Clos
final ReadersAndLiveDocs mergedDeletes = merge.info.docCount == 0 ? null : commitMergedDeletes(merge);
- assert mergedDeletes == null || mergedDeletes.pendingDeleteCount != 0;
+ assert mergedDeletes == null || mergedDeletes.getPendingDeleteCount() != 0;
// If the doc store we are using has been closed and
// is in now compound format (but wasn't when we
@@ -3148,7 +2891,7 @@ public class IndexWriter implements Clos
final boolean allDeleted = merge.segments.size() == 0 ||
merge.info.docCount == 0 ||
(mergedDeletes != null &&
- mergedDeletes.pendingDeleteCount == merge.info.docCount);
+ mergedDeletes.getPendingDeleteCount() == merge.info.docCount);
if (infoStream.isEnabled("IW")) {
if (allDeleted) {
@@ -3165,15 +2908,14 @@ public class IndexWriter implements Clos
assert merge.info.docCount != 0 || keepFullyDeletedSegments || dropSegment;
segmentInfos.applyMergeChanges(merge, dropSegment);
-
- if (dropSegment) {
- readerPool.drop(merge.info);
- deleter.deleteNewFiles(merge.info.files());
- assert !segmentInfos.contains(merge.info);
- } else {
- if (mergedDeletes != null && !poolReaders) {
- mergedDeletes.writeLiveDocs(directory);
- readerPool.drop(merge.info);
+
+ if (mergedDeletes != null) {
+ if (dropSegment) {
+ mergedDeletes.dropChanges();
+ }
+ readerPool.release(mergedDeletes);
+ if (dropSegment) {
+ readerPool.drop(mergedDeletes.info);
}
}
@@ -3289,7 +3031,6 @@ public class IndexWriter implements Clos
infoStream.message("IW", "merge time " + (System.currentTimeMillis()-t0) + " msec for " + merge.info.docCount + " docs");
}
}
- //System.out.println(Thread.currentThread().getName() + ": merge end");
}
/** Hook that's called when the specified merge is complete. */
@@ -3524,9 +3265,20 @@ public class IndexWriter implements Clos
boolean drop = !suppressExceptions;
for (int i = 0; i < numSegments; i++) {
- if (merge.readers.get(i) != null) {
+ final SegmentReader sr = merge.readers.get(i);
+ if (sr != null) {
try {
- readerPool.release(merge.readers.get(i), drop);
+ final ReadersAndLiveDocs rld = readerPool.get(sr.getSegmentInfo(), false);
+ // We still hold a ref so it should not have been removed:
+ assert rld != null;
+ if (drop) {
+ rld.dropChanges();
+ }
+ rld.release(sr);
+ readerPool.release(rld);
+ if (drop) {
+ readerPool.drop(rld.info);
+ }
} catch (Throwable t) {
if (th == null) {
th = t;
@@ -3589,17 +3341,20 @@ public class IndexWriter implements Clos
// Carefully pull the most recent live docs:
final Bits liveDocs;
+ final int delCount;
+
synchronized(this) {
// Must sync to ensure BufferedDeletesStream
// cannot change liveDocs/pendingDeleteCount while
// we pull a copy:
liveDocs = rld.getReadOnlyLiveDocs();
+ delCount = rld.getPendingDeleteCount() + info.getDelCount();
assert rld.verifyDocCounts();
if (infoStream.isEnabled("IW")) {
- if (rld.pendingDeleteCount != 0) {
- infoStream.message("IW", "seg=" + info + " delCount=" + info.getDelCount() + " pendingDelCount=" + rld.pendingDeleteCount);
+ if (rld.getPendingDeleteCount() != 0) {
+ infoStream.message("IW", "seg=" + info + " delCount=" + info.getDelCount() + " pendingDelCount=" + rld.getPendingDeleteCount());
} else if (info.getDelCount() != 0) {
infoStream.message("IW", "seg=" + info + " delCount=" + info.getDelCount());
} else {
@@ -3609,8 +3364,7 @@ public class IndexWriter implements Clos
}
merge.readerLiveDocs.add(liveDocs);
merge.readers.add(reader);
- final int delCount = rld.pendingDeleteCount + info.getDelCount();
- assert delCount <= info.docCount;
+ assert delCount <= info.docCount: "delCount=" + delCount + " info.docCount=" + info.docCount + " rld.pendingDeleteCount=" + rld.getPendingDeleteCount() + " info.getDelCount()=" + info.getDelCount();
if (delCount < info.docCount) {
merger.add(reader, liveDocs);
}
@@ -3708,7 +3462,8 @@ public class IndexWriter implements Clos
mergedSegmentWarmer.warm(sr);
} finally {
synchronized(this) {
- readerPool.release(sr, false);
+ rld.release(sr);
+ readerPool.release(rld);
}
}
}
@@ -3762,11 +3517,11 @@ public class IndexWriter implements Clos
/** @lucene.internal */
public synchronized String segString(Iterable<SegmentInfo> infos) throws IOException {
final StringBuilder buffer = new StringBuilder();
- for(final SegmentInfo s : infos) {
+ for(final SegmentInfo info : infos) {
if (buffer.length() > 0) {
buffer.append(' ');
}
- buffer.append(segString(s));
+ buffer.append(segString(info));
}
return buffer.toString();
}
@@ -3819,6 +3574,24 @@ public class IndexWriter implements Clos
return true;
}
+ // For infoStream output
+ synchronized SegmentInfos toLiveInfos(SegmentInfos sis) {
+ final SegmentInfos newSIS = new SegmentInfos();
+ final Map<SegmentInfo,SegmentInfo> liveSIS = new HashMap<SegmentInfo,SegmentInfo>();
+ for(SegmentInfo info : segmentInfos) {
+ liveSIS.put(info, info);
+ }
+ for(SegmentInfo info : sis) {
+ SegmentInfo liveInfo = liveSIS.get(info);
+ if (liveInfo != null) {
+ info = liveInfo;
+ }
+ newSIS.add(info);
+ }
+
+ return newSIS;
+ }
+
/** Walk through all files referenced by the current
* segmentInfos and ask the Directory to sync each file,
* if it wasn't already. If that succeeds, then we
@@ -3853,7 +3626,7 @@ public class IndexWriter implements Clos
}
if (infoStream.isEnabled("IW")) {
- infoStream.message("IW", "startCommit index=" + segString(toSync) + " changeCount=" + changeCount);
+ infoStream.message("IW", "startCommit index=" + segString(toLiveInfos(toSync)) + " changeCount=" + changeCount);
}
assert filesExist(toSync);
Added: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java?rev=1299648&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java (added)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java Mon Mar 12 12:23:14 2012
@@ -0,0 +1,303 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.codecs.LiveDocsFormat;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.MutableBits;
+
+// Used by IndexWriter to hold open SegmentReaders (for
+// searching or merging), plus pending deletes,
+// for a given segment
+class ReadersAndLiveDocs {
+ // The segment this instance tracks. The reference is final; its
+ // delete gen/count are updated in place by writeLiveDocs:
+ public final SegmentInfo info;
+
+ // Tracks how many consumers are using this instance:
+ private final AtomicInteger refCount = new AtomicInteger(1);
+
+ private final IndexWriter writer;
+
+ // Opened lazily by getReader; reset to null by dropReaders:
+ private SegmentReader reader;
+
+ // TODO: it's sometimes wasteful that we hold open two
+ // separate SRs (one for merging one for
+ // reading)... maybe just use a single SR? The gains of
+ // not loading the terms index (for merging in the
+ // non-NRT case) are far less now... and if the app has
+ // any deletes it'll open real readers anyway.
+
+ // Opened lazily by getMergeReader; reset to null by dropReaders:
+ private SegmentReader mergeReader;
+
+ // Holds the current shared (readable and writable)
+ // liveDocs. This is null when there are no deleted
+ // docs, and it's copy-on-write (cloned whenever we need
+ // to change it but it's been shared to an external NRT
+ // reader).
+ private Bits liveDocs;
+
+ // How many further deletions we've done against
+ // liveDocs vs when we loaded it or last wrote it:
+ private int pendingDeleteCount;
+
+ // True if the current liveDocs is referenced by an
+ // external NRT reader:
+ private boolean shared;
+
+ public ReadersAndLiveDocs(IndexWriter writer, SegmentInfo info) {
+ this.info = info;
+ this.writer = writer;
+ shared = true;
+ }
+
+ public void incRef() {
+ final int rc = refCount.incrementAndGet();
+ assert rc > 1;
+ }
+
+ public void decRef() {
+ final int rc = refCount.decrementAndGet();
+ assert rc >= 0;
+ }
+
+ public int refCount() {
+ final int rc = refCount.get();
+ assert rc >= 0;
+ return rc;
+ }
+
+ public synchronized int getPendingDeleteCount() {
+ return pendingDeleteCount;
+ }
+
+ // Call only from assert!
+ public synchronized boolean verifyDocCounts() {
+ int count;
+ if (liveDocs != null) {
+ count = 0;
+ for(int docID=0;docID<info.docCount;docID++) {
+ if (liveDocs.get(docID)) {
+ count++;
+ }
+ }
+ } else {
+ count = info.docCount;
+ }
+
+ assert info.docCount - info.getDelCount() - pendingDeleteCount == count: "info.docCount=" + info.docCount + " info.getDelCount()=" + info.getDelCount() + " pendingDeleteCount=" + pendingDeleteCount + " count=" + count;;
+ return true;
+ }
+
+ // Get reader for searching/deleting
+ public synchronized SegmentReader getReader(IOContext context) throws IOException {
+ //System.out.println(" livedocs=" + rld.liveDocs);
+
+ if (reader == null) {
+ // We steal returned ref:
+ reader = new SegmentReader(info, writer.getConfig().getReaderTermsIndexDivisor(), context);
+ if (liveDocs == null) {
+ liveDocs = reader.getLiveDocs();
+ }
+ //System.out.println("ADD seg=" + rld.info + " isMerge=" + isMerge + " " + readerMap.size() + " in pool");
+ //System.out.println(Thread.currentThread().getName() + ": getReader seg=" + info.name);
+ }
+
+ // Ref for caller
+ reader.incRef();
+ return reader;
+ }
+
+ // Get reader for merging (does not load the terms
+ // index):
+ public synchronized SegmentReader getMergeReader(IOContext context) throws IOException {
+ //System.out.println(" livedocs=" + rld.liveDocs);
+
+ if (mergeReader == null) {
+
+ if (reader != null) {
+ // Just use the already opened non-merge reader
+ // for merging. In the NRT case this saves us
+ // pointless double-open:
+ //System.out.println("PROMOTE non-merge reader seg=" + rld.info);
+ // Ref for us:
+ reader.incRef();
+ mergeReader = reader;
+ //System.out.println(Thread.currentThread().getName() + ": getMergeReader share seg=" + info.name);
+ } else {
+ //System.out.println(Thread.currentThread().getName() + ": getMergeReader seg=" + info.name);
+ // We steal returned ref:
+ mergeReader = new SegmentReader(info, -1, context);
+ if (liveDocs == null) {
+ liveDocs = mergeReader.getLiveDocs();
+ }
+ }
+ }
+
+ // Ref for caller
+ mergeReader.incRef();
+ return mergeReader;
+ }
+
+ public synchronized void release(SegmentReader sr) throws IOException {
+ assert info == sr.getSegmentInfo();
+ sr.decRef();
+ }
+
+ public synchronized boolean delete(int docID) {
+ assert liveDocs != null;
+ assert Thread.holdsLock(writer);
+ assert docID >= 0 && docID < liveDocs.length() : "out of bounds: docid=" + docID + " liveDocsLength=" + liveDocs.length() + " seg=" + info.name + " docCount=" + info.docCount;
+ assert !shared;
+ final boolean didDelete = liveDocs.get(docID);
+ if (didDelete) {
+ ((MutableBits) liveDocs).clear(docID);
+ pendingDeleteCount++;
+ //System.out.println(" new del seg=" + info + " docID=" + docID + " pendingDelCount=" + pendingDeleteCount + " totDelCount=" + (info.docCount-liveDocs.count()));
+ }
+ return didDelete;
+ }
+
+ // NOTE: removes caller's ref on this instance (calls decRef())
+ public synchronized void dropReaders() throws IOException {
+ if (reader != null) {
+ //System.out.println(" pool.drop info=" + info + " rc=" + reader.getRefCount());
+ reader.decRef();
+ reader = null;
+ }
+ if (mergeReader != null) {
+ //System.out.println(" pool.drop info=" + info + " merge rc=" + mergeReader.getRefCount());
+ mergeReader.decRef();
+ mergeReader = null;
+ }
+ decRef();
+ }
+
+ /**
+ * Returns a ref to a clone. NOTE: this clone is not
+ * enrolled in the pool, so you should simply close()
+ * it when you're done (ie, do not call release()).
+ */
+ public synchronized SegmentReader getReadOnlyClone(IOContext context) throws IOException {
+ if (reader == null) {
+ getReader(context).decRef();
+ assert reader != null;
+ }
+ shared = true;
+ if (liveDocs != null) {
+ return new SegmentReader(reader.getSegmentInfo(), reader.core, liveDocs, info.docCount - info.getDelCount() - pendingDeleteCount);
+ } else {
+ assert reader.getLiveDocs() == liveDocs;
+ reader.incRef();
+ return reader;
+ }
+ }
+
+ public synchronized void initWritableLiveDocs() throws IOException {
+ assert Thread.holdsLock(writer);
+ assert info.docCount > 0;
+ //System.out.println("initWritableLivedocs seg=" + info + " liveDocs=" + liveDocs + " shared=" + shared);
+ if (shared) {
+ // Copy on write: this means we've cloned a
+ // SegmentReader sharing the current liveDocs
+ // instance; must now make a private clone so we can
+ // change it:
+ LiveDocsFormat liveDocsFormat = info.getCodec().liveDocsFormat();
+ if (liveDocs == null) {
+ //System.out.println("create BV seg=" + info);
+ liveDocs = liveDocsFormat.newLiveDocs(info.docCount);
+ } else {
+ liveDocs = liveDocsFormat.newLiveDocs(liveDocs);
+ }
+ shared = false;
+ } else {
+ assert liveDocs != null;
+ }
+ }
+
+ public synchronized Bits getLiveDocs() {
+ assert Thread.holdsLock(writer);
+ return liveDocs;
+ }
+
+ public synchronized Bits getReadOnlyLiveDocs() {
+ //System.out.println("getROLiveDocs seg=" + info);
+ assert Thread.holdsLock(writer);
+ shared = true;
+ //if (liveDocs != null) {
+ //System.out.println(" liveCount=" + liveDocs.count());
+ //}
+ return liveDocs;
+ }
+
+ public synchronized void dropChanges() {
+ // Discard (don't save) changes when we are dropping
+ // the reader; this is used only on the sub-readers
+ // after a successful merge. If deletes had
+ // accumulated on those sub-readers while the merge
+ // was running, by now we have carried forward those
+ // deletes onto the newly merged segment, so we can
+ // discard them on the sub-readers:
+ pendingDeleteCount = 0;
+ }
+
+ // Commit live docs to the directory (writes new
+ // _X_N.del files); returns true if it wrote the file
+ // and false if there were no new deletes to write:
+ public synchronized boolean writeLiveDocs(Directory dir) throws IOException {
+ //System.out.println("rld.writeLiveDocs seg=" + info + " pendingDelCount=" + pendingDeleteCount);
+ if (pendingDeleteCount != 0) {
+ // We have new deletes
+ assert liveDocs.length() == info.docCount;
+
+ // Save in case we need to rollback on failure:
+ final SegmentInfo sav = (SegmentInfo) info.clone();
+ info.advanceDelGen();
+ info.setDelCount(info.getDelCount() + pendingDeleteCount);
+
+ // We can write directly to the actual name (vs to a
+ // .tmp & renaming it) because the file is not live
+ // until segments file is written:
+ boolean success = false;
+ try {
+ info.getCodec().liveDocsFormat().writeLiveDocs((MutableBits)liveDocs, dir, info, IOContext.DEFAULT);
+ success = true;
+ } finally {
+ if (!success) {
+ info.reset(sav);
+ }
+ }
+ pendingDeleteCount = 0;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "ReadersAndLiveDocs(seg=" + info + " pendingDeleteCount=" + pendingDeleteCount + " shared=" + shared + ")";
+ }
+}
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java?rev=1299648&r1=1299647&r2=1299648&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java Mon Mar 12 12:23:14 2012
@@ -91,21 +91,27 @@ final class StandardDirectoryReader exte
try {
final SegmentInfo info = infos.info(i);
assert info.dir == dir;
- final IndexWriter.ReadersAndLiveDocs rld = writer.readerPool.get(info, true);
- final SegmentReader reader = rld.getReadOnlyClone(IOContext.READ);
- if (reader.numDocs() > 0 || writer.getKeepFullyDeletedSegments()) {
- readers.add(reader);
- infosUpto++;
- } else {
- reader.close();
- segmentInfos.remove(infosUpto);
+ final ReadersAndLiveDocs rld = writer.readerPool.get(info, true);
+ try {
+ final SegmentReader reader = rld.getReadOnlyClone(IOContext.READ);
+ if (reader.numDocs() > 0 || writer.getKeepFullyDeletedSegments()) {
+ // Steal the ref:
+ readers.add(reader);
+ infosUpto++;
+ } else {
+ reader.close();
+ segmentInfos.remove(infosUpto);
+ }
+ } finally {
+ writer.readerPool.release(rld);
}
success = true;
} catch(IOException ex) {
prior = ex;
} finally {
- if (!success)
+ if (!success) {
IOUtils.closeWhileHandlingException(prior, readers);
+ }
}
}
return new StandardDirectoryReader(dir, readers.toArray(new SegmentReader[readers.size()]),
@@ -219,12 +225,12 @@ final class StandardDirectoryReader exte
}
@Override
- protected final DirectoryReader doOpenIfChanged() throws CorruptIndexException, IOException {
+ protected DirectoryReader doOpenIfChanged() throws CorruptIndexException, IOException {
return doOpenIfChanged(null);
}
@Override
- protected final DirectoryReader doOpenIfChanged(final IndexCommit commit) throws CorruptIndexException, IOException {
+ protected DirectoryReader doOpenIfChanged(final IndexCommit commit) throws CorruptIndexException, IOException {
ensureOpen();
// If we were obtained by writer.getReader(), re-ask the
@@ -237,7 +243,7 @@ final class StandardDirectoryReader exte
}
@Override
- protected final DirectoryReader doOpenIfChanged(IndexWriter writer, boolean applyAllDeletes) throws CorruptIndexException, IOException {
+ protected DirectoryReader doOpenIfChanged(IndexWriter writer, boolean applyAllDeletes) throws CorruptIndexException, IOException {
ensureOpen();
if (writer == this.writer && applyAllDeletes == this.applyAllDeletes) {
return doOpenFromWriter(null);
@@ -246,7 +252,7 @@ final class StandardDirectoryReader exte
}
}
- private final DirectoryReader doOpenFromWriter(IndexCommit commit) throws CorruptIndexException, IOException {
+ private DirectoryReader doOpenFromWriter(IndexCommit commit) throws CorruptIndexException, IOException {
if (commit != null) {
throw new IllegalArgumentException("a reader obtained from IndexWriter.getReader() cannot currently accept a commit");
}